Merge pull request #24 from oatpp/body_decoder_configuring

Body decoder configuring
This commit is contained in:
Leonid Stryzhevskyi 2018-11-09 02:40:31 +02:00 committed by GitHub
commit 8bb6b2c265
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 543 additions and 356 deletions

View File

@ -126,6 +126,8 @@ add_library(oatpp
web/protocol/http/Http.hpp
web/protocol/http/incoming/BodyDecoder.cpp
web/protocol/http/incoming/BodyDecoder.hpp
web/protocol/http/incoming/SimpleBodyDecoder.cpp
web/protocol/http/incoming/SimpleBodyDecoder.hpp
web/protocol/http/incoming/Request.cpp
web/protocol/http/incoming/Request.hpp
web/protocol/http/incoming/Response.cpp

View File

@ -117,22 +117,36 @@ const std::shared_ptr<OutputStream>& operator << (const std::shared_ptr<OutputSt
return s;
}
void transfer(const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream,
oatpp::os::io::Library::v_size transferSize,
void* buffer,
oatpp::os::io::Library::v_size bufferSize) {
oatpp::os::io::Library::v_size transfer(const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream,
oatpp::os::io::Library::v_size transferSize,
void* buffer,
oatpp::os::io::Library::v_size bufferSize) {
while (transferSize > 0) {
oatpp::os::io::Library::v_size desiredReadCount = transferSize;
if(desiredReadCount > bufferSize){
oatpp::os::io::Library::v_size progress = 0;
while (transferSize == 0 || progress < transferSize) {
oatpp::os::io::Library::v_size desiredReadCount = transferSize - progress;
if(transferSize == 0 || desiredReadCount > bufferSize){
desiredReadCount = bufferSize;
}
auto readCount = fromStream->read(buffer, desiredReadCount);
toStream->write(buffer, readCount);
transferSize -= readCount;
auto readResult = fromStream->read(buffer, desiredReadCount);
if(readResult > 0) {
auto writeReasul = writeExactSizeData(toStream.get(), buffer, readResult);
if(writeReasul != readResult) {
throw std::runtime_error("[oatpp::data::stream::transfer()]: Unknown Error. Can't continue transfer.");
}
progress += readResult;
} else {
if(readResult == oatpp::data::stream::Errors::ERROR_IO_RETRY || readResult == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
continue;
}
return progress;
}
}
return progress;
}
oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
@ -147,6 +161,7 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
std::shared_ptr<InputStream> m_fromStream;
std::shared_ptr<OutputStream> m_toStream;
oatpp::os::io::Library::v_size m_transferSize;
oatpp::os::io::Library::v_size m_progress;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_buffer;
oatpp::os::io::Library::v_size m_desiredReadCount;
@ -163,19 +178,24 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
: m_fromStream(fromStream)
, m_toStream(toStream)
, m_transferSize(transferSize)
, m_progress(0)
, m_buffer(buffer)
{}
Action act() override {
if(m_transferSize == 0) {
return finish();
/* m_transferSize == 0 - is a legal state. */
if(m_transferSize > 0) {
if(m_progress == m_transferSize) {
return finish();
} else if(m_progress > m_transferSize) {
throw std::runtime_error("[oatpp::data::stream::transferAsync{TransferCoroutine::act()}]: Invalid state. m_progress > m_transferSize");
}
} else if(m_transferSize < 0) {
throw std::runtime_error("Invalid stream::TransferCoroutine state");
throw std::runtime_error("[oatpp::data::stream::transferAsync{TransferCoroutine::act()}]: Invalid state. m_transferSize < 0");
}
m_desiredReadCount = m_transferSize;
if(m_desiredReadCount > m_buffer->getSize()){
m_desiredReadCount = m_transferSize - m_progress;
if(m_transferSize == 0 || m_desiredReadCount > m_buffer->getSize()){
m_desiredReadCount = m_buffer->getSize();
}
@ -188,22 +208,31 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
Action doRead() {
return oatpp::data::stream::readSomeDataAsyncInline(m_fromStream.get(),
m_readBufferPtr,
m_bytesLeft,
yieldTo(&TransferCoroutine::prepareWrite));
m_readBufferPtr,
m_bytesLeft,
yieldTo(&TransferCoroutine::prepareWrite));
}
Action prepareWrite() {
m_bytesLeft = m_desiredReadCount - m_bytesLeft;
m_transferSize -= m_bytesLeft;
m_progress += m_bytesLeft;
return yieldTo(&TransferCoroutine::doWrite);
}
Action doWrite() {
return oatpp::data::stream::writeDataAsyncInline(m_toStream.get(),
m_writeBufferPtr,
m_bytesLeft,
yieldTo(&TransferCoroutine::act));
m_writeBufferPtr,
m_bytesLeft,
yieldTo(&TransferCoroutine::act));
}
Action handleError(const oatpp::async::Error& error) override {
if(!error.isExceptionThrown && error.message == Errors::ERROR_ASYNC_FAILED_TO_READ_DATA) {
if(m_transferSize == 0) {
return finish();
}
}
return error;
}
};
@ -276,18 +305,25 @@ oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStre
}
oatpp::os::io::Library::v_size writeExactSizeData(oatpp::data::stream::OutputStream* stream, const void* data, os::io::Library::v_size size) {
const char* buffer = (char*)data;
oatpp::os::io::Library::v_size progress = 0;
while (progress < size) {
auto res = stream->write(&buffer[progress], size - progress);
if(res <= 0) { // if res == 0 then probably stream handles write() error incorrectly. return.
if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE ||
(res != oatpp::data::stream::Errors::ERROR_IO_RETRY && res != oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY)) {
return progress;
if(res > 0) {
progress += res;
} else { // if res == 0 then probably stream handles write() error incorrectly. return.
if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY || res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
continue;
}
return progress;
}
progress += res;
}
return progress;
}

View File

@ -138,12 +138,15 @@ const std::shared_ptr<OutputStream>& operator << (const std::shared_ptr<OutputSt
/**
* Read bytes from @fromStream" and write to @toStream" using @buffer of size @bufferSize
* transfer up to transferSize or until error if transferSize == 0
* throws in case readCount != writeCount
*/
void transfer(const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream,
oatpp::os::io::Library::v_size transferSize,
void* buffer,
oatpp::os::io::Library::v_size bufferSize);
oatpp::os::io::Library::v_size transfer(const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream,
oatpp::os::io::Library::v_size transferSize,
void* buffer,
oatpp::os::io::Library::v_size bufferSize);
/**
* Same as transfer but asynchronous

View File

@ -92,7 +92,7 @@ HttpRequestExecutor::execute(const String& method,
caret.getPosition(),
(v_int32) readCount);
return Response::createShared(line->statusCode, line->description, responseHeaders, bodyStream);
return Response::createShared(line->statusCode, line->description, responseHeaders, bodyStream, m_bodyDecoder);
}
@ -110,6 +110,7 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor
String m_path;
std::shared_ptr<Headers> m_headers;
std::shared_ptr<Body> m_body;
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> m_bodyDecoder;
private:
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_ioBuffer;
@ -121,12 +122,14 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor
const String& method,
const String& path,
const std::shared_ptr<Headers>& headers,
const std::shared_ptr<Body>& body)
const std::shared_ptr<Body>& body,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder)
: m_connectionProvider(connectionProvider)
, m_method(method)
, m_path(path)
, m_headers(headers)
, m_body(body)
, m_bodyDecoder(bodyDecoder)
{}
Action act() override {
@ -175,7 +178,7 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor
caret.getPosition(),
(v_int32) readCount);
return _return(Response::createShared(line->statusCode, line->description, headers, bodyStream));
return _return(Response::createShared(line->statusCode, line->description, headers, bodyStream, m_bodyDecoder));
}
@ -185,7 +188,7 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor
};
return parentCoroutine->startCoroutineForResult<ExecutorCoroutine>(callback, m_connectionProvider, method, path, headers, body);
return parentCoroutine->startCoroutineForResult<ExecutorCoroutine>(callback, m_connectionProvider, method, path, headers, body, m_bodyDecoder);
}

View File

@ -26,6 +26,8 @@
#define oatpp_web_client_HttpRequestExecutor_hpp
#include "./RequestExecutor.hpp"
#include "oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp"
#include "oatpp/network/ConnectionProvider.hpp"
namespace oatpp { namespace web { namespace client {
@ -33,15 +35,21 @@ namespace oatpp { namespace web { namespace client {
class HttpRequestExecutor : public oatpp::base::Controllable, public RequestExecutor {
protected:
std::shared_ptr<oatpp::network::ClientConnectionProvider> m_connectionProvider;
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> m_bodyDecoder;
public:
HttpRequestExecutor(const std::shared_ptr<oatpp::network::ClientConnectionProvider>& connectionProvider)
HttpRequestExecutor(const std::shared_ptr<oatpp::network::ClientConnectionProvider>& connectionProvider,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder =
std::make_shared<oatpp::web::protocol::http::incoming::SimpleBodyDecoder>())
: m_connectionProvider(connectionProvider)
, m_bodyDecoder(bodyDecoder)
{}
public:
static std::shared_ptr<HttpRequestExecutor>
createShared(const std::shared_ptr<oatpp::network::ClientConnectionProvider>& connectionProvider){
return std::make_shared<HttpRequestExecutor>(connectionProvider);
createShared(const std::shared_ptr<oatpp::network::ClientConnectionProvider>& connectionProvider,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder =
std::make_shared<oatpp::web::protocol::http::incoming::SimpleBodyDecoder>()){
return std::make_shared<HttpRequestExecutor>(connectionProvider, bodyDecoder);
}
/**

View File

@ -316,31 +316,35 @@ oatpp::String Protocol::parseHeaderName(oatpp::parser::ParsingCaret& caret) {
return nullptr;
}
void Protocol::parseOneHeader(Headers& headers, oatpp::parser::ParsingCaret& caret, Status& error) {
caret.findNotSpaceChar();
auto name = parseHeaderName(caret);
if(name) {
caret.findNotSpaceChar();
if(!caret.canContinueAtChar(':', 1)) {
error = Status::CODE_400;
return;
}
caret.findNotSpaceChar();
oatpp::parser::ParsingCaret::Label label(caret);
caret.findRN();
headers.put(name, label.toString(true));
caret.skipRN();
} else {
error = Status::CODE_431;
return;
}
}
std::shared_ptr<Protocol::Headers> Protocol::parseHeaders(oatpp::parser::ParsingCaret& caret, Status& error) {
auto headers = Headers::createShared();
while (!caret.isAtRN()) {
caret.findNotSpaceChar();
auto name = parseHeaderName(caret);
if(name) {
caret.findNotSpaceChar();
if(!caret.canContinueAtChar(':', 1)) {
error = Status::CODE_400;
return nullptr;
}
caret.findNotSpaceChar();
oatpp::parser::ParsingCaret::Label label(caret);
caret.findRN();
headers->put(name, label.toString(true));
caret.skipRN();
} else {
error = Status::CODE_431;
parseOneHeader(*headers, caret, error);
if(error.code != 0) {
return nullptr;
}
}
caret.skipRN();

View File

@ -273,6 +273,11 @@ public:
static std::shared_ptr<RequestStartingLine> parseRequestStartingLine(oatpp::parser::ParsingCaret& caret);
static std::shared_ptr<ResponseStartingLine> parseResponseStartingLine(oatpp::parser::ParsingCaret& caret);
/**
* Parse header and store it in headers map
*/
static void parseOneHeader(Headers& headers, oatpp::parser::ParsingCaret& caret, Status& error);
static std::shared_ptr<Headers> parseHeaders(oatpp::parser::ParsingCaret& caret, Status& error);
};

View File

@ -24,224 +24,10 @@
#include "BodyDecoder.hpp"
#include "oatpp/core/data/stream/StreamBufferedProxy.hpp"
#include "oatpp/core/utils/ConversionUtils.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming {
os::io::Library::v_size BodyDecoder::readLine(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
p_char8 buffer,
os::io::Library::v_size maxLineSize) {
v_char8 a;
os::io::Library::v_size count = 0;
while (fromStream->read(&a, 1) > 0) {
if(a != '\r') {
if(count + 1 > maxLineSize) {
OATPP_LOGE("BodyDecoder", "Error - too long line");
return 0;
}
buffer[count++] = a;
} else {
fromStream->read(&a, 1);
if(a != '\n'){
OATPP_LOGE("BodyDecoder", "Warning - invalid line breaker");
}
return count; // size of line
}
}
return count;
}
void BodyDecoder::doChunkedDecoding(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) {
auto buffer = oatpp::data::buffer::IOBuffer::createShared();
v_int32 maxLineSize = 8; // 0xFFFFFFFF 4Gb for chunk
v_char8 lineBuffer[maxLineSize + 1];
os::io::Library::v_size countToRead;
do {
auto lineSize = readLine(fromStream, lineBuffer, maxLineSize);
if(lineSize == 0 || lineSize >= maxLineSize) {
return; // error reading stream
}
lineBuffer[lineSize] = 0;
countToRead = std::strtol((const char*)lineBuffer, nullptr, 16);
if(countToRead > 0) {
oatpp::data::stream::transfer(fromStream, toStream, countToRead, buffer->getData(), buffer->getSize());
}
fromStream->read(lineBuffer, 2); // just skip "\r\n"
} while (countToRead > 0);
}
void BodyDecoder::decode(const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) {
auto transferEncoding = headers->get(Header::TRANSFER_ENCODING, nullptr);
if(transferEncoding && transferEncoding->equals(Header::Value::TRANSFER_ENCODING_CHUNKED)) {
doChunkedDecoding(bodyStream, toStream);
} else {
oatpp::os::io::Library::v_size contentLength = 0;
auto contentLengthStr = headers->get(Header::CONTENT_LENGTH, nullptr);
if(!contentLengthStr) {
return; // DO NOTHING // it is an empty or invalid body
} else {
bool success;
contentLength = oatpp::utils::conversion::strToInt64(contentLengthStr, success);
if(!success){
return; // it is an invalid request/response
}
auto buffer = oatpp::data::buffer::IOBuffer::createShared();
oatpp::data::stream::transfer(bodyStream, toStream, contentLength, buffer->getData(), buffer->getSize());
}
}
}
oatpp::async::Action BodyDecoder::doChunkedDecodingAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) {
class ChunkedDecoder : public oatpp::async::Coroutine<ChunkedDecoder> {
private:
const v_int32 MAX_LINE_SIZE = 8;
private:
std::shared_ptr<oatpp::data::stream::InputStream> m_fromStream;
std::shared_ptr<oatpp::data::stream::OutputStream> m_toStream;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_buffer = oatpp::data::buffer::IOBuffer::createShared();
v_int32 m_currLineLength;
v_char8 m_lineChar;
bool m_lineEnding;
v_char8 m_lineBuffer [16]; // used max 8
void* m_skipData;
os::io::Library::v_size m_skipSize;
bool m_done = false;
private:
void prepareSkipRN() {
m_skipData = &m_lineBuffer[0];
m_skipSize = 2;
m_currLineLength = 0;
m_lineEnding = false;
}
public:
ChunkedDecoder(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream)
: m_fromStream(fromStream)
, m_toStream(toStream)
{}
Action act() override {
m_currLineLength = 0;
m_lineEnding = false;
return yieldTo(&ChunkedDecoder::readLineChar);
}
Action readLineChar() {
auto res = m_fromStream->read(&m_lineChar, 1);
if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
return oatpp::async::Action::_WAIT_RETRY;
} else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) {
return oatpp::async::Action::_REPEAT;
} else if( res < 0) {
return error("[BodyDecoder::ChunkedDecoder] Can't read line char");
}
return yieldTo(&ChunkedDecoder::onLineCharRead);
}
Action onLineCharRead() {
if(!m_lineEnding) {
if(m_lineChar != '\r') {
if(m_currLineLength + 1 > MAX_LINE_SIZE){
return error("[BodyDecoder::ChunkedDecoder] too long line");
}
m_lineBuffer[m_currLineLength ++] = m_lineChar;
return yieldTo(&ChunkedDecoder::readLineChar);
} else {
m_lineEnding = true;
return yieldTo(&ChunkedDecoder::readLineChar);
}
} else {
if(m_lineChar != '\n') {
OATPP_LOGD("[BodyDecoder::ChunkedDecoder]", "Warning - invalid line breaker")
}
}
if(m_currLineLength == 0) {
return error("Error reading stream. 0-length line");
}
m_lineBuffer[m_currLineLength] = 0;
return yieldTo(&ChunkedDecoder::onLineRead);
}
Action onLineRead() {
os::io::Library::v_size countToRead = std::strtol((const char*) m_lineBuffer, nullptr, 16);
if(countToRead > 0) {
prepareSkipRN();
return oatpp::data::stream::transferAsync(this, yieldTo(&ChunkedDecoder::skipRN), m_fromStream, m_toStream, countToRead, m_buffer);
}
m_done = true;
prepareSkipRN();
return yieldTo(&ChunkedDecoder::skipRN);
}
Action skipRN() {
if(m_done) {
return oatpp::data::stream::readExactSizeDataAsyncInline(m_fromStream.get(),
m_skipData,
m_skipSize,
finish());
} else {
return oatpp::data::stream::readExactSizeDataAsyncInline(m_fromStream.get(),
m_skipData,
m_skipSize,
yieldTo(&ChunkedDecoder::readLineChar));
}
}
};
return parentCoroutine->startCoroutine<ChunkedDecoder>(actionOnReturn, fromStream, toStream);
}
oatpp::async::Action BodyDecoder::decodeAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) {
auto transferEncoding = headers->get(Header::TRANSFER_ENCODING, nullptr);
if(transferEncoding && transferEncoding->equals(Header::Value::TRANSFER_ENCODING_CHUNKED)) {
return doChunkedDecodingAsync(parentCoroutine, actionOnReturn, bodyStream, toStream);
} else {
oatpp::os::io::Library::v_size contentLength = 0;
auto contentLengthStr = headers->get(Header::CONTENT_LENGTH, nullptr);
if(!contentLengthStr) {
return actionOnReturn; // DO NOTHING // it is an empty or invalid body
} else {
bool success;
contentLength = oatpp::utils::conversion::strToInt64(contentLengthStr, success);
if(!success){
return oatpp::async::Action(oatpp::async::Error("Invalid 'Content-Length' Header"));
}
return oatpp::data::stream::transferAsync(parentCoroutine,
actionOnReturn,
bodyStream,
toStream,
contentLength,
oatpp::data::buffer::IOBuffer::createShared());
}
}
}
}}}}}

View File

@ -36,19 +36,22 @@ private:
class ToStringDecoder : public oatpp::async::CoroutineWithResult<ToStringDecoder, oatpp::String> {
private:
const BodyDecoder* m_decoder;
std::shared_ptr<Protocol::Headers> m_headers;
std::shared_ptr<oatpp::data::stream::InputStream> m_bodyStream;
std::shared_ptr<oatpp::data::stream::ChunkedBuffer> m_chunkedBuffer = oatpp::data::stream::ChunkedBuffer::createShared();
public:
ToStringDecoder(const std::shared_ptr<Protocol::Headers>& headers,
ToStringDecoder(const BodyDecoder* decoder,
const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream)
: m_headers(headers)
: m_decoder(decoder)
, m_headers(headers)
, m_bodyStream(bodyStream)
{}
Action act() override {
return decodeAsync(this, yieldTo(&ToStringDecoder::onDecoded), m_headers, m_bodyStream, m_chunkedBuffer);
return m_decoder->decodeAsync(this, yieldTo(&ToStringDecoder::onDecoded), m_headers, m_bodyStream, m_chunkedBuffer);
}
Action onDecoded() {
@ -60,22 +63,25 @@ private:
template<class Type>
class ToDtoDecoder : public oatpp::async::CoroutineWithResult<ToDtoDecoder<Type>, typename Type::ObjectWrapper> {
private:
const BodyDecoder* m_decoder;
std::shared_ptr<Protocol::Headers> m_headers;
std::shared_ptr<oatpp::data::stream::InputStream> m_bodyStream;
std::shared_ptr<oatpp::data::mapping::ObjectMapper> m_objectMapper;
std::shared_ptr<oatpp::data::stream::ChunkedBuffer> m_chunkedBuffer = oatpp::data::stream::ChunkedBuffer::createShared();
public:
ToDtoDecoder(const std::shared_ptr<Protocol::Headers>& headers,
ToDtoDecoder(const BodyDecoder* decoder,
const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper)
: m_headers(headers)
: m_decoder(decoder)
, m_headers(headers)
, m_bodyStream(bodyStream)
, m_objectMapper(objectMapper)
{}
oatpp::async::Action act() override {
return decodeAsync(this, this->yieldTo(&ToDtoDecoder::onDecoded), m_headers, m_bodyStream, m_chunkedBuffer);
return m_decoder->decodeAsync(this, this->yieldTo(&ToDtoDecoder::onDecoded), m_headers, m_bodyStream, m_chunkedBuffer);
}
oatpp::async::Action onDecoded() {
@ -90,65 +96,47 @@ private:
};
private:
static os::io::Library::v_size readLine(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
p_char8 buffer,
os::io::Library::v_size maxLineSize);
static void doChunkedDecoding(const std::shared_ptr<oatpp::data::stream::InputStream>& from,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream);
static oatpp::async::Action doChunkedDecodingAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream);
public:
// read chunk by chunk from 'bodyStream' and write to 'toStream' using buffer::IOBuffer
virtual void decode(const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const = 0;
static void decode(const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream);
virtual oatpp::async::Action decodeAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const = 0;
static oatpp::String
decodeToString(const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream) {
oatpp::String decodeToString(const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream) const {
auto chunkedBuffer = oatpp::data::stream::ChunkedBuffer::createShared();
decode(headers, bodyStream, chunkedBuffer);
return chunkedBuffer->toString();
}
template<class Type>
static typename Type::ObjectWrapper decodeToDto(const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper){
typename Type::ObjectWrapper decodeToDto(const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper) const {
return objectMapper->readFromString<Type>(decodeToString(headers, bodyStream));
}
// Async
static oatpp::async::Action decodeAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream);
template<typename ParentCoroutineType>
static oatpp::async::Action
decodeToStringAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
oatpp::async::Action (ParentCoroutineType::*callback)(const oatpp::String&),
const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream) {
return parentCoroutine->startCoroutineForResult<ToStringDecoder>(callback, headers, bodyStream);
oatpp::async::Action decodeToStringAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
oatpp::async::Action (ParentCoroutineType::*callback)(const oatpp::String&),
const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream) const {
return parentCoroutine->startCoroutineForResult<ToStringDecoder>(callback, this, headers, bodyStream);
}
template<class DtoType, typename ParentCoroutineType>
static oatpp::async::Action
decodeToDtoAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
oatpp::async::Action (ParentCoroutineType::*callback)(const typename DtoType::ObjectWrapper&),
const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper) {
return parentCoroutine->startCoroutineForResult<ToDtoDecoder<DtoType>>(callback, headers, bodyStream, objectMapper);
oatpp::async::Action decodeToDtoAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
oatpp::async::Action (ParentCoroutineType::*callback)(const typename DtoType::ObjectWrapper&),
const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper) const {
return parentCoroutine->startCoroutineForResult<ToDtoDecoder<DtoType>>(callback, this, headers, bodyStream, objectMapper);
}
};

View File

@ -36,23 +36,29 @@ public:
OBJECT_POOL(Incoming_Request_Pool, Request, 32)
SHARED_OBJECT_POOL(Shared_Incoming_Request_Pool, Request, 32)
public:
Request(){}
Request(const std::shared_ptr<const http::incoming::BodyDecoder>& pBodyDecoder)
: bodyDecoder(pBodyDecoder)
{}
Request(const std::shared_ptr<http::RequestStartingLine>& pStartingLine,
const std::shared_ptr<url::mapping::Pattern::MatchMap>& pPathVariables,
const std::shared_ptr<http::Protocol::Headers>& pHeaders,
const std::shared_ptr<oatpp::data::stream::InputStream>& pBodyStream)
const std::shared_ptr<oatpp::data::stream::InputStream>& pBodyStream,
const std::shared_ptr<const http::incoming::BodyDecoder>& pBodyDecoder)
: startingLine(pStartingLine)
, pathVariables(pPathVariables)
, headers(pHeaders)
, bodyStream(pBodyStream)
, bodyDecoder(pBodyDecoder)
{}
public:
static std::shared_ptr<Request> createShared(const std::shared_ptr<http::RequestStartingLine>& startingLine,
const std::shared_ptr<url::mapping::Pattern::MatchMap>& pathVariables,
const std::shared_ptr<http::Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream) {
return Shared_Incoming_Request_Pool::allocateShared(startingLine, pathVariables, headers, bodyStream);
const std::shared_ptr<url::mapping::Pattern::MatchMap>& pathVariables,
const std::shared_ptr<http::Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<const http::incoming::BodyDecoder>& bodyDecoder) {
return Shared_Incoming_Request_Pool::allocateShared(startingLine, pathVariables, headers, bodyStream, bodyDecoder);
}
std::shared_ptr<http::RequestStartingLine> startingLine;
@ -60,6 +66,12 @@ public:
std::shared_ptr<http::Protocol::Headers> headers;
std::shared_ptr<oatpp::data::stream::InputStream> bodyStream;
/**
* Request should be preconfigured with default BodyDecoder.
* Custom BodyDecoder can be set on demand
*/
std::shared_ptr<const http::incoming::BodyDecoder> bodyDecoder;
oatpp::String getHeader(const oatpp::String& headerName) const{
auto entry = headers->find(headerName);
if(entry != nullptr) {
@ -81,24 +93,22 @@ public:
}
void streamBody(const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const {
protocol::http::incoming::BodyDecoder::decode(headers, bodyStream, toStream);
bodyDecoder->decode(headers, bodyStream, toStream);
}
oatpp::String readBodyToString() const {
return protocol::http::incoming::BodyDecoder::decodeToString(headers, bodyStream);
return bodyDecoder->decodeToString(headers, bodyStream);
}
template<class Type>
typename Type::ObjectWrapper readBodyToDto(const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper) const {
return objectMapper->readFromString<Type>
(protocol::http::incoming::BodyDecoder::decodeToString(headers, bodyStream));
return objectMapper->readFromString<Type>(bodyDecoder->decodeToString(headers, bodyStream));
}
template<class Type>
void readBodyToDto(oatpp::data::mapping::type::PolymorphicWrapper<Type>& objectWrapper,
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper) const {
objectWrapper = objectMapper->readFromString<Type>
(protocol::http::incoming::BodyDecoder::decodeToString(headers, bodyStream));
objectWrapper = objectMapper->readFromString<Type>(bodyDecoder->decodeToString(headers, bodyStream));
}
// Async
@ -106,20 +116,20 @@ public:
oatpp::async::Action streamBodyAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const {
return protocol::http::incoming::BodyDecoder::decodeAsync(parentCoroutine, actionOnReturn, headers, bodyStream, toStream);
return bodyDecoder->decodeAsync(parentCoroutine, actionOnReturn, headers, bodyStream, toStream);
}
template<typename ParentCoroutineType>
oatpp::async::Action readBodyToStringAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
oatpp::async::Action (ParentCoroutineType::*callback)(const oatpp::String&)) const {
return protocol::http::incoming::BodyDecoder::decodeToStringAsync(parentCoroutine, callback, headers, bodyStream);
return bodyDecoder->decodeToStringAsync(parentCoroutine, callback, headers, bodyStream);
}
template<class DtoType, typename ParentCoroutineType>
oatpp::async::Action readBodyToDtoAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
oatpp::async::Action (ParentCoroutineType::*callback)(const typename DtoType::ObjectWrapper&),
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper) const {
return protocol::http::incoming::BodyDecoder::decodeToDtoAsync<DtoType>(parentCoroutine, callback, headers, bodyStream, objectMapper);
return bodyDecoder->decodeToDtoAsync<DtoType>(parentCoroutine, callback, headers, bodyStream, objectMapper);
}
};

View File

@ -38,19 +38,22 @@ public:
Response(v_int32 pStatusCode,
const oatpp::String& pStatusDescription,
const std::shared_ptr<http::Protocol::Headers>& pHeaders,
const std::shared_ptr<oatpp::data::stream::InputStream>& pBodyStream)
const std::shared_ptr<oatpp::data::stream::InputStream>& pBodyStream,
const std::shared_ptr<const http::incoming::BodyDecoder>& pBodyDecoder)
: statusCode(pStatusCode)
, statusDescription(pStatusDescription)
, headers(pHeaders)
, bodyStream(pBodyStream)
, bodyDecoder(pBodyDecoder)
{}
public:
static std::shared_ptr<Response> createShared(v_int32 statusCode,
const oatpp::String& statusDescription,
const std::shared_ptr<http::Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream) {
return Shared_Incoming_Response_Pool::allocateShared(statusCode, statusDescription, headers, bodyStream);
const oatpp::String& statusDescription,
const std::shared_ptr<http::Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<const http::incoming::BodyDecoder>& bodyDecoder) {
return Shared_Incoming_Response_Pool::allocateShared(statusCode, statusDescription, headers, bodyStream, bodyDecoder);
}
const v_int32 statusCode;
@ -58,18 +61,24 @@ public:
const std::shared_ptr<http::Protocol::Headers> headers;
const std::shared_ptr<oatpp::data::stream::InputStream> bodyStream;
/**
* Response should be preconfigured with default BodyDecoder.
* Entity that created response object is responsible for providing correct BodyDecoder.
* Custom BodyDecoder can be set on demand
*/
std::shared_ptr<const http::incoming::BodyDecoder> bodyDecoder;
void streamBody(const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const {
protocol::http::incoming::BodyDecoder::decode(headers, bodyStream, toStream);
bodyDecoder->decode(headers, bodyStream, toStream);
}
oatpp::String readBodyToString() const {
return protocol::http::incoming::BodyDecoder::decodeToString(headers, bodyStream);
return bodyDecoder->decodeToString(headers, bodyStream);
}
template<class Type>
typename Type::ObjectWrapper readBodyToDto(const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper) const {
return objectMapper->readFromString<Type>
(protocol::http::incoming::BodyDecoder::decodeToString(headers, bodyStream));
return bodyDecoder->decodeToDto<Type>(headers, bodyStream, objectMapper);
}
// Async
@ -77,20 +86,20 @@ public:
oatpp::async::Action streamBodyAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const {
return protocol::http::incoming::BodyDecoder::decodeAsync(parentCoroutine, actionOnReturn, headers, bodyStream, toStream);
return bodyDecoder->decodeAsync(parentCoroutine, actionOnReturn, headers, bodyStream, toStream);
}
template<typename ParentCoroutineType>
oatpp::async::Action readBodyToStringAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
oatpp::async::Action (ParentCoroutineType::*callback)(const oatpp::String&)) const {
return protocol::http::incoming::BodyDecoder::decodeToStringAsync(parentCoroutine, callback, headers, bodyStream);
return bodyDecoder->decodeToStringAsync(parentCoroutine, callback, headers, bodyStream);
}
template<class DtoType, typename ParentCoroutineType>
oatpp::async::Action readBodyToDtoAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
oatpp::async::Action (ParentCoroutineType::*callback)(const typename DtoType::ObjectWrapper&),
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper) const {
return protocol::http::incoming::BodyDecoder::decodeToDtoAsync<DtoType>(parentCoroutine, callback, headers, bodyStream, objectMapper);
return bodyDecoder->decodeToDtoAsync<DtoType>(parentCoroutine, callback, headers, bodyStream, objectMapper);
}
};

