Better oatpp::data::stream::transfer()

This commit is contained in:
lganzzzo 2018-11-08 12:13:46 +02:00
parent 1d7180ba02
commit 153b9987c9
7 changed files with 87 additions and 39 deletions

View File

@ -117,22 +117,36 @@ const std::shared_ptr<OutputStream>& operator << (const std::shared_ptr<OutputSt
return s; return s;
} }
void transfer(const std::shared_ptr<InputStream>& fromStream, oatpp::os::io::Library::v_size transfer(const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream, const std::shared_ptr<OutputStream>& toStream,
oatpp::os::io::Library::v_size transferSize, oatpp::os::io::Library::v_size transferSize,
void* buffer, void* buffer,
oatpp::os::io::Library::v_size bufferSize) { oatpp::os::io::Library::v_size bufferSize) {
while (transferSize > 0) { oatpp::os::io::Library::v_size progress = 0;
oatpp::os::io::Library::v_size desiredReadCount = transferSize;
if(desiredReadCount > bufferSize){ while (transferSize == 0 || progress < transferSize) {
oatpp::os::io::Library::v_size desiredReadCount = transferSize - progress;
if(transferSize == 0 || desiredReadCount > bufferSize){
desiredReadCount = bufferSize; desiredReadCount = bufferSize;
} }
auto readCount = fromStream->read(buffer, desiredReadCount); auto readResult = fromStream->read(buffer, desiredReadCount);
toStream->write(buffer, readCount); if(readResult > 0) {
transferSize -= readCount; auto writeReasul = writeExactSizeData(toStream.get(), buffer, readResult);
if(writeReasul != readResult) {
throw new std::runtime_error("[oatpp::data::stream::transfer()]: Unknown Error. Can't continue transfer.");
}
progress += readResult;
} else {
if(readResult == oatpp::data::stream::Errors::ERROR_IO_RETRY || readResult == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
continue;
}
return progress;
}
} }
return progress;
} }
oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCoroutine, oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
@ -147,6 +161,7 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
std::shared_ptr<InputStream> m_fromStream; std::shared_ptr<InputStream> m_fromStream;
std::shared_ptr<OutputStream> m_toStream; std::shared_ptr<OutputStream> m_toStream;
oatpp::os::io::Library::v_size m_transferSize; oatpp::os::io::Library::v_size m_transferSize;
oatpp::os::io::Library::v_size m_progress;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_buffer; std::shared_ptr<oatpp::data::buffer::IOBuffer> m_buffer;
oatpp::os::io::Library::v_size m_desiredReadCount; oatpp::os::io::Library::v_size m_desiredReadCount;
@ -163,19 +178,22 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
: m_fromStream(fromStream) : m_fromStream(fromStream)
, m_toStream(toStream) , m_toStream(toStream)
, m_transferSize(transferSize) , m_transferSize(transferSize)
, m_progress(0)
, m_buffer(buffer) , m_buffer(buffer)
{} {}
Action act() override { Action act() override {
if(m_transferSize == 0) { if(m_progress == m_transferSize) {
return finish(); return finish();
} else if(m_progress > m_transferSize) {
throw std::runtime_error("[oatpp::data::stream::transferAsync{TransferCoroutine::act()}]: Invalid state. m_progress > m_transferSize");
} else if(m_transferSize < 0) { } else if(m_transferSize < 0) {
throw std::runtime_error("Invalid stream::TransferCoroutine state"); throw std::runtime_error("[oatpp::data::stream::transferAsync{TransferCoroutine::act()}]: Invalid state. m_transferSize < 0");
} }
m_desiredReadCount = m_transferSize; m_desiredReadCount = m_transferSize - m_progress;
if(m_desiredReadCount > m_buffer->getSize()){ if(m_transferSize == 0 || m_desiredReadCount > m_buffer->getSize()){
m_desiredReadCount = m_buffer->getSize(); m_desiredReadCount = m_buffer->getSize();
} }
@ -188,22 +206,22 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
Action doRead() { Action doRead() {
return oatpp::data::stream::readSomeDataAsyncInline(m_fromStream.get(), return oatpp::data::stream::readSomeDataAsyncInline(m_fromStream.get(),
m_readBufferPtr, m_readBufferPtr,
m_bytesLeft, m_bytesLeft,
yieldTo(&TransferCoroutine::prepareWrite)); yieldTo(&TransferCoroutine::prepareWrite));
} }
Action prepareWrite() { Action prepareWrite() {
m_bytesLeft = m_desiredReadCount - m_bytesLeft; m_bytesLeft = m_desiredReadCount - m_bytesLeft;
m_transferSize -= m_bytesLeft; m_progress += m_bytesLeft;
return yieldTo(&TransferCoroutine::doWrite); return yieldTo(&TransferCoroutine::doWrite);
} }
Action doWrite() { Action doWrite() {
return oatpp::data::stream::writeDataAsyncInline(m_toStream.get(), return oatpp::data::stream::writeDataAsyncInline(m_toStream.get(),
m_writeBufferPtr, m_writeBufferPtr,
m_bytesLeft, m_bytesLeft,
yieldTo(&TransferCoroutine::act)); yieldTo(&TransferCoroutine::act));
} }
}; };
@ -276,18 +294,25 @@ oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStre
} }
oatpp::os::io::Library::v_size writeExactSizeData(oatpp::data::stream::OutputStream* stream, const void* data, os::io::Library::v_size size) { oatpp::os::io::Library::v_size writeExactSizeData(oatpp::data::stream::OutputStream* stream, const void* data, os::io::Library::v_size size) {
const char* buffer = (char*)data; const char* buffer = (char*)data;
oatpp::os::io::Library::v_size progress = 0; oatpp::os::io::Library::v_size progress = 0;
while (progress < size) { while (progress < size) {
auto res = stream->write(&buffer[progress], size - progress); auto res = stream->write(&buffer[progress], size - progress);
if(res <= 0) { // if res == 0 then probably stream handles write() error incorrectly. return.
if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE || if(res > 0) {
(res != oatpp::data::stream::Errors::ERROR_IO_RETRY && res != oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY)) { progress += res;
return progress; } else { // if res == 0 then probably stream handles write() error incorrectly. return.
if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY || res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
continue;
} }
return progress;
} }
progress += res;
} }
return progress; return progress;
} }

