diff --git a/src/oatpp/core/async/Coroutine.hpp b/src/oatpp/core/async/Coroutine.hpp index ecfbd2f2..eb92801e 100644 --- a/src/oatpp/core/async/Coroutine.hpp +++ b/src/oatpp/core/async/Coroutine.hpp @@ -401,7 +401,6 @@ private: public: CoroutineHandle(Processor* processor, AbstractCoroutine* rootCoroutine); - ~CoroutineHandle(); Action takeAction(Action&& action); diff --git a/src/oatpp/core/async/Processor.cpp b/src/oatpp/core/async/Processor.cpp index d3314e01..6fe750a8 100644 --- a/src/oatpp/core/async/Processor.cpp +++ b/src/oatpp/core/async/Processor.cpp @@ -151,8 +151,7 @@ void Processor::popTasks() { void Processor::consumeAllTasks() { for(auto& submission : m_taskList) { - auto coroutine = submission->createCoroutine(); - m_queue.pushBack(new CoroutineHandle(this, coroutine)); + m_queue.pushBack(submission->createCoroutine(this)); } m_taskList.clear(); } @@ -197,55 +196,43 @@ bool Processor::iterate(v_int32 numIterations) { for(v_int32 i = 0; i < numIterations; i++) { - for(v_int32 j = 0; j < 10; j ++) { + auto CP = m_queue.first; + if (CP == nullptr) { + goto end_loop; + } + if (CP->finished()) { + m_queue.popFrontNoData(); + -- m_tasksCounter; + } else { - auto CP = m_queue.first; - if (CP == nullptr) { - goto end_loop; - } - if (CP->finished()) { - m_queue.popFrontNoData(); - -- m_tasksCounter; - } else { + const Action &action = CP->iterateAndTakeAction(); - const Action &action = CP->iterateAndTakeAction(); + switch (action.m_type) { - switch (action.m_type) { + case Action::TYPE_IO_WAIT: + CP->_SCH_A = Action::clone(action); + m_queue.popFront(); + popIOTask(CP); + break; - case Action::TYPE_IO_WAIT: - CP->_SCH_A = Action::clone(action); - m_queue.popFront(); - popIOTask(CP); - break; + case Action::TYPE_WAIT_REPEAT: + CP->_SCH_A = Action::clone(action); + m_queue.popFront(); + popTimerTask(CP); + break; -// case Action::TYPE_IO_REPEAT: // DO NOT RESCHEDULE COROUTINE WITH ACTIVE I/O -// CP->_SCH_A = Action::clone(action); -// m_queue.popFront(); -// popIOTask(CP); -// break; - - case Action::TYPE_WAIT_REPEAT: - CP->_SCH_A = Action::clone(action); - m_queue.popFront(); - popTimerTask(CP); - break; - - case Action::TYPE_WAIT_LIST: - CP->_SCH_A = Action::createActionByType(Action::TYPE_NONE); - m_queue.popFront(); - action.m_data.waitList->pushBack(CP); - break; - -// default: -// m_queue.round(); - } + case Action::TYPE_WAIT_LIST: + CP->_SCH_A = Action::createActionByType(Action::TYPE_NONE); + m_queue.popFront(); + action.m_data.waitList->pushBack(CP); + break; + default: + m_queue.round(); } } - m_queue.round(); - } end_loop: diff --git a/src/oatpp/core/async/Processor.hpp b/src/oatpp/core/async/Processor.hpp index e24f2f31..d6308109 100644 --- a/src/oatpp/core/async/Processor.hpp +++ b/src/oatpp/core/async/Processor.hpp @@ -46,7 +46,7 @@ private: class TaskSubmission { public: virtual ~TaskSubmission() {}; - virtual AbstractCoroutine* createCoroutine() = 0; + virtual CoroutineHandle* createCoroutine(Processor* processor) = 0; }; /* @@ -73,13 +73,13 @@ private: : m_params(std::make_tuple(params...)) {} - virtual AbstractCoroutine* createCoroutine() { - return creator(typename SequenceGenerator::type()); + virtual CoroutineHandle* createCoroutine(Processor* processor) { + return creator(processor, typename SequenceGenerator::type()); } template - AbstractCoroutine* creator(IndexSequence) { - return new CoroutineType(std::get(m_params) ...); + CoroutineHandle* creator(Processor* processor, IndexSequence) { + return new CoroutineHandle(processor, new CoroutineType(std::get(m_params) ...)); } }; diff --git a/src/oatpp/core/data/buffer/FIFOBuffer.cpp b/src/oatpp/core/data/buffer/FIFOBuffer.cpp index 0a16ea1c..138b7185 100644 --- a/src/oatpp/core/data/buffer/FIFOBuffer.cpp +++ b/src/oatpp/core/data/buffer/FIFOBuffer.cpp @@ -377,14 +377,14 @@ async::CoroutineStarter FIFOBuffer::flushToStreamAsync(const std::shared_ptr { private: - std::shared_ptr m_fifo; + FIFOBuffer* m_fifo; std::shared_ptr m_stream; private: data::stream::AsyncInlineWriteData m_data1; data::stream::AsyncInlineWriteData m_data2; public: - FlushCoroutine(const std::shared_ptr& fifo, const std::shared_ptr& stream) + FlushCoroutine(FIFOBuffer* fifo, const std::shared_ptr& stream) : m_fifo(fifo) , m_stream(stream) {} @@ -428,7 +428,7 @@ async::CoroutineStarter FIFOBuffer::flushToStreamAsync(const std::shared_ptr { +class FIFOBuffer { private: p_char8 m_buffer; v_io_size m_bufferSize; diff --git a/src/oatpp/core/data/share/MemoryLabel.hpp b/src/oatpp/core/data/share/MemoryLabel.hpp index 3d4026a8..3818404d 100644 --- a/src/oatpp/core/data/share/MemoryLabel.hpp +++ b/src/oatpp/core/data/share/MemoryLabel.hpp @@ -51,6 +51,12 @@ public: , m_size(0) {} + /** + * Constructor. + * @param str + */ + MemoryLabel(const std::shared_ptr& str) : MemoryLabel(str, str->getData(), str->getSize()) {} + /** * Constructor. * @param memHandle - memory handle. `std::shared_ptr` to buffer pointed by a memory label. diff --git a/src/oatpp/core/data/stream/StreamBufferedProxy.cpp b/src/oatpp/core/data/stream/StreamBufferedProxy.cpp index 2e51a677..dc7e077f 100644 --- a/src/oatpp/core/data/stream/StreamBufferedProxy.cpp +++ b/src/oatpp/core/data/stream/StreamBufferedProxy.cpp @@ -27,12 +27,12 @@ namespace oatpp { namespace data{ namespace stream { data::v_io_size OutputStreamBufferedProxy::write(const void *data, data::v_io_size count) { - if(m_buffer->availableToWrite() > 0) { - return m_buffer->write(data, count); + if(m_buffer.availableToWrite() > 0) { + return m_buffer.write(data, count); } else { - auto bytesFlushed = m_buffer->readAndWriteToStream(m_outputStream.get(), m_buffer->getBufferSize()); + auto bytesFlushed = m_buffer.readAndWriteToStream(m_outputStream.get(), m_buffer.getBufferSize()); if(bytesFlushed > 0) { - return m_buffer->write(data, count); + return m_buffer.write(data, count); } return bytesFlushed; } @@ -54,11 +54,11 @@ oatpp::data::stream::IOMode OutputStreamBufferedProxy::getOutputStreamIOMode() { } data::v_io_size OutputStreamBufferedProxy::flush() { - return m_buffer->flushToStream(m_outputStream.get()); + return m_buffer.flushToStream(m_outputStream.get()); } oatpp::async::CoroutineStarter OutputStreamBufferedProxy::flushAsync() { - return m_buffer->flushToStreamAsync(m_outputStream); + return m_buffer.flushToStreamAsync(m_outputStream); } data::v_io_size InputStreamBufferedProxy::read(void *data, data::v_io_size count) { diff --git a/src/oatpp/core/data/stream/StreamBufferedProxy.hpp b/src/oatpp/core/data/stream/StreamBufferedProxy.hpp index 3b18fceb..9e604b17 100644 --- a/src/oatpp/core/data/stream/StreamBufferedProxy.hpp +++ b/src/oatpp/core/data/stream/StreamBufferedProxy.hpp @@ -27,7 +27,7 @@ #include "Stream.hpp" #include "oatpp/core/data/buffer/FIFOBuffer.hpp" -#include "oatpp/core/data/buffer/IOBuffer.hpp" +#include "oatpp/core/data/share/MemoryLabel.hpp" #include "oatpp/core/async/Coroutine.hpp" namespace oatpp { namespace data{ namespace stream { @@ -40,36 +40,19 @@ public: typedef v_int32 v_bufferSize; private: std::shared_ptr m_outputStream; - std::shared_ptr m_bufferPtr; - std::shared_ptr m_buffer; + oatpp::data::share::MemoryLabel m_memoryLabel; + buffer::FIFOBuffer m_buffer; public: OutputStreamBufferedProxy(const std::shared_ptr& outputStream, - const std::shared_ptr& bufferPtr, - p_char8 buffer, - v_bufferSize bufferSize) + const oatpp::data::share::MemoryLabel& memoryLabel) : m_outputStream(outputStream) - , m_bufferPtr(bufferPtr) - , m_buffer(std::make_shared(buffer, bufferSize)) + , m_memoryLabel(memoryLabel) + , m_buffer(memoryLabel.getData(), memoryLabel.getSize()) {} public: - static std::shared_ptr createShared(const std::shared_ptr& outputStream, - const std::shared_ptr& buffer) - { - return Shared_OutputStreamBufferedProxy_Pool::allocateShared(outputStream, - buffer, - (p_char8) buffer->getData(), - buffer->getSize()); - } - - static std::shared_ptr createShared(const std::shared_ptr& outputStream, - p_char8 buffer, - v_bufferSize bufferSize) - { - return Shared_OutputStreamBufferedProxy_Pool::allocateShared(outputStream, - nullptr, - buffer, - bufferSize); + static std::shared_ptr createShared(const std::shared_ptr& outputStream, const oatpp::data::share::MemoryLabel& memoryLabel) { + return Shared_OutputStreamBufferedProxy_Pool::allocateShared(outputStream, memoryLabel); } data::v_io_size write(const void *data, data::v_io_size count) override; @@ -92,7 +75,7 @@ public: oatpp::async::CoroutineStarter flushAsync(); void setBufferPosition(data::v_io_size readPosition, data::v_io_size writePosition, bool canRead) { - m_buffer->setBufferPosition(readPosition, writePosition, canRead); + m_buffer.setBufferPosition(readPosition, writePosition, canRead); } }; @@ -105,72 +88,33 @@ public: typedef v_int32 v_bufferSize; protected: std::shared_ptr m_inputStream; - std::shared_ptr m_bufferPtr; + oatpp::data::share::MemoryLabel m_memoryLabel; buffer::FIFOBuffer m_buffer; public: InputStreamBufferedProxy(const std::shared_ptr& inputStream, - const std::shared_ptr& bufferPtr, - p_char8 buffer, - v_bufferSize bufferSize, + const oatpp::data::share::MemoryLabel& memoryLabel, data::v_io_size bufferReadPosition, data::v_io_size bufferWritePosition, bool bufferCanRead) : m_inputStream(inputStream) - , m_bufferPtr(bufferPtr) - , m_buffer(buffer, bufferSize, bufferReadPosition, bufferWritePosition, bufferCanRead) + , m_memoryLabel(memoryLabel) + , m_buffer(memoryLabel.getData(), memoryLabel.getSize(), bufferReadPosition, bufferWritePosition, bufferCanRead) {} public: static std::shared_ptr createShared(const std::shared_ptr& inputStream, - const std::shared_ptr& buffer) + const oatpp::data::share::MemoryLabel& memoryLabel) { - return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream, - buffer, - (p_char8) buffer->getData(), - buffer->getSize(), - 0, 0, false); + return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream, memoryLabel, 0, 0, false); } static std::shared_ptr createShared(const std::shared_ptr& inputStream, - const std::shared_ptr& buffer, + const oatpp::data::share::MemoryLabel& memoryLabel, data::v_io_size bufferReadPosition, data::v_io_size bufferWritePosition, bool bufferCanRead) { - return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream, - buffer, - (p_char8) buffer->getData(), - buffer->getSize(), - bufferReadPosition, - bufferWritePosition, - bufferCanRead); - } - - static std::shared_ptr createShared(const std::shared_ptr& inputStream, - p_char8 buffer, - v_bufferSize bufferSize) - { - return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream, - nullptr, - buffer, - bufferSize, - 0, 0, false); - } - - static std::shared_ptr createShared(const std::shared_ptr& inputStream, - p_char8 buffer, - v_bufferSize bufferSize, - data::v_io_size bufferReadPosition, - data::v_io_size bufferWritePosition, - bool bufferCanRead) - { - return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream, - nullptr, - buffer, - bufferSize, - bufferReadPosition, - bufferWritePosition, - bufferCanRead); + return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream, memoryLabel, bufferReadPosition, bufferWritePosition, bufferCanRead); } data::v_io_size read(void *data, data::v_io_size count) override; diff --git a/src/oatpp/web/client/HttpRequestExecutor.cpp b/src/oatpp/web/client/HttpRequestExecutor.cpp index bc08db98..1aca2f8e 100644 --- a/src/oatpp/web/client/HttpRequestExecutor.cpp +++ b/src/oatpp/web/client/HttpRequestExecutor.cpp @@ -115,14 +115,14 @@ HttpRequestExecutor::execute(const String& method, auto request = oatpp::web::protocol::http::outgoing::Request::createShared(method, path, headers, body); request->putHeaderIfNotExists(oatpp::web::protocol::http::Header::HOST, m_connectionProvider->getProperty("host")); request->putHeaderIfNotExists(oatpp::web::protocol::http::Header::CONNECTION, oatpp::web::protocol::http::Header::Value::CONNECTION_KEEP_ALIVE); - - auto ioBuffer = oatpp::data::buffer::IOBuffer::createShared(); - oatpp::data::stream::OutputStreamBufferedProxy upStream(connection, ioBuffer, (p_char8)ioBuffer->getData(), ioBuffer->getSize()); + oatpp::data::share::MemoryLabel buffer(oatpp::base::StrBuffer::createShared(oatpp::data::buffer::IOBuffer::BUFFER_SIZE)); + + oatpp::data::stream::OutputStreamBufferedProxy upStream(connection, buffer); request->send(&upStream); upStream.flush(); - oatpp::web::protocol::http::incoming::ResponseHeadersReader headerReader(ioBuffer->getData(), ioBuffer->getSize(), 4096); + oatpp::web::protocol::http::incoming::ResponseHeadersReader headerReader(buffer, 4096); oatpp::web::protocol::http::HttpError::Info error; const auto& result = headerReader.readHeaders(connection, error); @@ -137,7 +137,7 @@ HttpRequestExecutor::execute(const String& method, } auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection, - ioBuffer, + buffer, result.bufferPosStart, result.bufferPosEnd, result.bufferPosStart != result.bufferPosEnd); @@ -169,7 +169,7 @@ HttpRequestExecutor::executeAsync(const String& method, std::shared_ptr m_upstream; private: std::shared_ptr m_connection; - std::shared_ptr m_ioBuffer; + oatpp::data::share::MemoryLabel m_buffer; public: ExecutorCoroutine(const std::shared_ptr& connectionProvider, @@ -205,20 +205,20 @@ HttpRequestExecutor::executeAsync(const String& method, auto request = oatpp::web::protocol::http::outgoing::Request::createShared(m_method, m_path, m_headers, m_body); request->putHeaderIfNotExists(Header::HOST, m_connectionProvider->getProperty("host")); request->putHeaderIfNotExists(Header::CONNECTION, Header::Value::CONNECTION_KEEP_ALIVE); - m_ioBuffer = oatpp::data::buffer::IOBuffer::createShared(); - m_upstream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, m_ioBuffer); + m_buffer = oatpp::data::share::MemoryLabel(oatpp::base::StrBuffer::createShared(oatpp::data::buffer::IOBuffer::BUFFER_SIZE)); + m_upstream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, m_buffer); return request->sendAsync(m_upstream).next(m_upstream->flushAsync()).next(yieldTo(&ExecutorCoroutine::readResponse)); } Action readResponse() { - ResponseHeadersReader headersReader(m_ioBuffer->getData(), m_ioBuffer->getSize(), 4096); + ResponseHeadersReader headersReader(m_buffer, 4096); return headersReader.readHeadersAsync(m_connection).callbackTo(&ExecutorCoroutine::onHeadersParsed); } Action onHeadersParsed(const ResponseHeadersReader::Result& result) { auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection, - m_ioBuffer, + m_buffer, result.bufferPosStart, result.bufferPosEnd, result.bufferPosStart != result.bufferPosEnd); diff --git a/src/oatpp/web/protocol/http/incoming/RequestHeadersReader.cpp b/src/oatpp/web/protocol/http/incoming/RequestHeadersReader.cpp index 1ae7740f..b2cf18a0 100644 --- a/src/oatpp/web/protocol/http/incoming/RequestHeadersReader.cpp +++ b/src/oatpp/web/protocol/http/incoming/RequestHeadersReader.cpp @@ -37,23 +37,24 @@ data::v_io_size RequestHeadersReader::readHeadersSection(data::stream::InputStre data::v_io_size res; while (true) { - v_int32 desiredToRead = m_bufferSize; + v_int32 desiredToRead = m_buffer.getSize(); if(progress + desiredToRead > m_maxHeadersSize) { desiredToRead = m_maxHeadersSize - progress; if(desiredToRead <= 0) { return -1; } } - - res = stream->peek(m_buffer, desiredToRead); + + auto bufferData = m_buffer.getData(); + res = stream->peek(bufferData, desiredToRead); if(res > 0) { - bufferStream->write(m_buffer, res); + bufferStream->write(bufferData, res); progress += res; for(v_int32 i = 0; i < res; i ++) { accumulator <<= 8; - accumulator |= m_buffer[i]; + accumulator |= bufferData[i]; if(accumulator == SECTION_END) { stream->commitReadOffset(i + 1); return res; @@ -104,8 +105,7 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptr { private: std::shared_ptr m_stream; - p_char8 m_buffer; - v_int32 m_bufferSize; + oatpp::data::share::MemoryLabel m_buffer; v_int32 m_maxHeadersSize; v_word32 m_accumulator; v_int32 m_progress; @@ -114,10 +114,9 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptr& stream, - p_char8 buffer, v_int32 bufferSize, v_int32 maxHeadersSize) + const oatpp::data::share::MemoryLabel& buffer, v_int32 maxHeadersSize) : m_stream(stream) , m_buffer(buffer) - , m_bufferSize(bufferSize) , m_maxHeadersSize(maxHeadersSize) , m_accumulator(0) , m_progress(0) @@ -125,23 +124,24 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptr m_maxHeadersSize) { desiredToRead = m_maxHeadersSize - m_progress; if(desiredToRead <= 0) { return error("[oatpp::web::protocol::http::incoming::RequestHeadersReader::readHeadersAsync()]: Error. Headers section is too large."); } } - - auto res = m_stream->peek(m_buffer, desiredToRead); + + auto bufferData = m_buffer.getData(); + auto res = m_stream->peek(bufferData, desiredToRead); if(res > 0) { - m_bufferStream.write(m_buffer, res); + m_bufferStream.write(bufferData, res); m_progress += res; for(v_int32 i = 0; i < res; i ++) { m_accumulator <<= 8; - m_accumulator |= m_buffer[i]; + m_accumulator |= bufferData[i]; if(m_accumulator == SECTION_END) { m_stream->commitReadOffset(i + 1); return yieldTo(&ReaderCoroutine::parseHeaders); @@ -185,7 +185,7 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptr m_maxHeadersSize) { desiredToRead = m_maxHeadersSize - progress; if(desiredToRead <= 0) { return -1; } } - - res = connection->read(m_buffer, desiredToRead); + + auto bufferData = m_buffer.getData(); + res = connection->read(bufferData, desiredToRead); if(res > 0) { - bufferStream->write(m_buffer, res); + bufferStream->write(bufferData, res); for(v_int32 i = 0; i < res; i ++) { accumulator <<= 8; - accumulator |= m_buffer[i]; + accumulator |= bufferData[i]; if(accumulator == sectionEnd) { result.bufferPosStart = i + 1; result.bufferPosEnd = (v_int32) res; @@ -101,8 +102,7 @@ ResponseHeadersReader::readHeadersAsync(const std::shared_ptr { private: std::shared_ptr m_connection; - p_char8 m_buffer; - v_int32 m_bufferSize; + oatpp::data::share::MemoryLabel m_buffer; v_int32 m_maxHeadersSize; v_word32 m_accumulator; v_int32 m_progress; @@ -111,10 +111,9 @@ ResponseHeadersReader::readHeadersAsync(const std::shared_ptr& connection, - p_char8 buffer, v_int32 bufferSize, v_int32 maxHeadersSize) + const oatpp::data::share::MemoryLabel buffer, v_int32 maxHeadersSize) : m_connection(connection) , m_buffer(buffer) - , m_bufferSize(bufferSize) , m_maxHeadersSize(maxHeadersSize) , m_accumulator(0) , m_progress(0) @@ -122,22 +121,22 @@ ResponseHeadersReader::readHeadersAsync(const std::shared_ptr m_maxHeadersSize) { desiredToRead = m_maxHeadersSize - m_progress; if(desiredToRead <= 0) { return error("[oatpp::web::protocol::http::incoming::RequestHeadersReader::readHeadersAsync()]: Error. Headers section is too large."); } } - - auto res = m_connection->read(m_buffer, desiredToRead); + auto bufferData = m_buffer.getData(); + auto res = m_connection->read(bufferData, desiredToRead); if(res > 0) { - m_bufferStream.write(m_buffer, res); + m_bufferStream.write(bufferData, res); m_progress += res; for(v_int32 i = 0; i < res; i ++) { m_accumulator <<= 8; - m_accumulator |= m_buffer[i]; + m_accumulator |= bufferData[i]; if(m_accumulator == SECTION_END) { m_result.bufferPosStart = i + 1; m_result.bufferPosEnd = (v_int32) res; @@ -180,7 +179,7 @@ ResponseHeadersReader::readHeadersAsync(const std::shared_ptrsetOutputStreamIOMode(oatpp::data::stream::IOMode::NON_BLOCKING); connection->setInputStreamIOMode(oatpp::data::stream::IOMode::NON_BLOCKING); + + auto bufferMemory = oatpp::base::StrBuffer::createShared(oatpp::data::buffer::IOBuffer::BUFFER_SIZE * 2); + + oatpp::data::share::MemoryLabel inBuffer(bufferMemory, + &bufferMemory->getData()[0], + oatpp::data::buffer::IOBuffer::BUFFER_SIZE); - auto inBuffer = oatpp::data::buffer::IOBuffer::createShared(); - auto outBuffer = oatpp::data::buffer::IOBuffer::createShared(); + oatpp::data::share::MemoryLabel outBuffer(bufferMemory, + &bufferMemory->getData()[oatpp::data::buffer::IOBuffer::BUFFER_SIZE], + oatpp::data::buffer::IOBuffer::BUFFER_SIZE); auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection, inBuffer); auto outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, outBuffer); diff --git a/src/oatpp/web/server/HttpConnectionHandler.cpp b/src/oatpp/web/server/HttpConnectionHandler.cpp index 9dfdcfb9..f78a34d6 100644 --- a/src/oatpp/web/server/HttpConnectionHandler.cpp +++ b/src/oatpp/web/server/HttpConnectionHandler.cpp @@ -55,13 +55,15 @@ HttpConnectionHandler::Task::createShared(HttpRouter* router, } void HttpConnectionHandler::Task::run(){ - - const v_int32 bufferSize = oatpp::data::buffer::IOBuffer::BUFFER_SIZE; - v_char8 inBuffer [bufferSize]; - v_char8 outBuffer [bufferSize]; - auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection, inBuffer, bufferSize); - auto outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(m_connection, outBuffer, bufferSize); + const v_int32 bufferSize = oatpp::data::buffer::IOBuffer::BUFFER_SIZE; + v_char8 bufferMemory[bufferSize * 2]; + + oatpp::data::share::MemoryLabel inBuffer(nullptr, bufferMemory, bufferSize); + oatpp::data::share::MemoryLabel outBuffer(nullptr, bufferMemory + bufferSize, bufferSize); + + auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection, inBuffer); + oatpp::data::stream::OutputStreamBufferedProxy outStream(m_connection, outBuffer); v_int32 connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_CLOSE; std::shared_ptr response; @@ -70,9 +72,9 @@ void HttpConnectionHandler::Task::run(){ response = HttpProcessor::processRequest(m_router, inStream, m_bodyDecoder, m_errorHandler, m_requestInterceptors, connectionState); if(response) { - outStream->setBufferPosition(0, 0, false); - response->send(outStream.get()); - outStream->flush(); + outStream.setBufferPosition(0, 0, false); + response->send(&outStream); + outStream.flush(); } else { return; } diff --git a/src/oatpp/web/server/HttpProcessor.cpp b/src/oatpp/web/server/HttpProcessor.cpp index c3fa58e8..d895827a 100644 --- a/src/oatpp/web/server/HttpProcessor.cpp +++ b/src/oatpp/web/server/HttpProcessor.cpp @@ -41,7 +41,8 @@ HttpProcessor::processRequest(HttpRouter* router, { const v_int32 bufferSize = 2048; v_char8 buffer [bufferSize]; - RequestHeadersReader headersReader(buffer, bufferSize, 4096); + oatpp::data::share::MemoryLabel bufferLabel(nullptr, buffer, bufferSize); + RequestHeadersReader headersReader(bufferLabel, 4096); headersReadResult = headersReader.readHeaders(inStream.get(), error); } @@ -129,7 +130,7 @@ oatpp::async::Action HttpProcessor::Coroutine::onHeadersParsed(const RequestHead } HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::act() { - RequestHeadersReader headersReader(m_headerReaderBuffer->getData(), m_headerReaderBuffer->getSize(), 4096); + RequestHeadersReader headersReader(m_headerReaderBuffer, 4096); return headersReader.readHeadersAsync(m_inStream).callbackTo(&HttpProcessor::Coroutine::onHeadersParsed); } @@ -143,12 +144,12 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponse(const std: } HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() { - + m_currentResponse->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER); m_connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionState(m_currentRequest, m_currentResponse); m_outStream->setBufferPosition(0, 0, false); return m_currentResponse->sendAsync(m_outStream).next(m_outStream->flushAsync()).next(yieldTo(&HttpProcessor::Coroutine::onRequestDone)); - + } HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onRequestDone() { diff --git a/src/oatpp/web/server/HttpProcessor.hpp b/src/oatpp/web/server/HttpProcessor.hpp index 1ee54ef6..9d348a0b 100644 --- a/src/oatpp/web/server/HttpProcessor.hpp +++ b/src/oatpp/web/server/HttpProcessor.hpp @@ -76,7 +76,7 @@ public: std::shared_ptr m_outStream; v_int32 m_connectionState; private: - oatpp::String m_headerReaderBuffer; + oatpp::data::share::MemoryLabel m_headerReaderBuffer; oatpp::web::server::HttpRouter::BranchRouter::Route m_currentRoute; std::shared_ptr m_currentRequest; std::shared_ptr m_currentResponse; @@ -97,7 +97,7 @@ public: , m_inStream(inStream) , m_outStream(outStream) , m_connectionState(oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE) - , m_headerReaderBuffer(2048) + , m_headerReaderBuffer(oatpp::base::StrBuffer::createShared(2048)) {} Action act() override;