View File

@ -0,0 +1,247 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "SimpleBodyDecoder.hpp"
#include "oatpp/core/data/stream/StreamBufferedProxy.hpp"
#include "oatpp/core/utils/ConversionUtils.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming {
os::io::Library::v_size SimpleBodyDecoder::readLine(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
p_char8 buffer,
os::io::Library::v_size maxLineSize) {
v_char8 a;
os::io::Library::v_size count = 0;
while (fromStream->read(&a, 1) > 0) {
if(a != '\r') {
if(count + 1 > maxLineSize) {
OATPP_LOGE("BodyDecoder", "Error - too long line");
return 0;
}
buffer[count++] = a;
} else {
fromStream->read(&a, 1);
if(a != '\n'){
OATPP_LOGE("BodyDecoder", "Warning - invalid line breaker");
}
return count; // size of line
}
}
return count;
}
void SimpleBodyDecoder::doChunkedDecoding(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) {
auto buffer = oatpp::data::buffer::IOBuffer::createShared();
v_int32 maxLineSize = 8; // 0xFFFFFFFF 4Gb for chunk
v_char8 lineBuffer[maxLineSize + 1];
os::io::Library::v_size countToRead;
do {
auto lineSize = readLine(fromStream, lineBuffer, maxLineSize);
if(lineSize == 0 || lineSize >= maxLineSize) {
return; // error reading stream
}
lineBuffer[lineSize] = 0;
countToRead = std::strtol((const char*)lineBuffer, nullptr, 16);
if(countToRead > 0) {
oatpp::data::stream::transfer(fromStream, toStream, countToRead, buffer->getData(), buffer->getSize());
}
fromStream->read(lineBuffer, 2); // just skip "\r\n"
} while (countToRead > 0);
}
void SimpleBodyDecoder::decode(const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const {
auto transferEncoding = headers->get(Header::TRANSFER_ENCODING, nullptr);
if(transferEncoding && transferEncoding->equals(Header::Value::TRANSFER_ENCODING_CHUNKED)) {
doChunkedDecoding(bodyStream, toStream);
} else {
oatpp::os::io::Library::v_size contentLength = 0;
auto contentLengthStr = headers->get(Header::CONTENT_LENGTH, nullptr);
if(!contentLengthStr) {
return; // DO NOTHING // it is an empty or invalid body
} else {
bool success;
contentLength = oatpp::utils::conversion::strToInt64(contentLengthStr, success);
if(!success){
return; // it is an invalid request/response
}
auto buffer = oatpp::data::buffer::IOBuffer::createShared();
oatpp::data::stream::transfer(bodyStream, toStream, contentLength, buffer->getData(), buffer->getSize());
}
}
}
oatpp::async::Action SimpleBodyDecoder::doChunkedDecodingAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) {
class ChunkedDecoder : public oatpp::async::Coroutine<ChunkedDecoder> {
private:
const v_int32 MAX_LINE_SIZE = 8;
private:
std::shared_ptr<oatpp::data::stream::InputStream> m_fromStream;
std::shared_ptr<oatpp::data::stream::OutputStream> m_toStream;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_buffer = oatpp::data::buffer::IOBuffer::createShared();
v_int32 m_currLineLength;
v_char8 m_lineChar;
bool m_lineEnding;
v_char8 m_lineBuffer [16]; // used max 8
void* m_skipData;
os::io::Library::v_size m_skipSize;
bool m_done = false;
private:
void prepareSkipRN() {
m_skipData = &m_lineBuffer[0];
m_skipSize = 2;
m_currLineLength = 0;
m_lineEnding = false;
}
public:
ChunkedDecoder(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream)
: m_fromStream(fromStream)
, m_toStream(toStream)
{}
Action act() override {
m_currLineLength = 0;
m_lineEnding = false;
return yieldTo(&ChunkedDecoder::readLineChar);
}
Action readLineChar() {
auto res = m_fromStream->read(&m_lineChar, 1);
if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
return oatpp::async::Action::_WAIT_RETRY;
} else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) {
return oatpp::async::Action::_REPEAT;
} else if( res < 0) {
return error("[BodyDecoder::ChunkedDecoder] Can't read line char");
}
return yieldTo(&ChunkedDecoder::onLineCharRead);
}
Action onLineCharRead() {
if(!m_lineEnding) {
if(m_lineChar != '\r') {
if(m_currLineLength + 1 > MAX_LINE_SIZE){
return error("[BodyDecoder::ChunkedDecoder] too long line");
}
m_lineBuffer[m_currLineLength ++] = m_lineChar;
return yieldTo(&ChunkedDecoder::readLineChar);
} else {
m_lineEnding = true;
return yieldTo(&ChunkedDecoder::readLineChar);
}
} else {
if(m_lineChar != '\n') {
OATPP_LOGD("[BodyDecoder::ChunkedDecoder]", "Warning - invalid line breaker")
}
}
if(m_currLineLength == 0) {
return error("Error reading stream. 0-length line");
}
m_lineBuffer[m_currLineLength] = 0;
return yieldTo(&ChunkedDecoder::onLineRead);
}
Action onLineRead() {
os::io::Library::v_size countToRead = std::strtol((const char*) m_lineBuffer, nullptr, 16);
if(countToRead > 0) {
prepareSkipRN();
return oatpp::data::stream::transferAsync(this, yieldTo(&ChunkedDecoder::skipRN), m_fromStream, m_toStream, countToRead, m_buffer);
}
m_done = true;
prepareSkipRN();
return yieldTo(&ChunkedDecoder::skipRN);
}
Action skipRN() {
if(m_done) {
return oatpp::data::stream::readExactSizeDataAsyncInline(m_fromStream.get(),
m_skipData,
m_skipSize,
finish());
} else {
return oatpp::data::stream::readExactSizeDataAsyncInline(m_fromStream.get(),
m_skipData,
m_skipSize,
yieldTo(&ChunkedDecoder::readLineChar));
}
}
};
return parentCoroutine->startCoroutine<ChunkedDecoder>(actionOnReturn, fromStream, toStream);
}
oatpp::async::Action SimpleBodyDecoder::decodeAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const {
auto transferEncoding = headers->get(Header::TRANSFER_ENCODING, nullptr);
if(transferEncoding && transferEncoding->equals(Header::Value::TRANSFER_ENCODING_CHUNKED)) {
return doChunkedDecodingAsync(parentCoroutine, actionOnReturn, bodyStream, toStream);
} else {
oatpp::os::io::Library::v_size contentLength = 0;
auto contentLengthStr = headers->get(Header::CONTENT_LENGTH, nullptr);
if(!contentLengthStr) {
return actionOnReturn; // DO NOTHING // it is an empty or invalid body
} else {
bool success;
contentLength = oatpp::utils::conversion::strToInt64(contentLengthStr, success);
if(!success){
return oatpp::async::Action(oatpp::async::Error("Invalid 'Content-Length' Header"));
}
return oatpp::data::stream::transferAsync(parentCoroutine,
actionOnReturn,
bodyStream,
toStream,
contentLength,
oatpp::data::buffer::IOBuffer::createShared());
}
}
}
}}}}}

View File

@ -0,0 +1,61 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_web_protocol_http_incoming_SimpleBodyDecoder_hpp
#define oatpp_web_protocol_http_incoming_SimpleBodyDecoder_hpp
#include "BodyDecoder.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming {
class SimpleBodyDecoder : public BodyDecoder {
private:
static os::io::Library::v_size readLine(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
p_char8 buffer,
os::io::Library::v_size maxLineSize);
static void doChunkedDecoding(const std::shared_ptr<oatpp::data::stream::InputStream>& from,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream);
static oatpp::async::Action doChunkedDecodingAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream);
public:
void decode(const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const override;
oatpp::async::Action decodeAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const override;
};
}}}}}
#endif /* oatpp_web_protocol_http_incoming_SimpleBodyDecoder_hpp */

View File

@ -45,6 +45,7 @@ void AsyncHttpConnectionHandler::Task::consumeConnections(oatpp::async::Processo
while (curr != nullptr) {
auto coroutine = HttpProcessor::Coroutine::getBench().obtain(m_router,
m_bodyDecoder,
m_errorHandler,
m_requestInterceptors,
curr->getData()->connection,

View File

@ -31,6 +31,8 @@
#include "./HttpRouter.hpp"
#include "oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp"
#include "oatpp/web/protocol/http/incoming/Request.hpp"
#include "oatpp/web/protocol/http/outgoing/Response.hpp"
@ -62,25 +64,29 @@ private:
Connections m_connections;
private:
HttpRouter* m_router;
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> m_bodyDecoder;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
HttpProcessor::RequestInterceptors* m_requestInterceptors;
std::mutex m_taskMutex;
std::condition_variable m_taskCondition;
public:
Task(HttpRouter* router,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
HttpProcessor::RequestInterceptors* requestInterceptors)
: m_atom(false)
, m_router(router)
, m_bodyDecoder(bodyDecoder)
, m_errorHandler(errorHandler)
, m_requestInterceptors(requestInterceptors)
{}
public:
static std::shared_ptr<Task> createShared(HttpRouter* router,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
HttpProcessor::RequestInterceptors* requestInterceptors){
return std::make_shared<Task>(router, errorHandler, requestInterceptors);
return std::make_shared<Task>(router, bodyDecoder, errorHandler, requestInterceptors);
}
void run() override;
@ -108,9 +114,14 @@ public:
, m_taskBalancer(0)
, m_threadCount(threadCount)
{
// TODO make bodyDecoder configurable here
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> bodyDecoder =
std::make_shared<oatpp::web::protocol::http::incoming::SimpleBodyDecoder>();
m_tasks = new std::shared_ptr<Task>[m_threadCount];
for(v_int32 i = 0; i < m_threadCount; i++) {
auto task = Task::createShared(m_router.get(), m_errorHandler, &m_requestInterceptors);
auto task = Task::createShared(m_router.get(), bodyDecoder, m_errorHandler, &m_requestInterceptors);
m_tasks[i] = task;
concurrency::Thread thread(task);
thread.detach();

View File

@ -42,7 +42,7 @@ void HttpConnectionHandler::Task::run(){
bool keepAlive = true;
do {
auto response = HttpProcessor::processRequest(m_router, m_connection, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive);
auto response = HttpProcessor::processRequest(m_router, m_connection, m_bodyDecoder, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive);
if(response) {
outStream->setBufferPosition(0, 0);
@ -59,7 +59,7 @@ void HttpConnectionHandler::Task::run(){
void HttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection){
/* Create working thread */
concurrency::Thread thread(Task::createShared(m_router.get(), connection, m_errorHandler, &m_requestInterceptors));
concurrency::Thread thread(Task::createShared(m_router.get(), connection, m_bodyDecoder, m_errorHandler, &m_requestInterceptors));
/* Get hardware concurrency -1 in order to have 1cpu free of workers. */
v_int32 concurrency = oatpp::concurrency::Thread::getHardwareConcurrency();

View File

@ -29,6 +29,8 @@
#include "./handler/ErrorHandler.hpp"
#include "./HttpRouter.hpp"
#include "oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp"
#include "oatpp/web/protocol/http/incoming/Request.hpp"
#include "oatpp/web/protocol/http/outgoing/Response.hpp"
@ -50,15 +52,18 @@ private:
private:
HttpRouter* m_router;
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> m_bodyDecoder;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
HttpProcessor::RequestInterceptors* m_requestInterceptors;
public:
Task(HttpRouter* router,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
HttpProcessor::RequestInterceptors* requestInterceptors)
: m_router(router)
, m_connection(connection)
, m_bodyDecoder(bodyDecoder)
, m_errorHandler(errorHandler)
, m_requestInterceptors(requestInterceptors)
{}
@ -66,9 +71,10 @@ private:
static std::shared_ptr<Task> createShared(HttpRouter* router,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
HttpProcessor::RequestInterceptors* requestInterceptors) {
return std::make_shared<Task>(router, connection, errorHandler, requestInterceptors);
return std::make_shared<Task>(router, connection, bodyDecoder, errorHandler, requestInterceptors);
}
void run() override;
@ -77,11 +83,13 @@ private:
private:
std::shared_ptr<HttpRouter> m_router;
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> m_bodyDecoder;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
HttpProcessor::RequestInterceptors m_requestInterceptors;
public:
HttpConnectionHandler(const std::shared_ptr<HttpRouter>& router)
: m_router(router)
, m_bodyDecoder(std::make_shared<oatpp::web::protocol::http::incoming::SimpleBodyDecoder>())
, m_errorHandler(handler::DefaultErrorHandler::createShared())
{}
public:

View File

@ -34,6 +34,7 @@ const char* HttpProcessor::RETURN_KEEP_ALIVE = "RETURN_KEEP_ALIVE";
std::shared_ptr<protocol::http::outgoing::Response>
HttpProcessor::processRequest(HttpRouter* router,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
RequestInterceptors* requestInterceptors,
void* buffer,
@ -66,7 +67,7 @@ HttpProcessor::processRequest(HttpRouter* router,
auto bodyStream = inStream;
bodyStream->setBufferPosition(caret.getPosition(), (v_int32) readCount);
auto request = protocol::http::incoming::Request::createShared(line, route.matchMap, headers, bodyStream);
auto request = protocol::http::incoming::Request::createShared(line, route.matchMap, headers, bodyStream, bodyDecoder);
std::shared_ptr<protocol::http::outgoing::Response> response;
try{
auto currInterceptor = requestInterceptors->getFirstNode();
@ -136,7 +137,7 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::parseRequest(v_int32
auto bodyStream = m_inStream;
bodyStream->setBufferPosition(caret.getPosition(), readCount);
m_currentRequest = protocol::http::incoming::Request::createShared(line, m_currentRoute.matchMap, headers, bodyStream);
m_currentRequest = protocol::http::incoming::Request::createShared(line, m_currentRoute.matchMap, headers, bodyStream, m_bodyDecoder);
auto currInterceptor = m_requestInterceptors->getFirstNode();
while (currInterceptor != nullptr) {

View File

@ -68,6 +68,7 @@ public:
Action parseRequest(v_int32 readCount);
private:
HttpRouter* m_router;
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> m_bodyDecoder;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
RequestInterceptors* m_requestInterceptors;
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
@ -82,6 +83,7 @@ public:
public:
Coroutine(HttpRouter* router,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
RequestInterceptors* requestInterceptors,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
@ -89,6 +91,7 @@ public:
const std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy>& outStream,
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream)
: m_router(router)
, m_bodyDecoder(bodyDecoder)
, m_errorHandler(errorHandler)
, m_requestInterceptors(requestInterceptors)
, m_connection(connection)
@ -114,6 +117,7 @@ public:
static std::shared_ptr<protocol::http::outgoing::Response>
processRequest(HttpRouter* router,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
RequestInterceptors* requestInterceptors,
void* buffer,