View File

@ -138,12 +138,15 @@ const std::shared_ptr<OutputStream>& operator << (const std::shared_ptr<OutputSt
/** /**
* Read bytes from @fromStream" and write to @toStream" using @buffer of size @bufferSize * Read bytes from @fromStream" and write to @toStream" using @buffer of size @bufferSize
* transfer up to transferSize or until error if transferSize == 0
* throws in case readCount != writeCount
*/ */
void transfer(const std::shared_ptr<InputStream>& fromStream, oatpp::os::io::Library::v_size transfer(const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream, const std::shared_ptr<OutputStream>& toStream,
oatpp::os::io::Library::v_size transferSize, oatpp::os::io::Library::v_size transferSize,
void* buffer, void* buffer,
oatpp::os::io::Library::v_size bufferSize); oatpp::os::io::Library::v_size bufferSize);
/** /**
* Same as transfer but asynchronous * Same as transfer but asynchronous

View File

@ -85,7 +85,7 @@ void SimpleBodyDecoder::doChunkedDecoding(const std::shared_ptr<oatpp::data::str
void SimpleBodyDecoder::decode(const std::shared_ptr<Protocol::Headers>& headers, void SimpleBodyDecoder::decode(const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream, const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) { const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const {
auto transferEncoding = headers->get(Header::TRANSFER_ENCODING, nullptr); auto transferEncoding = headers->get(Header::TRANSFER_ENCODING, nullptr);
if(transferEncoding && transferEncoding->equals(Header::Value::TRANSFER_ENCODING_CHUNKED)) { if(transferEncoding && transferEncoding->equals(Header::Value::TRANSFER_ENCODING_CHUNKED)) {
@ -111,7 +111,7 @@ void SimpleBodyDecoder::decode(const std::shared_ptr<Protocol::Headers>& headers
oatpp::async::Action SimpleBodyDecoder::doChunkedDecodingAsync(oatpp::async::AbstractCoroutine* parentCoroutine, oatpp::async::Action SimpleBodyDecoder::doChunkedDecodingAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn, const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream, const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const { const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) {
class ChunkedDecoder : public oatpp::async::Coroutine<ChunkedDecoder> { class ChunkedDecoder : public oatpp::async::Coroutine<ChunkedDecoder> {
private: private:

View File

@ -45,6 +45,7 @@ void AsyncHttpConnectionHandler::Task::consumeConnections(oatpp::async::Processo
while (curr != nullptr) { while (curr != nullptr) {
auto coroutine = HttpProcessor::Coroutine::getBench().obtain(m_router, auto coroutine = HttpProcessor::Coroutine::getBench().obtain(m_router,
m_bodyDecoder,
m_errorHandler, m_errorHandler,
m_requestInterceptors, m_requestInterceptors,
curr->getData()->connection, curr->getData()->connection,

View File

@ -31,6 +31,8 @@
#include "./HttpRouter.hpp" #include "./HttpRouter.hpp"
#include "oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp"
#include "oatpp/web/protocol/http/incoming/Request.hpp" #include "oatpp/web/protocol/http/incoming/Request.hpp"
#include "oatpp/web/protocol/http/outgoing/Response.hpp" #include "oatpp/web/protocol/http/outgoing/Response.hpp"
@ -62,25 +64,29 @@ private:
Connections m_connections; Connections m_connections;
private: private:
HttpRouter* m_router; HttpRouter* m_router;
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> m_bodyDecoder;
std::shared_ptr<handler::ErrorHandler> m_errorHandler; std::shared_ptr<handler::ErrorHandler> m_errorHandler;
HttpProcessor::RequestInterceptors* m_requestInterceptors; HttpProcessor::RequestInterceptors* m_requestInterceptors;
std::mutex m_taskMutex; std::mutex m_taskMutex;
std::condition_variable m_taskCondition; std::condition_variable m_taskCondition;
public: public:
Task(HttpRouter* router, Task(HttpRouter* router,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler, const std::shared_ptr<handler::ErrorHandler>& errorHandler,
HttpProcessor::RequestInterceptors* requestInterceptors) HttpProcessor::RequestInterceptors* requestInterceptors)
: m_atom(false) : m_atom(false)
, m_router(router) , m_router(router)
, m_bodyDecoder(bodyDecoder)
, m_errorHandler(errorHandler) , m_errorHandler(errorHandler)
, m_requestInterceptors(requestInterceptors) , m_requestInterceptors(requestInterceptors)
{} {}
public: public:
static std::shared_ptr<Task> createShared(HttpRouter* router, static std::shared_ptr<Task> createShared(HttpRouter* router,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler, const std::shared_ptr<handler::ErrorHandler>& errorHandler,
HttpProcessor::RequestInterceptors* requestInterceptors){ HttpProcessor::RequestInterceptors* requestInterceptors){
return std::make_shared<Task>(router, errorHandler, requestInterceptors); return std::make_shared<Task>(router, bodyDecoder, errorHandler, requestInterceptors);
} }
void run() override; void run() override;
@ -108,9 +114,14 @@ public:
, m_taskBalancer(0) , m_taskBalancer(0)
, m_threadCount(threadCount) , m_threadCount(threadCount)
{ {
// TODO make bodyDecoder configurable here
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> bodyDecoder =
std::make_shared<oatpp::web::protocol::http::incoming::SimpleBodyDecoder>();
m_tasks = new std::shared_ptr<Task>[m_threadCount]; m_tasks = new std::shared_ptr<Task>[m_threadCount];
for(v_int32 i = 0; i < m_threadCount; i++) { for(v_int32 i = 0; i < m_threadCount; i++) {
auto task = Task::createShared(m_router.get(), m_errorHandler, &m_requestInterceptors); auto task = Task::createShared(m_router.get(), bodyDecoder, m_errorHandler, &m_requestInterceptors);
m_tasks[i] = task; m_tasks[i] = task;
concurrency::Thread thread(task); concurrency::Thread thread(task);
thread.detach(); thread.detach();

View File

@ -42,7 +42,7 @@ void HttpConnectionHandler::Task::run(){
bool keepAlive = true; bool keepAlive = true;
do { do {
auto response = HttpProcessor::processRequest(m_router, m_connection, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive); auto response = HttpProcessor::processRequest(m_router, m_connection, m_bodyDecoder, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive);
if(response) { if(response) {
outStream->setBufferPosition(0, 0); outStream->setBufferPosition(0, 0);
@ -59,7 +59,7 @@ void HttpConnectionHandler::Task::run(){
void HttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection){ void HttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection){
/* Create working thread */ /* Create working thread */
concurrency::Thread thread(Task::createShared(m_router.get(), connection, m_errorHandler, &m_requestInterceptors)); concurrency::Thread thread(Task::createShared(m_router.get(), connection, m_bodyDecoder, m_errorHandler, &m_requestInterceptors));
/* Get hardware concurrency -1 in order to have 1cpu free of workers. */ /* Get hardware concurrency -1 in order to have 1cpu free of workers. */
v_int32 concurrency = oatpp::concurrency::Thread::getHardwareConcurrency(); v_int32 concurrency = oatpp::concurrency::Thread::getHardwareConcurrency();

View File

@ -29,6 +29,8 @@
#include "./handler/ErrorHandler.hpp" #include "./handler/ErrorHandler.hpp"
#include "./HttpRouter.hpp" #include "./HttpRouter.hpp"
#include "oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp"
#include "oatpp/web/protocol/http/incoming/Request.hpp" #include "oatpp/web/protocol/http/incoming/Request.hpp"
#include "oatpp/web/protocol/http/outgoing/Response.hpp" #include "oatpp/web/protocol/http/outgoing/Response.hpp"
@ -50,15 +52,18 @@ private:
private: private:
HttpRouter* m_router; HttpRouter* m_router;
std::shared_ptr<oatpp::data::stream::IOStream> m_connection; std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> m_bodyDecoder;
std::shared_ptr<handler::ErrorHandler> m_errorHandler; std::shared_ptr<handler::ErrorHandler> m_errorHandler;
HttpProcessor::RequestInterceptors* m_requestInterceptors; HttpProcessor::RequestInterceptors* m_requestInterceptors;
public: public:
Task(HttpRouter* router, Task(HttpRouter* router,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection, const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler, const std::shared_ptr<handler::ErrorHandler>& errorHandler,
HttpProcessor::RequestInterceptors* requestInterceptors) HttpProcessor::RequestInterceptors* requestInterceptors)
: m_router(router) : m_router(router)
, m_connection(connection) , m_connection(connection)
, m_bodyDecoder(bodyDecoder)
, m_errorHandler(errorHandler) , m_errorHandler(errorHandler)
, m_requestInterceptors(requestInterceptors) , m_requestInterceptors(requestInterceptors)
{} {}
@ -66,9 +71,10 @@ private:
static std::shared_ptr<Task> createShared(HttpRouter* router, static std::shared_ptr<Task> createShared(HttpRouter* router,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection, const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler, const std::shared_ptr<handler::ErrorHandler>& errorHandler,
HttpProcessor::RequestInterceptors* requestInterceptors) { HttpProcessor::RequestInterceptors* requestInterceptors) {
return std::make_shared<Task>(router, connection, errorHandler, requestInterceptors); return std::make_shared<Task>(router, connection, bodyDecoder, errorHandler, requestInterceptors);
} }
void run() override; void run() override;
@ -77,11 +83,13 @@ private:
private: private:
std::shared_ptr<HttpRouter> m_router; std::shared_ptr<HttpRouter> m_router;
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> m_bodyDecoder;
std::shared_ptr<handler::ErrorHandler> m_errorHandler; std::shared_ptr<handler::ErrorHandler> m_errorHandler;
HttpProcessor::RequestInterceptors m_requestInterceptors; HttpProcessor::RequestInterceptors m_requestInterceptors;
public: public:
HttpConnectionHandler(const std::shared_ptr<HttpRouter>& router) HttpConnectionHandler(const std::shared_ptr<HttpRouter>& router)
: m_router(router) : m_router(router)
, m_bodyDecoder(std::make_shared<oatpp::web::protocol::http::incoming::SimpleBodyDecoder>())
, m_errorHandler(handler::DefaultErrorHandler::createShared()) , m_errorHandler(handler::DefaultErrorHandler::createShared())
{} {}
public: public: