diff --git a/CMakeLists.txt b/CMakeLists.txt index 786db6b4..d6b9b5a9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -126,6 +126,8 @@ add_library(oatpp web/protocol/http/Http.hpp web/protocol/http/incoming/BodyDecoder.cpp web/protocol/http/incoming/BodyDecoder.hpp + web/protocol/http/incoming/SimpleBodyDecoder.cpp + web/protocol/http/incoming/SimpleBodyDecoder.hpp web/protocol/http/incoming/Request.cpp web/protocol/http/incoming/Request.hpp web/protocol/http/incoming/Response.cpp diff --git a/core/data/stream/Stream.cpp b/core/data/stream/Stream.cpp index 72170f8c..df276a64 100644 --- a/core/data/stream/Stream.cpp +++ b/core/data/stream/Stream.cpp @@ -117,22 +117,36 @@ const std::shared_ptr& operator << (const std::shared_ptr& fromStream, - const std::shared_ptr& toStream, - oatpp::os::io::Library::v_size transferSize, - void* buffer, - oatpp::os::io::Library::v_size bufferSize) { +oatpp::os::io::Library::v_size transfer(const std::shared_ptr& fromStream, + const std::shared_ptr& toStream, + oatpp::os::io::Library::v_size transferSize, + void* buffer, + oatpp::os::io::Library::v_size bufferSize) { - while (transferSize > 0) { - oatpp::os::io::Library::v_size desiredReadCount = transferSize; - if(desiredReadCount > bufferSize){ + oatpp::os::io::Library::v_size progress = 0; + + while (transferSize == 0 || progress < transferSize) { + oatpp::os::io::Library::v_size desiredReadCount = transferSize - progress; + if(transferSize == 0 || desiredReadCount > bufferSize){ desiredReadCount = bufferSize; } - auto readCount = fromStream->read(buffer, desiredReadCount); - toStream->write(buffer, readCount); - transferSize -= readCount; + auto readResult = fromStream->read(buffer, desiredReadCount); + if(readResult > 0) { + auto writeReasul = writeExactSizeData(toStream.get(), buffer, readResult); + if(writeReasul != readResult) { + throw std::runtime_error("[oatpp::data::stream::transfer()]: Unknown Error. Can't continue transfer."); + } + progress += readResult; + } else { + if(readResult == oatpp::data::stream::Errors::ERROR_IO_RETRY || readResult == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { + continue; + } + return progress; + } } + return progress; + } oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCoroutine, @@ -147,6 +161,7 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout std::shared_ptr m_fromStream; std::shared_ptr m_toStream; oatpp::os::io::Library::v_size m_transferSize; + oatpp::os::io::Library::v_size m_progress; std::shared_ptr m_buffer; oatpp::os::io::Library::v_size m_desiredReadCount; @@ -163,19 +178,24 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout : m_fromStream(fromStream) , m_toStream(toStream) , m_transferSize(transferSize) + , m_progress(0) , m_buffer(buffer) {} Action act() override { - - if(m_transferSize == 0) { - return finish(); + /* m_transferSize == 0 - is a legal state. */ + if(m_transferSize > 0) { + if(m_progress == m_transferSize) { + return finish(); + } else if(m_progress > m_transferSize) { + throw std::runtime_error("[oatpp::data::stream::transferAsync{TransferCoroutine::act()}]: Invalid state. m_progress > m_transferSize"); + } } else if(m_transferSize < 0) { - throw std::runtime_error("Invalid stream::TransferCoroutine state"); + throw std::runtime_error("[oatpp::data::stream::transferAsync{TransferCoroutine::act()}]: Invalid state. m_transferSize < 0"); } - m_desiredReadCount = m_transferSize; - if(m_desiredReadCount > m_buffer->getSize()){ + m_desiredReadCount = m_transferSize - m_progress; + if(m_transferSize == 0 || m_desiredReadCount > m_buffer->getSize()){ m_desiredReadCount = m_buffer->getSize(); } @@ -188,22 +208,31 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout Action doRead() { return oatpp::data::stream::readSomeDataAsyncInline(m_fromStream.get(), - m_readBufferPtr, - m_bytesLeft, - yieldTo(&TransferCoroutine::prepareWrite)); + m_readBufferPtr, + m_bytesLeft, + yieldTo(&TransferCoroutine::prepareWrite)); } Action prepareWrite() { m_bytesLeft = m_desiredReadCount - m_bytesLeft; - m_transferSize -= m_bytesLeft; + m_progress += m_bytesLeft; return yieldTo(&TransferCoroutine::doWrite); } Action doWrite() { return oatpp::data::stream::writeDataAsyncInline(m_toStream.get(), - m_writeBufferPtr, - m_bytesLeft, - yieldTo(&TransferCoroutine::act)); + m_writeBufferPtr, + m_bytesLeft, + yieldTo(&TransferCoroutine::act)); + } + + Action handleError(const oatpp::async::Error& error) override { + if(!error.isExceptionThrown && error.message == Errors::ERROR_ASYNC_FAILED_TO_READ_DATA) { + if(m_transferSize == 0) { + return finish(); + } + } + return error; } }; @@ -276,18 +305,25 @@ oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStre } oatpp::os::io::Library::v_size writeExactSizeData(oatpp::data::stream::OutputStream* stream, const void* data, os::io::Library::v_size size) { + const char* buffer = (char*)data; oatpp::os::io::Library::v_size progress = 0; + while (progress < size) { + auto res = stream->write(&buffer[progress], size - progress); - if(res <= 0) { // if res == 0 then probably stream handles write() error incorrectly. return. - if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE || - (res != oatpp::data::stream::Errors::ERROR_IO_RETRY && res != oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY)) { - return progress; + + if(res > 0) { + progress += res; + } else { // if res == 0 then probably stream handles write() error incorrectly. return. + if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY || res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { + continue; } + return progress; } - progress += res; + } + return progress; } diff --git a/core/data/stream/Stream.hpp b/core/data/stream/Stream.hpp index 9fe3fe7d..90ee694e 100644 --- a/core/data/stream/Stream.hpp +++ b/core/data/stream/Stream.hpp @@ -138,12 +138,15 @@ const std::shared_ptr& operator << (const std::shared_ptr& fromStream, - const std::shared_ptr& toStream, - oatpp::os::io::Library::v_size transferSize, - void* buffer, - oatpp::os::io::Library::v_size bufferSize); +oatpp::os::io::Library::v_size transfer(const std::shared_ptr& fromStream, + const std::shared_ptr& toStream, + oatpp::os::io::Library::v_size transferSize, + void* buffer, + oatpp::os::io::Library::v_size bufferSize); + /** * Same as transfer but asynchronous diff --git a/web/client/HttpRequestExecutor.cpp b/web/client/HttpRequestExecutor.cpp index 95dc99a3..2d731354 100644 --- a/web/client/HttpRequestExecutor.cpp +++ b/web/client/HttpRequestExecutor.cpp @@ -92,7 +92,7 @@ HttpRequestExecutor::execute(const String& method, caret.getPosition(), (v_int32) readCount); - return Response::createShared(line->statusCode, line->description, responseHeaders, bodyStream); + return Response::createShared(line->statusCode, line->description, responseHeaders, bodyStream, m_bodyDecoder); } @@ -110,6 +110,7 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor String m_path; std::shared_ptr m_headers; std::shared_ptr m_body; + std::shared_ptr m_bodyDecoder; private: std::shared_ptr m_connection; std::shared_ptr m_ioBuffer; @@ -121,12 +122,14 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor const String& method, const String& path, const std::shared_ptr& headers, - const std::shared_ptr& body) + const std::shared_ptr& body, + const std::shared_ptr& bodyDecoder) : m_connectionProvider(connectionProvider) , m_method(method) , m_path(path) , m_headers(headers) , m_body(body) + , m_bodyDecoder(bodyDecoder) {} Action act() override { @@ -175,7 +178,7 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor caret.getPosition(), (v_int32) readCount); - return _return(Response::createShared(line->statusCode, line->description, headers, bodyStream)); + return _return(Response::createShared(line->statusCode, line->description, headers, bodyStream, m_bodyDecoder)); } @@ -185,7 +188,7 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor }; - return parentCoroutine->startCoroutineForResult(callback, m_connectionProvider, method, path, headers, body); + return parentCoroutine->startCoroutineForResult(callback, m_connectionProvider, method, path, headers, body, m_bodyDecoder); } diff --git a/web/client/HttpRequestExecutor.hpp b/web/client/HttpRequestExecutor.hpp index 2fac126d..763ab0c3 100644 --- a/web/client/HttpRequestExecutor.hpp +++ b/web/client/HttpRequestExecutor.hpp @@ -26,6 +26,8 @@ #define oatpp_web_client_HttpRequestExecutor_hpp #include "./RequestExecutor.hpp" + +#include "oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp" #include "oatpp/network/ConnectionProvider.hpp" namespace oatpp { namespace web { namespace client { @@ -33,15 +35,21 @@ namespace oatpp { namespace web { namespace client { class HttpRequestExecutor : public oatpp::base::Controllable, public RequestExecutor { protected: std::shared_ptr m_connectionProvider; + std::shared_ptr m_bodyDecoder; public: - HttpRequestExecutor(const std::shared_ptr& connectionProvider) + HttpRequestExecutor(const std::shared_ptr& connectionProvider, + const std::shared_ptr& bodyDecoder = + std::make_shared()) : m_connectionProvider(connectionProvider) + , m_bodyDecoder(bodyDecoder) {} public: static std::shared_ptr - createShared(const std::shared_ptr& connectionProvider){ - return std::make_shared(connectionProvider); + createShared(const std::shared_ptr& connectionProvider, + const std::shared_ptr& bodyDecoder = + std::make_shared()){ + return std::make_shared(connectionProvider, bodyDecoder); } /** diff --git a/web/protocol/http/Http.cpp b/web/protocol/http/Http.cpp index ae0c1500..17b7e43d 100644 --- a/web/protocol/http/Http.cpp +++ b/web/protocol/http/Http.cpp @@ -316,31 +316,35 @@ oatpp::String Protocol::parseHeaderName(oatpp::parser::ParsingCaret& caret) { return nullptr; } +void Protocol::parseOneHeader(Headers& headers, oatpp::parser::ParsingCaret& caret, Status& error) { + caret.findNotSpaceChar(); + auto name = parseHeaderName(caret); + if(name) { + caret.findNotSpaceChar(); + if(!caret.canContinueAtChar(':', 1)) { + error = Status::CODE_400; + return; + } + caret.findNotSpaceChar(); + oatpp::parser::ParsingCaret::Label label(caret); + caret.findRN(); + headers.put(name, label.toString(true)); + caret.skipRN(); + } else { + error = Status::CODE_431; + return; + } +} std::shared_ptr Protocol::parseHeaders(oatpp::parser::ParsingCaret& caret, Status& error) { auto headers = Headers::createShared(); while (!caret.isAtRN()) { - - caret.findNotSpaceChar(); - auto name = parseHeaderName(caret); - if(name) { - caret.findNotSpaceChar(); - if(!caret.canContinueAtChar(':', 1)) { - error = Status::CODE_400; - return nullptr; - } - caret.findNotSpaceChar(); - oatpp::parser::ParsingCaret::Label label(caret); - caret.findRN(); - headers->put(name, label.toString(true)); - caret.skipRN(); - } else { - error = Status::CODE_431; + parseOneHeader(*headers, caret, error); + if(error.code != 0) { return nullptr; } - } caret.skipRN(); diff --git a/web/protocol/http/Http.hpp b/web/protocol/http/Http.hpp index 9ae1b198..b9b6c1dc 100644 --- a/web/protocol/http/Http.hpp +++ b/web/protocol/http/Http.hpp @@ -273,6 +273,11 @@ public: static std::shared_ptr parseRequestStartingLine(oatpp::parser::ParsingCaret& caret); static std::shared_ptr parseResponseStartingLine(oatpp::parser::ParsingCaret& caret); + + /** + * Parse header and store it in headers map + */ + static void parseOneHeader(Headers& headers, oatpp::parser::ParsingCaret& caret, Status& error); static std::shared_ptr parseHeaders(oatpp::parser::ParsingCaret& caret, Status& error); }; diff --git a/web/protocol/http/incoming/BodyDecoder.cpp b/web/protocol/http/incoming/BodyDecoder.cpp index 1c32e665..f69db75f 100644 --- a/web/protocol/http/incoming/BodyDecoder.cpp +++ b/web/protocol/http/incoming/BodyDecoder.cpp @@ -24,224 +24,10 @@ #include "BodyDecoder.hpp" -#include "oatpp/core/data/stream/StreamBufferedProxy.hpp" -#include "oatpp/core/utils/ConversionUtils.hpp" + namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming { -os::io::Library::v_size BodyDecoder::readLine(const std::shared_ptr& fromStream, - p_char8 buffer, - os::io::Library::v_size maxLineSize) { - - v_char8 a; - os::io::Library::v_size count = 0; - while (fromStream->read(&a, 1) > 0) { - if(a != '\r') { - if(count + 1 > maxLineSize) { - OATPP_LOGE("BodyDecoder", "Error - too long line"); - return 0; - } - buffer[count++] = a; - } else { - fromStream->read(&a, 1); - if(a != '\n'){ - OATPP_LOGE("BodyDecoder", "Warning - invalid line breaker"); - } - return count; // size of line - } - } - - return count; - -} - -void BodyDecoder::doChunkedDecoding(const std::shared_ptr& fromStream, - const std::shared_ptr& toStream) { - - auto buffer = oatpp::data::buffer::IOBuffer::createShared(); - - v_int32 maxLineSize = 8; // 0xFFFFFFFF 4Gb for chunk - v_char8 lineBuffer[maxLineSize + 1]; - os::io::Library::v_size countToRead; - - do { - - auto lineSize = readLine(fromStream, lineBuffer, maxLineSize); - if(lineSize == 0 || lineSize >= maxLineSize) { - return; // error reading stream - } - lineBuffer[lineSize] = 0; - countToRead = std::strtol((const char*)lineBuffer, nullptr, 16); - - if(countToRead > 0) { - oatpp::data::stream::transfer(fromStream, toStream, countToRead, buffer->getData(), buffer->getSize()); - } - - fromStream->read(lineBuffer, 2); // just skip "\r\n" - - } while (countToRead > 0); - -} - -void BodyDecoder::decode(const std::shared_ptr& headers, - const std::shared_ptr& bodyStream, - const std::shared_ptr& toStream) { - - auto transferEncoding = headers->get(Header::TRANSFER_ENCODING, nullptr); - if(transferEncoding && transferEncoding->equals(Header::Value::TRANSFER_ENCODING_CHUNKED)) { - doChunkedDecoding(bodyStream, toStream); - } else { - oatpp::os::io::Library::v_size contentLength = 0; - auto contentLengthStr = headers->get(Header::CONTENT_LENGTH, nullptr); - if(!contentLengthStr) { - return; // DO NOTHING // it is an empty or invalid body - } else { - bool success; - contentLength = oatpp::utils::conversion::strToInt64(contentLengthStr, success); - if(!success){ - return; // it is an invalid request/response - } - auto buffer = oatpp::data::buffer::IOBuffer::createShared(); - oatpp::data::stream::transfer(bodyStream, toStream, contentLength, buffer->getData(), buffer->getSize()); - } - } - -} - -oatpp::async::Action BodyDecoder::doChunkedDecodingAsync(oatpp::async::AbstractCoroutine* parentCoroutine, - const oatpp::async::Action& actionOnReturn, - const std::shared_ptr& fromStream, - const std::shared_ptr& toStream) { - - class ChunkedDecoder : public oatpp::async::Coroutine { - private: - const v_int32 MAX_LINE_SIZE = 8; - private: - std::shared_ptr m_fromStream; - std::shared_ptr m_toStream; - std::shared_ptr m_buffer = oatpp::data::buffer::IOBuffer::createShared(); - v_int32 m_currLineLength; - v_char8 m_lineChar; - bool m_lineEnding; - v_char8 m_lineBuffer [16]; // used max 8 - void* m_skipData; - os::io::Library::v_size m_skipSize; - bool m_done = false; - private: - void prepareSkipRN() { - m_skipData = &m_lineBuffer[0]; - m_skipSize = 2; - m_currLineLength = 0; - m_lineEnding = false; - } - public: - - ChunkedDecoder(const std::shared_ptr& fromStream, - const std::shared_ptr& toStream) - : m_fromStream(fromStream) - , m_toStream(toStream) - {} - - Action act() override { - m_currLineLength = 0; - m_lineEnding = false; - return yieldTo(&ChunkedDecoder::readLineChar); - } - - Action readLineChar() { - auto res = m_fromStream->read(&m_lineChar, 1); - if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { - return oatpp::async::Action::_WAIT_RETRY; - } else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) { - return oatpp::async::Action::_REPEAT; - } else if( res < 0) { - return error("[BodyDecoder::ChunkedDecoder] Can't read line char"); - } - return yieldTo(&ChunkedDecoder::onLineCharRead); - } - - Action onLineCharRead() { - if(!m_lineEnding) { - if(m_lineChar != '\r') { - if(m_currLineLength + 1 > MAX_LINE_SIZE){ - return error("[BodyDecoder::ChunkedDecoder] too long line"); - } - m_lineBuffer[m_currLineLength ++] = m_lineChar; - return yieldTo(&ChunkedDecoder::readLineChar); - } else { - m_lineEnding = true; - return yieldTo(&ChunkedDecoder::readLineChar); - } - } else { - if(m_lineChar != '\n') { - OATPP_LOGD("[BodyDecoder::ChunkedDecoder]", "Warning - invalid line breaker") - } - } - if(m_currLineLength == 0) { - return error("Error reading stream. 0-length line"); - } - m_lineBuffer[m_currLineLength] = 0; - return yieldTo(&ChunkedDecoder::onLineRead); - } - - Action onLineRead() { - os::io::Library::v_size countToRead = std::strtol((const char*) m_lineBuffer, nullptr, 16); - if(countToRead > 0) { - prepareSkipRN(); - return oatpp::data::stream::transferAsync(this, yieldTo(&ChunkedDecoder::skipRN), m_fromStream, m_toStream, countToRead, m_buffer); - } - m_done = true; - prepareSkipRN(); - return yieldTo(&ChunkedDecoder::skipRN); - } - - Action skipRN() { - if(m_done) { - return oatpp::data::stream::readExactSizeDataAsyncInline(m_fromStream.get(), - m_skipData, - m_skipSize, - finish()); - } else { - return oatpp::data::stream::readExactSizeDataAsyncInline(m_fromStream.get(), - m_skipData, - m_skipSize, - yieldTo(&ChunkedDecoder::readLineChar)); - } - } - - }; - - return parentCoroutine->startCoroutine(actionOnReturn, fromStream, toStream); - -} - -oatpp::async::Action BodyDecoder::decodeAsync(oatpp::async::AbstractCoroutine* parentCoroutine, - const oatpp::async::Action& actionOnReturn, - const std::shared_ptr& headers, - const std::shared_ptr& bodyStream, - const std::shared_ptr& toStream) { - auto transferEncoding = headers->get(Header::TRANSFER_ENCODING, nullptr); - if(transferEncoding && transferEncoding->equals(Header::Value::TRANSFER_ENCODING_CHUNKED)) { - return doChunkedDecodingAsync(parentCoroutine, actionOnReturn, bodyStream, toStream); - } else { - oatpp::os::io::Library::v_size contentLength = 0; - auto contentLengthStr = headers->get(Header::CONTENT_LENGTH, nullptr); - if(!contentLengthStr) { - return actionOnReturn; // DO NOTHING // it is an empty or invalid body - } else { - bool success; - contentLength = oatpp::utils::conversion::strToInt64(contentLengthStr, success); - if(!success){ - return oatpp::async::Action(oatpp::async::Error("Invalid 'Content-Length' Header")); - } - return oatpp::data::stream::transferAsync(parentCoroutine, - actionOnReturn, - bodyStream, - toStream, - contentLength, - oatpp::data::buffer::IOBuffer::createShared()); - } - } -} + }}}}} diff --git a/web/protocol/http/incoming/BodyDecoder.hpp b/web/protocol/http/incoming/BodyDecoder.hpp index 4d0bb410..2c12ca7b 100644 --- a/web/protocol/http/incoming/BodyDecoder.hpp +++ b/web/protocol/http/incoming/BodyDecoder.hpp @@ -36,19 +36,22 @@ private: class ToStringDecoder : public oatpp::async::CoroutineWithResult { private: + const BodyDecoder* m_decoder; std::shared_ptr m_headers; std::shared_ptr m_bodyStream; std::shared_ptr m_chunkedBuffer = oatpp::data::stream::ChunkedBuffer::createShared(); public: - ToStringDecoder(const std::shared_ptr& headers, + ToStringDecoder(const BodyDecoder* decoder, + const std::shared_ptr& headers, const std::shared_ptr& bodyStream) - : m_headers(headers) + : m_decoder(decoder) + , m_headers(headers) , m_bodyStream(bodyStream) {} Action act() override { - return decodeAsync(this, yieldTo(&ToStringDecoder::onDecoded), m_headers, m_bodyStream, m_chunkedBuffer); + return m_decoder->decodeAsync(this, yieldTo(&ToStringDecoder::onDecoded), m_headers, m_bodyStream, m_chunkedBuffer); } Action onDecoded() { @@ -60,22 +63,25 @@ private: template class ToDtoDecoder : public oatpp::async::CoroutineWithResult, typename Type::ObjectWrapper> { private: + const BodyDecoder* m_decoder; std::shared_ptr m_headers; std::shared_ptr m_bodyStream; std::shared_ptr m_objectMapper; std::shared_ptr m_chunkedBuffer = oatpp::data::stream::ChunkedBuffer::createShared(); public: - ToDtoDecoder(const std::shared_ptr& headers, + ToDtoDecoder(const BodyDecoder* decoder, + const std::shared_ptr& headers, const std::shared_ptr& bodyStream, const std::shared_ptr& objectMapper) - : m_headers(headers) + : m_decoder(decoder) + , m_headers(headers) , m_bodyStream(bodyStream) , m_objectMapper(objectMapper) {} oatpp::async::Action act() override { - return decodeAsync(this, this->yieldTo(&ToDtoDecoder::onDecoded), m_headers, m_bodyStream, m_chunkedBuffer); + return m_decoder->decodeAsync(this, this->yieldTo(&ToDtoDecoder::onDecoded), m_headers, m_bodyStream, m_chunkedBuffer); } oatpp::async::Action onDecoded() { @@ -90,65 +96,47 @@ private: }; -private: - static os::io::Library::v_size readLine(const std::shared_ptr& fromStream, - p_char8 buffer, - os::io::Library::v_size maxLineSize); - static void doChunkedDecoding(const std::shared_ptr& from, - const std::shared_ptr& toStream); - - static oatpp::async::Action doChunkedDecodingAsync(oatpp::async::AbstractCoroutine* parentCoroutine, - const oatpp::async::Action& actionOnReturn, - const std::shared_ptr& fromStream, - const std::shared_ptr& toStream); public: - // read chunk by chunk from 'bodyStream' and write to 'toStream' using buffer::IOBuffer + virtual void decode(const std::shared_ptr& headers, + const std::shared_ptr& bodyStream, + const std::shared_ptr& toStream) const = 0; - static void decode(const std::shared_ptr& headers, - const std::shared_ptr& bodyStream, - const std::shared_ptr& toStream); + virtual oatpp::async::Action decodeAsync(oatpp::async::AbstractCoroutine* parentCoroutine, + const oatpp::async::Action& actionOnReturn, + const std::shared_ptr& headers, + const std::shared_ptr& bodyStream, + const std::shared_ptr& toStream) const = 0; - static oatpp::String - decodeToString(const std::shared_ptr& headers, - const std::shared_ptr& bodyStream) { + oatpp::String decodeToString(const std::shared_ptr& headers, + const std::shared_ptr& bodyStream) const { auto chunkedBuffer = oatpp::data::stream::ChunkedBuffer::createShared(); decode(headers, bodyStream, chunkedBuffer); return chunkedBuffer->toString(); } template - static typename Type::ObjectWrapper decodeToDto(const std::shared_ptr& headers, - const std::shared_ptr& bodyStream, - const std::shared_ptr& objectMapper){ + typename Type::ObjectWrapper decodeToDto(const std::shared_ptr& headers, + const std::shared_ptr& bodyStream, + const std::shared_ptr& objectMapper) const { return objectMapper->readFromString(decodeToString(headers, bodyStream)); } - // Async - - static oatpp::async::Action decodeAsync(oatpp::async::AbstractCoroutine* parentCoroutine, - const oatpp::async::Action& actionOnReturn, - const std::shared_ptr& headers, - const std::shared_ptr& bodyStream, - const std::shared_ptr& toStream); - template - static oatpp::async::Action - decodeToStringAsync(oatpp::async::AbstractCoroutine* parentCoroutine, - oatpp::async::Action (ParentCoroutineType::*callback)(const oatpp::String&), - const std::shared_ptr& headers, - const std::shared_ptr& bodyStream) { - return parentCoroutine->startCoroutineForResult(callback, headers, bodyStream); + oatpp::async::Action decodeToStringAsync(oatpp::async::AbstractCoroutine* parentCoroutine, + oatpp::async::Action (ParentCoroutineType::*callback)(const oatpp::String&), + const std::shared_ptr& headers, + const std::shared_ptr& bodyStream) const { + return parentCoroutine->startCoroutineForResult(callback, this, headers, bodyStream); } template - static oatpp::async::Action - decodeToDtoAsync(oatpp::async::AbstractCoroutine* parentCoroutine, - oatpp::async::Action (ParentCoroutineType::*callback)(const typename DtoType::ObjectWrapper&), - const std::shared_ptr& headers, - const std::shared_ptr& bodyStream, - const std::shared_ptr& objectMapper) { - return parentCoroutine->startCoroutineForResult>(callback, headers, bodyStream, objectMapper); + oatpp::async::Action decodeToDtoAsync(oatpp::async::AbstractCoroutine* parentCoroutine, + oatpp::async::Action (ParentCoroutineType::*callback)(const typename DtoType::ObjectWrapper&), + const std::shared_ptr& headers, + const std::shared_ptr& bodyStream, + const std::shared_ptr& objectMapper) const { + return parentCoroutine->startCoroutineForResult>(callback, this, headers, bodyStream, objectMapper); } }; diff --git a/web/protocol/http/incoming/Request.hpp b/web/protocol/http/incoming/Request.hpp index b09656c2..ff1165ed 100644 --- a/web/protocol/http/incoming/Request.hpp +++ b/web/protocol/http/incoming/Request.hpp @@ -36,23 +36,29 @@ public: OBJECT_POOL(Incoming_Request_Pool, Request, 32) SHARED_OBJECT_POOL(Shared_Incoming_Request_Pool, Request, 32) public: - Request(){} + Request(const std::shared_ptr& pBodyDecoder) + : bodyDecoder(pBodyDecoder) + {} + Request(const std::shared_ptr& pStartingLine, const std::shared_ptr& pPathVariables, const std::shared_ptr& pHeaders, - const std::shared_ptr& pBodyStream) + const std::shared_ptr& pBodyStream, + const std::shared_ptr& pBodyDecoder) : startingLine(pStartingLine) , pathVariables(pPathVariables) , headers(pHeaders) , bodyStream(pBodyStream) + , bodyDecoder(pBodyDecoder) {} public: static std::shared_ptr createShared(const std::shared_ptr& startingLine, - const std::shared_ptr& pathVariables, - const std::shared_ptr& headers, - const std::shared_ptr& bodyStream) { - return Shared_Incoming_Request_Pool::allocateShared(startingLine, pathVariables, headers, bodyStream); + const std::shared_ptr& pathVariables, + const std::shared_ptr& headers, + const std::shared_ptr& bodyStream, + const std::shared_ptr& bodyDecoder) { + return Shared_Incoming_Request_Pool::allocateShared(startingLine, pathVariables, headers, bodyStream, bodyDecoder); } std::shared_ptr startingLine; @@ -60,6 +66,12 @@ public: std::shared_ptr headers; std::shared_ptr bodyStream; + /** + * Request should be preconfigured with default BodyDecoder. + * Custom BodyDecoder can be set on demand + */ + std::shared_ptr bodyDecoder; + oatpp::String getHeader(const oatpp::String& headerName) const{ auto entry = headers->find(headerName); if(entry != nullptr) { @@ -81,24 +93,22 @@ public: } void streamBody(const std::shared_ptr& toStream) const { - protocol::http::incoming::BodyDecoder::decode(headers, bodyStream, toStream); + bodyDecoder->decode(headers, bodyStream, toStream); } oatpp::String readBodyToString() const { - return protocol::http::incoming::BodyDecoder::decodeToString(headers, bodyStream); + return bodyDecoder->decodeToString(headers, bodyStream); } template typename Type::ObjectWrapper readBodyToDto(const std::shared_ptr& objectMapper) const { - return objectMapper->readFromString - (protocol::http::incoming::BodyDecoder::decodeToString(headers, bodyStream)); + return objectMapper->readFromString(bodyDecoder->decodeToString(headers, bodyStream)); } template void readBodyToDto(oatpp::data::mapping::type::PolymorphicWrapper& objectWrapper, const std::shared_ptr& objectMapper) const { - objectWrapper = objectMapper->readFromString - (protocol::http::incoming::BodyDecoder::decodeToString(headers, bodyStream)); + objectWrapper = objectMapper->readFromString(bodyDecoder->decodeToString(headers, bodyStream)); } // Async @@ -106,20 +116,20 @@ public: oatpp::async::Action streamBodyAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const oatpp::async::Action& actionOnReturn, const std::shared_ptr& toStream) const { - return protocol::http::incoming::BodyDecoder::decodeAsync(parentCoroutine, actionOnReturn, headers, bodyStream, toStream); + return bodyDecoder->decodeAsync(parentCoroutine, actionOnReturn, headers, bodyStream, toStream); } template oatpp::async::Action readBodyToStringAsync(oatpp::async::AbstractCoroutine* parentCoroutine, oatpp::async::Action (ParentCoroutineType::*callback)(const oatpp::String&)) const { - return protocol::http::incoming::BodyDecoder::decodeToStringAsync(parentCoroutine, callback, headers, bodyStream); + return bodyDecoder->decodeToStringAsync(parentCoroutine, callback, headers, bodyStream); } template oatpp::async::Action readBodyToDtoAsync(oatpp::async::AbstractCoroutine* parentCoroutine, oatpp::async::Action (ParentCoroutineType::*callback)(const typename DtoType::ObjectWrapper&), const std::shared_ptr& objectMapper) const { - return protocol::http::incoming::BodyDecoder::decodeToDtoAsync(parentCoroutine, callback, headers, bodyStream, objectMapper); + return bodyDecoder->decodeToDtoAsync(parentCoroutine, callback, headers, bodyStream, objectMapper); } }; diff --git a/web/protocol/http/incoming/Response.hpp b/web/protocol/http/incoming/Response.hpp index e9e4d6b0..3e720e25 100644 --- a/web/protocol/http/incoming/Response.hpp +++ b/web/protocol/http/incoming/Response.hpp @@ -38,19 +38,22 @@ public: Response(v_int32 pStatusCode, const oatpp::String& pStatusDescription, const std::shared_ptr& pHeaders, - const std::shared_ptr& pBodyStream) + const std::shared_ptr& pBodyStream, + const std::shared_ptr& pBodyDecoder) : statusCode(pStatusCode) , statusDescription(pStatusDescription) , headers(pHeaders) , bodyStream(pBodyStream) + , bodyDecoder(pBodyDecoder) {} public: static std::shared_ptr createShared(v_int32 statusCode, - const oatpp::String& statusDescription, - const std::shared_ptr& headers, - const std::shared_ptr& bodyStream) { - return Shared_Incoming_Response_Pool::allocateShared(statusCode, statusDescription, headers, bodyStream); + const oatpp::String& statusDescription, + const std::shared_ptr& headers, + const std::shared_ptr& bodyStream, + const std::shared_ptr& bodyDecoder) { + return Shared_Incoming_Response_Pool::allocateShared(statusCode, statusDescription, headers, bodyStream, bodyDecoder); } const v_int32 statusCode; @@ -58,18 +61,24 @@ public: const std::shared_ptr headers; const std::shared_ptr bodyStream; + /** + * Response should be preconfigured with default BodyDecoder. + * Entity that created response object is responsible for providing correct BodyDecoder. + * Custom BodyDecoder can be set on demand + */ + std::shared_ptr bodyDecoder; + void streamBody(const std::shared_ptr& toStream) const { - protocol::http::incoming::BodyDecoder::decode(headers, bodyStream, toStream); + bodyDecoder->decode(headers, bodyStream, toStream); } oatpp::String readBodyToString() const { - return protocol::http::incoming::BodyDecoder::decodeToString(headers, bodyStream); + return bodyDecoder->decodeToString(headers, bodyStream); } template typename Type::ObjectWrapper readBodyToDto(const std::shared_ptr& objectMapper) const { - return objectMapper->readFromString - (protocol::http::incoming::BodyDecoder::decodeToString(headers, bodyStream)); + return bodyDecoder->decodeToDto(headers, bodyStream, objectMapper); } // Async @@ -77,20 +86,20 @@ public: oatpp::async::Action streamBodyAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const oatpp::async::Action& actionOnReturn, const std::shared_ptr& toStream) const { - return protocol::http::incoming::BodyDecoder::decodeAsync(parentCoroutine, actionOnReturn, headers, bodyStream, toStream); + return bodyDecoder->decodeAsync(parentCoroutine, actionOnReturn, headers, bodyStream, toStream); } template oatpp::async::Action readBodyToStringAsync(oatpp::async::AbstractCoroutine* parentCoroutine, oatpp::async::Action (ParentCoroutineType::*callback)(const oatpp::String&)) const { - return protocol::http::incoming::BodyDecoder::decodeToStringAsync(parentCoroutine, callback, headers, bodyStream); + return bodyDecoder->decodeToStringAsync(parentCoroutine, callback, headers, bodyStream); } template oatpp::async::Action readBodyToDtoAsync(oatpp::async::AbstractCoroutine* parentCoroutine, oatpp::async::Action (ParentCoroutineType::*callback)(const typename DtoType::ObjectWrapper&), const std::shared_ptr& objectMapper) const { - return protocol::http::incoming::BodyDecoder::decodeToDtoAsync(parentCoroutine, callback, headers, bodyStream, objectMapper); + return bodyDecoder->decodeToDtoAsync(parentCoroutine, callback, headers, bodyStream, objectMapper); } }; diff --git a/web/protocol/http/incoming/SimpleBodyDecoder.cpp b/web/protocol/http/incoming/SimpleBodyDecoder.cpp new file mode 100644 index 00000000..a83649ba --- /dev/null +++ b/web/protocol/http/incoming/SimpleBodyDecoder.cpp @@ -0,0 +1,247 @@ +/*************************************************************************** + * + * Project _____ __ ____ _ _ + * ( _ ) /__\ (_ _)_| |_ _| |_ + * )(_)( /(__)\ )( (_ _)(_ _) + * (_____)(__)(__)(__) |_| |_| + * + * + * Copyright 2018-present, Leonid Stryzhevskyi, + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************************/ + +#include "SimpleBodyDecoder.hpp" + +#include "oatpp/core/data/stream/StreamBufferedProxy.hpp" +#include "oatpp/core/utils/ConversionUtils.hpp" + +namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming { + +os::io::Library::v_size SimpleBodyDecoder::readLine(const std::shared_ptr& fromStream, + p_char8 buffer, + os::io::Library::v_size maxLineSize) { + + v_char8 a; + os::io::Library::v_size count = 0; + while (fromStream->read(&a, 1) > 0) { + if(a != '\r') { + if(count + 1 > maxLineSize) { + OATPP_LOGE("BodyDecoder", "Error - too long line"); + return 0; + } + buffer[count++] = a; + } else { + fromStream->read(&a, 1); + if(a != '\n'){ + OATPP_LOGE("BodyDecoder", "Warning - invalid line breaker"); + } + return count; // size of line + } + } + + return count; + +} + +void SimpleBodyDecoder::doChunkedDecoding(const std::shared_ptr& fromStream, + const std::shared_ptr& toStream) { + + auto buffer = oatpp::data::buffer::IOBuffer::createShared(); + + v_int32 maxLineSize = 8; // 0xFFFFFFFF 4Gb for chunk + v_char8 lineBuffer[maxLineSize + 1]; + os::io::Library::v_size countToRead; + + do { + + auto lineSize = readLine(fromStream, lineBuffer, maxLineSize); + if(lineSize == 0 || lineSize >= maxLineSize) { + return; // error reading stream + } + lineBuffer[lineSize] = 0; + countToRead = std::strtol((const char*)lineBuffer, nullptr, 16); + + if(countToRead > 0) { + oatpp::data::stream::transfer(fromStream, toStream, countToRead, buffer->getData(), buffer->getSize()); + } + + fromStream->read(lineBuffer, 2); // just skip "\r\n" + + } while (countToRead > 0); + +} + +void SimpleBodyDecoder::decode(const std::shared_ptr& headers, + const std::shared_ptr& bodyStream, + const std::shared_ptr& toStream) const { + + auto transferEncoding = headers->get(Header::TRANSFER_ENCODING, nullptr); + if(transferEncoding && transferEncoding->equals(Header::Value::TRANSFER_ENCODING_CHUNKED)) { + doChunkedDecoding(bodyStream, toStream); + } else { + oatpp::os::io::Library::v_size contentLength = 0; + auto contentLengthStr = headers->get(Header::CONTENT_LENGTH, nullptr); + if(!contentLengthStr) { + return; // DO NOTHING // it is an empty or invalid body + } else { + bool success; + contentLength = oatpp::utils::conversion::strToInt64(contentLengthStr, success); + if(!success){ + return; // it is an invalid request/response + } + auto buffer = oatpp::data::buffer::IOBuffer::createShared(); + oatpp::data::stream::transfer(bodyStream, toStream, contentLength, buffer->getData(), buffer->getSize()); + } + } + +} + +oatpp::async::Action SimpleBodyDecoder::doChunkedDecodingAsync(oatpp::async::AbstractCoroutine* parentCoroutine, + const oatpp::async::Action& actionOnReturn, + const std::shared_ptr& fromStream, + const std::shared_ptr& toStream) { + + class ChunkedDecoder : public oatpp::async::Coroutine { + private: + const v_int32 MAX_LINE_SIZE = 8; + private: + std::shared_ptr m_fromStream; + std::shared_ptr m_toStream; + std::shared_ptr m_buffer = oatpp::data::buffer::IOBuffer::createShared(); + v_int32 m_currLineLength; + v_char8 m_lineChar; + bool m_lineEnding; + v_char8 m_lineBuffer [16]; // used max 8 + void* m_skipData; + os::io::Library::v_size m_skipSize; + bool m_done = false; + private: + void prepareSkipRN() { + m_skipData = &m_lineBuffer[0]; + m_skipSize = 2; + m_currLineLength = 0; + m_lineEnding = false; + } + public: + + ChunkedDecoder(const std::shared_ptr& fromStream, + const std::shared_ptr& toStream) + : m_fromStream(fromStream) + , m_toStream(toStream) + {} + + Action act() override { + m_currLineLength = 0; + m_lineEnding = false; + return yieldTo(&ChunkedDecoder::readLineChar); + } + + Action readLineChar() { + auto res = m_fromStream->read(&m_lineChar, 1); + if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { + return oatpp::async::Action::_WAIT_RETRY; + } else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) { + return oatpp::async::Action::_REPEAT; + } else if( res < 0) { + return error("[BodyDecoder::ChunkedDecoder] Can't read line char"); + } + return yieldTo(&ChunkedDecoder::onLineCharRead); + } + + Action onLineCharRead() { + if(!m_lineEnding) { + if(m_lineChar != '\r') { + if(m_currLineLength + 1 > MAX_LINE_SIZE){ + return error("[BodyDecoder::ChunkedDecoder] too long line"); + } + m_lineBuffer[m_currLineLength ++] = m_lineChar; + return yieldTo(&ChunkedDecoder::readLineChar); + } else { + m_lineEnding = true; + return yieldTo(&ChunkedDecoder::readLineChar); + } + } else { + if(m_lineChar != '\n') { + OATPP_LOGD("[BodyDecoder::ChunkedDecoder]", "Warning - invalid line breaker") + } + } + if(m_currLineLength == 0) { + return error("Error reading stream. 0-length line"); + } + m_lineBuffer[m_currLineLength] = 0; + return yieldTo(&ChunkedDecoder::onLineRead); + } + + Action onLineRead() { + os::io::Library::v_size countToRead = std::strtol((const char*) m_lineBuffer, nullptr, 16); + if(countToRead > 0) { + prepareSkipRN(); + return oatpp::data::stream::transferAsync(this, yieldTo(&ChunkedDecoder::skipRN), m_fromStream, m_toStream, countToRead, m_buffer); + } + m_done = true; + prepareSkipRN(); + return yieldTo(&ChunkedDecoder::skipRN); + } + + Action skipRN() { + if(m_done) { + return oatpp::data::stream::readExactSizeDataAsyncInline(m_fromStream.get(), + m_skipData, + m_skipSize, + finish()); + } else { + return oatpp::data::stream::readExactSizeDataAsyncInline(m_fromStream.get(), + m_skipData, + m_skipSize, + yieldTo(&ChunkedDecoder::readLineChar)); + } + } + + }; + + return parentCoroutine->startCoroutine(actionOnReturn, fromStream, toStream); + +} + +oatpp::async::Action SimpleBodyDecoder::decodeAsync(oatpp::async::AbstractCoroutine* parentCoroutine, + const oatpp::async::Action& actionOnReturn, + const std::shared_ptr& headers, + const std::shared_ptr& bodyStream, + const std::shared_ptr& toStream) const { + auto transferEncoding = headers->get(Header::TRANSFER_ENCODING, nullptr); + if(transferEncoding && transferEncoding->equals(Header::Value::TRANSFER_ENCODING_CHUNKED)) { + return doChunkedDecodingAsync(parentCoroutine, actionOnReturn, bodyStream, toStream); + } else { + oatpp::os::io::Library::v_size contentLength = 0; + auto contentLengthStr = headers->get(Header::CONTENT_LENGTH, nullptr); + if(!contentLengthStr) { + return actionOnReturn; // DO NOTHING // it is an empty or invalid body + } else { + bool success; + contentLength = oatpp::utils::conversion::strToInt64(contentLengthStr, success); + if(!success){ + return oatpp::async::Action(oatpp::async::Error("Invalid 'Content-Length' Header")); + } + return oatpp::data::stream::transferAsync(parentCoroutine, + actionOnReturn, + bodyStream, + toStream, + contentLength, + oatpp::data::buffer::IOBuffer::createShared()); + } + } +} + +}}}}} diff --git a/web/protocol/http/incoming/SimpleBodyDecoder.hpp b/web/protocol/http/incoming/SimpleBodyDecoder.hpp new file mode 100644 index 00000000..6052a888 --- /dev/null +++ b/web/protocol/http/incoming/SimpleBodyDecoder.hpp @@ -0,0 +1,61 @@ +/*************************************************************************** + * + * Project _____ __ ____ _ _ + * ( _ ) /__\ (_ _)_| |_ _| |_ + * )(_)( /(__)\ )( (_ _)(_ _) + * (_____)(__)(__)(__) |_| |_| + * + * + * Copyright 2018-present, Leonid Stryzhevskyi, + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************************/ + +#ifndef oatpp_web_protocol_http_incoming_SimpleBodyDecoder_hpp +#define oatpp_web_protocol_http_incoming_SimpleBodyDecoder_hpp + +#include "BodyDecoder.hpp" + +namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming { + +class SimpleBodyDecoder : public BodyDecoder { +private: + static os::io::Library::v_size readLine(const std::shared_ptr& fromStream, + p_char8 buffer, + os::io::Library::v_size maxLineSize); + static void doChunkedDecoding(const std::shared_ptr& from, + const std::shared_ptr& toStream); + + static oatpp::async::Action doChunkedDecodingAsync(oatpp::async::AbstractCoroutine* parentCoroutine, + const oatpp::async::Action& actionOnReturn, + const std::shared_ptr& fromStream, + const std::shared_ptr& toStream); +public: + + void decode(const std::shared_ptr& headers, + const std::shared_ptr& bodyStream, + const std::shared_ptr& toStream) const override; + + oatpp::async::Action decodeAsync(oatpp::async::AbstractCoroutine* parentCoroutine, + const oatpp::async::Action& actionOnReturn, + const std::shared_ptr& headers, + const std::shared_ptr& bodyStream, + const std::shared_ptr& toStream) const override; + + +}; + +}}}}} + +#endif /* oatpp_web_protocol_http_incoming_SimpleBodyDecoder_hpp */ diff --git a/web/server/AsyncHttpConnectionHandler.cpp b/web/server/AsyncHttpConnectionHandler.cpp index 5b313a1a..ee21b8d6 100644 --- a/web/server/AsyncHttpConnectionHandler.cpp +++ b/web/server/AsyncHttpConnectionHandler.cpp @@ -45,6 +45,7 @@ void AsyncHttpConnectionHandler::Task::consumeConnections(oatpp::async::Processo while (curr != nullptr) { auto coroutine = HttpProcessor::Coroutine::getBench().obtain(m_router, + m_bodyDecoder, m_errorHandler, m_requestInterceptors, curr->getData()->connection, diff --git a/web/server/AsyncHttpConnectionHandler.hpp b/web/server/AsyncHttpConnectionHandler.hpp index bfba4807..161f0fd0 100644 --- a/web/server/AsyncHttpConnectionHandler.hpp +++ b/web/server/AsyncHttpConnectionHandler.hpp @@ -31,6 +31,8 @@ #include "./HttpRouter.hpp" +#include "oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp" + #include "oatpp/web/protocol/http/incoming/Request.hpp" #include "oatpp/web/protocol/http/outgoing/Response.hpp" @@ -62,25 +64,29 @@ private: Connections m_connections; private: HttpRouter* m_router; + std::shared_ptr m_bodyDecoder; std::shared_ptr m_errorHandler; HttpProcessor::RequestInterceptors* m_requestInterceptors; std::mutex m_taskMutex; std::condition_variable m_taskCondition; public: Task(HttpRouter* router, + const std::shared_ptr& bodyDecoder, const std::shared_ptr& errorHandler, HttpProcessor::RequestInterceptors* requestInterceptors) : m_atom(false) , m_router(router) + , m_bodyDecoder(bodyDecoder) , m_errorHandler(errorHandler) , m_requestInterceptors(requestInterceptors) {} public: static std::shared_ptr createShared(HttpRouter* router, + const std::shared_ptr& bodyDecoder, const std::shared_ptr& errorHandler, HttpProcessor::RequestInterceptors* requestInterceptors){ - return std::make_shared(router, errorHandler, requestInterceptors); + return std::make_shared(router, bodyDecoder, errorHandler, requestInterceptors); } void run() override; @@ -108,9 +114,14 @@ public: , m_taskBalancer(0) , m_threadCount(threadCount) { + + // TODO make bodyDecoder configurable here + std::shared_ptr bodyDecoder = + std::make_shared(); + m_tasks = new std::shared_ptr[m_threadCount]; for(v_int32 i = 0; i < m_threadCount; i++) { - auto task = Task::createShared(m_router.get(), m_errorHandler, &m_requestInterceptors); + auto task = Task::createShared(m_router.get(), bodyDecoder, m_errorHandler, &m_requestInterceptors); m_tasks[i] = task; concurrency::Thread thread(task); thread.detach(); diff --git a/web/server/HttpConnectionHandler.cpp b/web/server/HttpConnectionHandler.cpp index e0e1fe65..b661d01b 100644 --- a/web/server/HttpConnectionHandler.cpp +++ b/web/server/HttpConnectionHandler.cpp @@ -42,7 +42,7 @@ void HttpConnectionHandler::Task::run(){ bool keepAlive = true; do { - auto response = HttpProcessor::processRequest(m_router, m_connection, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive); + auto response = HttpProcessor::processRequest(m_router, m_connection, m_bodyDecoder, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive); if(response) { outStream->setBufferPosition(0, 0); @@ -59,7 +59,7 @@ void HttpConnectionHandler::Task::run(){ void HttpConnectionHandler::handleConnection(const std::shared_ptr& connection){ /* Create working thread */ - concurrency::Thread thread(Task::createShared(m_router.get(), connection, m_errorHandler, &m_requestInterceptors)); + concurrency::Thread thread(Task::createShared(m_router.get(), connection, m_bodyDecoder, m_errorHandler, &m_requestInterceptors)); /* Get hardware concurrency -1 in order to have 1cpu free of workers. */ v_int32 concurrency = oatpp::concurrency::Thread::getHardwareConcurrency(); diff --git a/web/server/HttpConnectionHandler.hpp b/web/server/HttpConnectionHandler.hpp index 85773bbf..bc2a117d 100644 --- a/web/server/HttpConnectionHandler.hpp +++ b/web/server/HttpConnectionHandler.hpp @@ -29,6 +29,8 @@ #include "./handler/ErrorHandler.hpp" #include "./HttpRouter.hpp" +#include "oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp" + #include "oatpp/web/protocol/http/incoming/Request.hpp" #include "oatpp/web/protocol/http/outgoing/Response.hpp" @@ -50,15 +52,18 @@ private: private: HttpRouter* m_router; std::shared_ptr m_connection; + std::shared_ptr m_bodyDecoder; std::shared_ptr m_errorHandler; HttpProcessor::RequestInterceptors* m_requestInterceptors; public: Task(HttpRouter* router, const std::shared_ptr& connection, + const std::shared_ptr& bodyDecoder, const std::shared_ptr& errorHandler, HttpProcessor::RequestInterceptors* requestInterceptors) : m_router(router) , m_connection(connection) + , m_bodyDecoder(bodyDecoder) , m_errorHandler(errorHandler) , m_requestInterceptors(requestInterceptors) {} @@ -66,9 +71,10 @@ private: static std::shared_ptr createShared(HttpRouter* router, const std::shared_ptr& connection, + const std::shared_ptr& bodyDecoder, const std::shared_ptr& errorHandler, HttpProcessor::RequestInterceptors* requestInterceptors) { - return std::make_shared(router, connection, errorHandler, requestInterceptors); + return std::make_shared(router, connection, bodyDecoder, errorHandler, requestInterceptors); } void run() override; @@ -77,11 +83,13 @@ private: private: std::shared_ptr m_router; + std::shared_ptr m_bodyDecoder; std::shared_ptr m_errorHandler; HttpProcessor::RequestInterceptors m_requestInterceptors; public: HttpConnectionHandler(const std::shared_ptr& router) : m_router(router) + , m_bodyDecoder(std::make_shared()) , m_errorHandler(handler::DefaultErrorHandler::createShared()) {} public: diff --git a/web/server/HttpProcessor.cpp b/web/server/HttpProcessor.cpp index 90c32e03..e3235fff 100644 --- a/web/server/HttpProcessor.cpp +++ b/web/server/HttpProcessor.cpp @@ -34,6 +34,7 @@ const char* HttpProcessor::RETURN_KEEP_ALIVE = "RETURN_KEEP_ALIVE"; std::shared_ptr HttpProcessor::processRequest(HttpRouter* router, const std::shared_ptr& connection, + const std::shared_ptr& bodyDecoder, const std::shared_ptr& errorHandler, RequestInterceptors* requestInterceptors, void* buffer, @@ -66,7 +67,7 @@ HttpProcessor::processRequest(HttpRouter* router, auto bodyStream = inStream; bodyStream->setBufferPosition(caret.getPosition(), (v_int32) readCount); - auto request = protocol::http::incoming::Request::createShared(line, route.matchMap, headers, bodyStream); + auto request = protocol::http::incoming::Request::createShared(line, route.matchMap, headers, bodyStream, bodyDecoder); std::shared_ptr response; try{ auto currInterceptor = requestInterceptors->getFirstNode(); @@ -136,7 +137,7 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::parseRequest(v_int32 auto bodyStream = m_inStream; bodyStream->setBufferPosition(caret.getPosition(), readCount); - m_currentRequest = protocol::http::incoming::Request::createShared(line, m_currentRoute.matchMap, headers, bodyStream); + m_currentRequest = protocol::http::incoming::Request::createShared(line, m_currentRoute.matchMap, headers, bodyStream, m_bodyDecoder); auto currInterceptor = m_requestInterceptors->getFirstNode(); while (currInterceptor != nullptr) { diff --git a/web/server/HttpProcessor.hpp b/web/server/HttpProcessor.hpp index 35324257..c6198440 100644 --- a/web/server/HttpProcessor.hpp +++ b/web/server/HttpProcessor.hpp @@ -68,6 +68,7 @@ public: Action parseRequest(v_int32 readCount); private: HttpRouter* m_router; + std::shared_ptr m_bodyDecoder; std::shared_ptr m_errorHandler; RequestInterceptors* m_requestInterceptors; std::shared_ptr m_connection; @@ -82,6 +83,7 @@ public: public: Coroutine(HttpRouter* router, + const std::shared_ptr& bodyDecoder, const std::shared_ptr& errorHandler, RequestInterceptors* requestInterceptors, const std::shared_ptr& connection, @@ -89,6 +91,7 @@ public: const std::shared_ptr& outStream, const std::shared_ptr& inStream) : m_router(router) + , m_bodyDecoder(bodyDecoder) , m_errorHandler(errorHandler) , m_requestInterceptors(requestInterceptors) , m_connection(connection) @@ -114,6 +117,7 @@ public: static std::shared_ptr processRequest(HttpRouter* router, const std::shared_ptr& connection, + const std::shared_ptr& bodyDecoder, const std::shared_ptr& errorHandler, RequestInterceptors* requestInterceptors, void* buffer,