From 153b9987c966492b28b67d9c7ec7eede2afa62e4 Mon Sep 17 00:00:00 2001 From: lganzzzo Date: Thu, 8 Nov 2018 12:13:46 +0200 Subject: [PATCH] Better oatpp::data::stream::transfer() --- core/data/stream/Stream.cpp | 79 ++++++++++++------- core/data/stream/Stream.hpp | 13 +-- .../http/incoming/SimpleBodyDecoder.cpp | 4 +- web/server/AsyncHttpConnectionHandler.cpp | 1 + web/server/AsyncHttpConnectionHandler.hpp | 15 +++- web/server/HttpConnectionHandler.cpp | 4 +- web/server/HttpConnectionHandler.hpp | 10 ++- 7 files changed, 87 insertions(+), 39 deletions(-) diff --git a/core/data/stream/Stream.cpp b/core/data/stream/Stream.cpp index 72170f8c..648835b5 100644 --- a/core/data/stream/Stream.cpp +++ b/core/data/stream/Stream.cpp @@ -117,22 +117,36 @@ const std::shared_ptr& operator << (const std::shared_ptr& fromStream, - const std::shared_ptr& toStream, - oatpp::os::io::Library::v_size transferSize, - void* buffer, - oatpp::os::io::Library::v_size bufferSize) { +oatpp::os::io::Library::v_size transfer(const std::shared_ptr& fromStream, + const std::shared_ptr& toStream, + oatpp::os::io::Library::v_size transferSize, + void* buffer, + oatpp::os::io::Library::v_size bufferSize) { - while (transferSize > 0) { - oatpp::os::io::Library::v_size desiredReadCount = transferSize; - if(desiredReadCount > bufferSize){ + oatpp::os::io::Library::v_size progress = 0; + + while (transferSize == 0 || progress < transferSize) { + oatpp::os::io::Library::v_size desiredReadCount = transferSize - progress; + if(transferSize == 0 || desiredReadCount > bufferSize){ desiredReadCount = bufferSize; } - auto readCount = fromStream->read(buffer, desiredReadCount); - toStream->write(buffer, readCount); - transferSize -= readCount; + auto readResult = fromStream->read(buffer, desiredReadCount); + if(readResult > 0) { + auto writeReasul = writeExactSizeData(toStream.get(), buffer, readResult); + if(writeReasul != readResult) { + throw new std::runtime_error("[oatpp::data::stream::transfer()]: Unknown Error. Can't continue transfer."); + } + progress += readResult; + } else { + if(readResult == oatpp::data::stream::Errors::ERROR_IO_RETRY || readResult == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { + continue; + } + return progress; + } } + return progress; + } oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCoroutine, @@ -147,6 +161,7 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout std::shared_ptr m_fromStream; std::shared_ptr m_toStream; oatpp::os::io::Library::v_size m_transferSize; + oatpp::os::io::Library::v_size m_progress; std::shared_ptr m_buffer; oatpp::os::io::Library::v_size m_desiredReadCount; @@ -163,19 +178,22 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout : m_fromStream(fromStream) , m_toStream(toStream) , m_transferSize(transferSize) + , m_progress(0) , m_buffer(buffer) {} Action act() override { - if(m_transferSize == 0) { + if(m_progress == m_transferSize) { return finish(); + } else if(m_progress > m_transferSize) { + throw std::runtime_error("[oatpp::data::stream::transferAsync{TransferCoroutine::act()}]: Invalid state. m_progress > m_transferSize"); } else if(m_transferSize < 0) { - throw std::runtime_error("Invalid stream::TransferCoroutine state"); + throw std::runtime_error("[oatpp::data::stream::transferAsync{TransferCoroutine::act()}]: Invalid state. m_transferSize < 0"); } - m_desiredReadCount = m_transferSize; - if(m_desiredReadCount > m_buffer->getSize()){ + m_desiredReadCount = m_transferSize - m_progress; + if(m_transferSize == 0 || m_desiredReadCount > m_buffer->getSize()){ m_desiredReadCount = m_buffer->getSize(); } @@ -188,22 +206,22 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout Action doRead() { return oatpp::data::stream::readSomeDataAsyncInline(m_fromStream.get(), - m_readBufferPtr, - m_bytesLeft, - yieldTo(&TransferCoroutine::prepareWrite)); + m_readBufferPtr, + m_bytesLeft, + yieldTo(&TransferCoroutine::prepareWrite)); } Action prepareWrite() { m_bytesLeft = m_desiredReadCount - m_bytesLeft; - m_transferSize -= m_bytesLeft; + m_progress += m_bytesLeft; return yieldTo(&TransferCoroutine::doWrite); } Action doWrite() { return oatpp::data::stream::writeDataAsyncInline(m_toStream.get(), - m_writeBufferPtr, - m_bytesLeft, - yieldTo(&TransferCoroutine::act)); + m_writeBufferPtr, + m_bytesLeft, + yieldTo(&TransferCoroutine::act)); } }; @@ -276,18 +294,25 @@ oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStre } oatpp::os::io::Library::v_size writeExactSizeData(oatpp::data::stream::OutputStream* stream, const void* data, os::io::Library::v_size size) { + const char* buffer = (char*)data; oatpp::os::io::Library::v_size progress = 0; + while (progress < size) { + auto res = stream->write(&buffer[progress], size - progress); - if(res <= 0) { // if res == 0 then probably stream handles write() error incorrectly. return. - if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE || - (res != oatpp::data::stream::Errors::ERROR_IO_RETRY && res != oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY)) { - return progress; + + if(res > 0) { + progress += res; + } else { // if res == 0 then probably stream handles write() error incorrectly. return. + if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY || res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { + continue; } + return progress; } - progress += res; + } + return progress; } diff --git a/core/data/stream/Stream.hpp b/core/data/stream/Stream.hpp index 9fe3fe7d..90ee694e 100644 --- a/core/data/stream/Stream.hpp +++ b/core/data/stream/Stream.hpp @@ -138,12 +138,15 @@ const std::shared_ptr& operator << (const std::shared_ptr& fromStream, - const std::shared_ptr& toStream, - oatpp::os::io::Library::v_size transferSize, - void* buffer, - oatpp::os::io::Library::v_size bufferSize); +oatpp::os::io::Library::v_size transfer(const std::shared_ptr& fromStream, + const std::shared_ptr& toStream, + oatpp::os::io::Library::v_size transferSize, + void* buffer, + oatpp::os::io::Library::v_size bufferSize); + /** * Same as transfer but asynchronous diff --git a/web/protocol/http/incoming/SimpleBodyDecoder.cpp b/web/protocol/http/incoming/SimpleBodyDecoder.cpp index b787be31..a83649ba 100644 --- a/web/protocol/http/incoming/SimpleBodyDecoder.cpp +++ b/web/protocol/http/incoming/SimpleBodyDecoder.cpp @@ -85,7 +85,7 @@ void SimpleBodyDecoder::doChunkedDecoding(const std::shared_ptr& headers, const std::shared_ptr& bodyStream, - const std::shared_ptr& toStream) { + const std::shared_ptr& toStream) const { auto transferEncoding = headers->get(Header::TRANSFER_ENCODING, nullptr); if(transferEncoding && transferEncoding->equals(Header::Value::TRANSFER_ENCODING_CHUNKED)) { @@ -111,7 +111,7 @@ void SimpleBodyDecoder::decode(const std::shared_ptr& headers oatpp::async::Action SimpleBodyDecoder::doChunkedDecodingAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const oatpp::async::Action& actionOnReturn, const std::shared_ptr& fromStream, - const std::shared_ptr& toStream) const { + const std::shared_ptr& toStream) { class ChunkedDecoder : public oatpp::async::Coroutine { private: diff --git a/web/server/AsyncHttpConnectionHandler.cpp b/web/server/AsyncHttpConnectionHandler.cpp index 5b313a1a..ee21b8d6 100644 --- a/web/server/AsyncHttpConnectionHandler.cpp +++ b/web/server/AsyncHttpConnectionHandler.cpp @@ -45,6 +45,7 @@ void AsyncHttpConnectionHandler::Task::consumeConnections(oatpp::async::Processo while (curr != nullptr) { auto coroutine = HttpProcessor::Coroutine::getBench().obtain(m_router, + m_bodyDecoder, m_errorHandler, m_requestInterceptors, curr->getData()->connection, diff --git a/web/server/AsyncHttpConnectionHandler.hpp b/web/server/AsyncHttpConnectionHandler.hpp index bfba4807..161f0fd0 100644 --- a/web/server/AsyncHttpConnectionHandler.hpp +++ b/web/server/AsyncHttpConnectionHandler.hpp @@ -31,6 +31,8 @@ #include "./HttpRouter.hpp" +#include "oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp" + #include "oatpp/web/protocol/http/incoming/Request.hpp" #include "oatpp/web/protocol/http/outgoing/Response.hpp" @@ -62,25 +64,29 @@ private: Connections m_connections; private: HttpRouter* m_router; + std::shared_ptr m_bodyDecoder; std::shared_ptr m_errorHandler; HttpProcessor::RequestInterceptors* m_requestInterceptors; std::mutex m_taskMutex; std::condition_variable m_taskCondition; public: Task(HttpRouter* router, + const std::shared_ptr& bodyDecoder, const std::shared_ptr& errorHandler, HttpProcessor::RequestInterceptors* requestInterceptors) : m_atom(false) , m_router(router) + , m_bodyDecoder(bodyDecoder) , m_errorHandler(errorHandler) , m_requestInterceptors(requestInterceptors) {} public: static std::shared_ptr createShared(HttpRouter* router, + const std::shared_ptr& bodyDecoder, const std::shared_ptr& errorHandler, HttpProcessor::RequestInterceptors* requestInterceptors){ - return std::make_shared(router, errorHandler, requestInterceptors); + return std::make_shared(router, bodyDecoder, errorHandler, requestInterceptors); } void run() override; @@ -108,9 +114,14 @@ public: , m_taskBalancer(0) , m_threadCount(threadCount) { + + // TODO make bodyDecoder configurable here + std::shared_ptr bodyDecoder = + std::make_shared(); + m_tasks = new std::shared_ptr[m_threadCount]; for(v_int32 i = 0; i < m_threadCount; i++) { - auto task = Task::createShared(m_router.get(), m_errorHandler, &m_requestInterceptors); + auto task = Task::createShared(m_router.get(), bodyDecoder, m_errorHandler, &m_requestInterceptors); m_tasks[i] = task; concurrency::Thread thread(task); thread.detach(); diff --git a/web/server/HttpConnectionHandler.cpp b/web/server/HttpConnectionHandler.cpp index e0e1fe65..b661d01b 100644 --- a/web/server/HttpConnectionHandler.cpp +++ b/web/server/HttpConnectionHandler.cpp @@ -42,7 +42,7 @@ void HttpConnectionHandler::Task::run(){ bool keepAlive = true; do { - auto response = HttpProcessor::processRequest(m_router, m_connection, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive); + auto response = HttpProcessor::processRequest(m_router, m_connection, m_bodyDecoder, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive); if(response) { outStream->setBufferPosition(0, 0); @@ -59,7 +59,7 @@ void HttpConnectionHandler::Task::run(){ void HttpConnectionHandler::handleConnection(const std::shared_ptr& connection){ /* Create working thread */ - concurrency::Thread thread(Task::createShared(m_router.get(), connection, m_errorHandler, &m_requestInterceptors)); + concurrency::Thread thread(Task::createShared(m_router.get(), connection, m_bodyDecoder, m_errorHandler, &m_requestInterceptors)); /* Get hardware concurrency -1 in order to have 1cpu free of workers. */ v_int32 concurrency = oatpp::concurrency::Thread::getHardwareConcurrency(); diff --git a/web/server/HttpConnectionHandler.hpp b/web/server/HttpConnectionHandler.hpp index 85773bbf..bc2a117d 100644 --- a/web/server/HttpConnectionHandler.hpp +++ b/web/server/HttpConnectionHandler.hpp @@ -29,6 +29,8 @@ #include "./handler/ErrorHandler.hpp" #include "./HttpRouter.hpp" +#include "oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp" + #include "oatpp/web/protocol/http/incoming/Request.hpp" #include "oatpp/web/protocol/http/outgoing/Response.hpp" @@ -50,15 +52,18 @@ private: private: HttpRouter* m_router; std::shared_ptr m_connection; + std::shared_ptr m_bodyDecoder; std::shared_ptr m_errorHandler; HttpProcessor::RequestInterceptors* m_requestInterceptors; public: Task(HttpRouter* router, const std::shared_ptr& connection, + const std::shared_ptr& bodyDecoder, const std::shared_ptr& errorHandler, HttpProcessor::RequestInterceptors* requestInterceptors) : m_router(router) , m_connection(connection) + , m_bodyDecoder(bodyDecoder) , m_errorHandler(errorHandler) , m_requestInterceptors(requestInterceptors) {} @@ -66,9 +71,10 @@ private: static std::shared_ptr createShared(HttpRouter* router, const std::shared_ptr& connection, + const std::shared_ptr& bodyDecoder, const std::shared_ptr& errorHandler, HttpProcessor::RequestInterceptors* requestInterceptors) { - return std::make_shared(router, connection, errorHandler, requestInterceptors); + return std::make_shared(router, connection, bodyDecoder, errorHandler, requestInterceptors); } void run() override; @@ -77,11 +83,13 @@ private: private: std::shared_ptr m_router; + std::shared_ptr m_bodyDecoder; std::shared_ptr m_errorHandler; HttpProcessor::RequestInterceptors m_requestInterceptors; public: HttpConnectionHandler(const std::shared_ptr& router) : m_router(router) + , m_bodyDecoder(std::make_shared()) , m_errorHandler(handler::DefaultErrorHandler::createShared()) {} public: