From 2eb807f1d7615915299bb4e0ee251081efe77e7f Mon Sep 17 00:00:00 2001 From: lganzzzo Date: Sun, 16 Sep 2018 03:37:05 +0300 Subject: [PATCH] better stream io api --- core/data/stream/ChunkedBuffer.cpp | 2 +- core/data/stream/Stream.cpp | 146 ++++++++++-------- core/data/stream/Stream.hpp | 82 ++++++---- core/data/stream/StreamBufferedProxy.cpp | 4 +- network/Connection.cpp | 12 +- network/virtual_/Pipe.cpp | 16 +- web/client/HttpRequestExecutor.cpp | 2 +- web/protocol/http/incoming/BodyDecoder.cpp | 8 +- web/protocol/http/outgoing/Body.hpp | 11 +- web/protocol/http/outgoing/BufferBody.hpp | 18 +-- .../http/outgoing/ChunkedBufferBody.hpp | 6 +- web/protocol/http/outgoing/DtoBody.hpp | 7 +- web/protocol/http/outgoing/Response.cpp | 2 +- web/server/HttpProcessor.cpp | 6 +- 14 files changed, 176 insertions(+), 146 deletions(-) diff --git a/core/data/stream/ChunkedBuffer.cpp b/core/data/stream/ChunkedBuffer.cpp index 9d8dcb45..ae34f7e3 100644 --- a/core/data/stream/ChunkedBuffer.cpp +++ b/core/data/stream/ChunkedBuffer.cpp @@ -261,7 +261,7 @@ oatpp::async::Action ChunkedBuffer::flushToStreamAsync(oatpp::async::AbstractCor } Action writeCurrData() { - return IOStream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction); + return oatpp::data::stream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction); } }; diff --git a/core/data/stream/Stream.cpp b/core/data/stream/Stream.cpp index 002f2357..b4f8ae74 100644 --- a/core/data/stream/Stream.cpp +++ b/core/data/stream/Stream.cpp @@ -71,69 +71,6 @@ os::io::Library::v_size OutputStream::writeAsString(bool value) { } } -oatpp::async::Action IOStream::writeDataAsyncInline(oatpp::data::stream::OutputStream* stream, - const void*& data, - os::io::Library::v_size& size, - const oatpp::async::Action& nextAction) { - auto res = stream->write(data, size); - if(res == oatpp::data::stream::IOStream::ERROR_IO_WAIT_RETRY) { - return oatpp::async::Action::_WAIT_RETRY; - } else if(res == oatpp::data::stream::IOStream::ERROR_IO_RETRY) { - return oatpp::async::Action::_REPEAT; - } else if(res == oatpp::data::stream::IOStream::ERROR_IO_PIPE) { - return oatpp::async::Action::_ABORT; - } else if( res < 0) { - return oatpp::async::Action(oatpp::async::Error(ERROR_ASYNC_FAILED_TO_WRITE_DATA)); - } else if(res < size) { - data = &((p_char8) data)[res]; - size = size - res; - return oatpp::async::Action::_REPEAT; - } - return nextAction; -} - -oatpp::async::Action IOStream::readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream, - void*& data, - os::io::Library::v_size& bytesLeftToRead, - const oatpp::async::Action& nextAction) { - auto res = stream->read(data, bytesLeftToRead); - if(res == oatpp::data::stream::IOStream::ERROR_IO_WAIT_RETRY) { - return oatpp::async::Action::_WAIT_RETRY; - } else if(res == oatpp::data::stream::IOStream::ERROR_IO_RETRY) { - return oatpp::async::Action::_REPEAT; - } else if( res < 0) { - return oatpp::async::Action(oatpp::async::Error(ERROR_ASYNC_FAILED_TO_READ_DATA)); - } else if(res < bytesLeftToRead) { - data = &((p_char8) data)[res]; - bytesLeftToRead -= res; - return nextAction; - } - bytesLeftToRead -= res; - return nextAction; -} - -oatpp::async::Action IOStream::readExactSizeDataAsyncInline(oatpp::data::stream::InputStream* stream, - void*& data, - os::io::Library::v_size& bytesLeftToRead, - const oatpp::async::Action& nextAction) { - auto res = stream->read(data, bytesLeftToRead); - if(res == oatpp::data::stream::IOStream::ERROR_IO_WAIT_RETRY) { - return oatpp::async::Action::_WAIT_RETRY; - } else if(res == oatpp::data::stream::IOStream::ERROR_IO_RETRY) { - return oatpp::async::Action::_REPEAT; - } else if(res == oatpp::data::stream::IOStream::ERROR_IO_PIPE) { - return oatpp::async::Action::_ABORT; - } else if( res < 0) { - return oatpp::async::Action(oatpp::async::Error(ERROR_ASYNC_FAILED_TO_READ_DATA)); - } else if(res < bytesLeftToRead) { - data = &((p_char8) data)[res]; - bytesLeftToRead -= res; - return oatpp::async::Action::_REPEAT; - } - bytesLeftToRead -= res; - return nextAction; -} - // Functions const std::shared_ptr& operator << @@ -247,7 +184,7 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout } Action doRead() { - return oatpp::data::stream::IOStream::readSomeDataAsyncInline(m_fromStream.get(), + return oatpp::data::stream::readSomeDataAsyncInline(m_fromStream.get(), m_readBufferPtr, m_bytesLeft, yieldTo(&TransferCoroutine::prepareWrite)); @@ -260,7 +197,7 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout } Action doWrite() { - return oatpp::data::stream::IOStream::writeDataAsyncInline(m_toStream.get(), + return oatpp::data::stream::writeDataAsyncInline(m_toStream.get(), m_writeBufferPtr, m_bytesLeft, yieldTo(&TransferCoroutine::act)); @@ -272,4 +209,83 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout } +oatpp::async::Action writeDataAsyncInline(oatpp::data::stream::OutputStream* stream, + const void*& data, + os::io::Library::v_size& size, + const oatpp::async::Action& nextAction) { + auto res = stream->write(data, size); + if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { + return oatpp::async::Action::_WAIT_RETRY; + } else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) { + return oatpp::async::Action::_REPEAT; + } else if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE) { + return oatpp::async::Action::_ABORT; + } else if( res < 0) { + return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA)); + } else if(res < size) { + data = &((p_char8) data)[res]; + size = size - res; + return oatpp::async::Action::_REPEAT; + } + return nextAction; +} + +oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream, + void*& data, + os::io::Library::v_size& bytesLeftToRead, + const oatpp::async::Action& nextAction) { + auto res = stream->read(data, bytesLeftToRead); + if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { + return oatpp::async::Action::_WAIT_RETRY; + } else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) { + return oatpp::async::Action::_REPEAT; + } else if( res < 0) { + return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA)); + } else if(res < bytesLeftToRead) { + data = &((p_char8) data)[res]; + bytesLeftToRead -= res; + return nextAction; + } + bytesLeftToRead -= res; + return nextAction; +} + +oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStream* stream, + void*& data, + os::io::Library::v_size& bytesLeftToRead, + const oatpp::async::Action& nextAction) { + auto res = stream->read(data, bytesLeftToRead); + if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { + return oatpp::async::Action::_WAIT_RETRY; + } else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) { + return oatpp::async::Action::_REPEAT; + } else if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE) { + return oatpp::async::Action::_ABORT; + } else if( res < 0) { + return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA)); + } else if(res < bytesLeftToRead) { + data = &((p_char8) data)[res]; + bytesLeftToRead -= res; + return oatpp::async::Action::_REPEAT; + } + bytesLeftToRead -= res; + return nextAction; +} + +oatpp::os::io::Library::v_size writeExactSizeData(oatpp::data::stream::OutputStream* stream, const void* data, os::io::Library::v_size size) { + const char* buffer = (char*)data; + oatpp::os::io::Library::v_size progress = 0; + while (progress < size) { + auto res = stream->write(&buffer[progress], size - progress); + if(res <= 0) { // if res == 0 then probably stream handles write() error incorrectly. return. + if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE || + (res != oatpp::data::stream::Errors::ERROR_IO_RETRY && res != oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY)) { + return progress; + } + } + progress += res; + } + return progress; +} + }}} diff --git a/core/data/stream/Stream.hpp b/core/data/stream/Stream.hpp index 9e4c5944..352296fb 100644 --- a/core/data/stream/Stream.hpp +++ b/core/data/stream/Stream.hpp @@ -36,9 +36,24 @@ namespace oatpp { namespace data{ namespace stream { +class Errors { +public: + constexpr static os::io::Library::v_size ERROR_IO_NOTHING_TO_READ = -1001; + constexpr static os::io::Library::v_size ERROR_IO_WAIT_RETRY = -1002; + constexpr static os::io::Library::v_size ERROR_IO_RETRY = -1003; + constexpr static os::io::Library::v_size ERROR_IO_PIPE = -1004; + + constexpr static const char* const ERROR_ASYNC_FAILED_TO_WRITE_DATA = "ERROR_ASYNC_FAILED_TO_WRITE_DATA"; + constexpr static const char* const ERROR_ASYNC_FAILED_TO_READ_DATA = "ERROR_ASYNC_FAILED_TO_READ_DATA"; +}; + class OutputStream { public: + /** + * Write data to stream up to count bytes, and return number of bytes actually written + * It is a legal case if return result < count. Caller should handle this! + */ virtual os::io::Library::v_size write(const void *data, os::io::Library::v_size count) = 0; os::io::Library::v_size write(const char* data){ @@ -63,41 +78,16 @@ public: class InputStream { public: + /** + * Read data from stream up to count bytes, and return number of bytes actually read + * It is a legal case if return result < count. Caller should handle this! + */ virtual os::io::Library::v_size read(void *data, os::io::Library::v_size count) = 0; }; class IOStream : public InputStream, public OutputStream { -public: - constexpr static os::io::Library::v_size ERROR_IO_NOTHING_TO_READ = -1001; - constexpr static os::io::Library::v_size ERROR_IO_WAIT_RETRY = -1002; - constexpr static os::io::Library::v_size ERROR_IO_RETRY = -1003; - constexpr static os::io::Library::v_size ERROR_IO_PIPE = -1004; - - constexpr static const char* const ERROR_ASYNC_FAILED_TO_WRITE_DATA = "ERROR_ASYNC_FAILED_TO_WRITE_DATA"; - constexpr static const char* const ERROR_ASYNC_FAILED_TO_READ_DATA = "ERROR_ASYNC_FAILED_TO_READ_DATA"; - public: typedef os::io::Library::v_size v_size; - - /** - * Async write data withot starting new Coroutine. - * Should be called from a separate Coroutine method - */ - static oatpp::async::Action writeDataAsyncInline(oatpp::data::stream::OutputStream* stream, - const void*& data, - os::io::Library::v_size& size, - const oatpp::async::Action& nextAction); - - static oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream, - void*& data, - os::io::Library::v_size& bytesLeftToRead, - const oatpp::async::Action& nextAction); - - static oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStream* stream, - void*& data, - os::io::Library::v_size& bytesLeftToRead, - const oatpp::async::Action& nextAction); - }; class CompoundIOStream : public oatpp::base::Controllable, public IOStream { @@ -145,13 +135,19 @@ const std::shared_ptr& operator << (const std::shared_ptr& operator << (const std::shared_ptr& s, v_float64 value); const std::shared_ptr& operator << (const std::shared_ptr& s, bool value); - + +/** + * Read bytes from @fromStream" and write to @toStream" using @buffer of size @bufferSize + */ void transfer(const std::shared_ptr& fromStream, const std::shared_ptr& toStream, oatpp::os::io::Library::v_size transferSize, void* buffer, oatpp::os::io::Library::v_size bufferSize); +/** + * Same as transfer but asynchronous + */ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const oatpp::async::Action& actionOnReturn, const std::shared_ptr& fromStream, @@ -159,6 +155,32 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout oatpp::os::io::Library::v_size transferSize, const std::shared_ptr& buffer); +/** + * Async write data withot starting new Coroutine. + * Should be called from a separate Coroutine method + */ +oatpp::async::Action writeDataAsyncInline(oatpp::data::stream::OutputStream* stream, + const void*& data, + os::io::Library::v_size& size, + const oatpp::async::Action& nextAction); + +oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream, + void*& data, + os::io::Library::v_size& bytesLeftToRead, + const oatpp::async::Action& nextAction); + +oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStream* stream, + void*& data, + os::io::Library::v_size& bytesLeftToRead, + const oatpp::async::Action& nextAction); + +/** + * Write exact amount of bytes to stream. + * returns exact amount of bytes was written. + * return result can be < size only in case of some disaster like broken pipe + */ +oatpp::os::io::Library::v_size writeExactSizeData(oatpp::data::stream::OutputStream* stream, const void* data, os::io::Library::v_size size); + }}} #endif /* defined(_data_Stream) */ diff --git a/core/data/stream/StreamBufferedProxy.cpp b/core/data/stream/StreamBufferedProxy.cpp index ddaa5cf1..fb9b1436 100644 --- a/core/data/stream/StreamBufferedProxy.cpp +++ b/core/data/stream/StreamBufferedProxy.cpp @@ -123,9 +123,9 @@ oatpp::async::Action OutputStreamBufferedProxy::flushAsync(oatpp::async::Abstrac m_stream->m_pos = 0; m_stream->m_posEnd = 0; return finish(); - } else if(result == IOStream::ERROR_IO_WAIT_RETRY) { + } else if(result == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { return waitRetry(); - } else if(result == IOStream::ERROR_IO_RETRY) { + } else if(result == oatpp::data::stream::Errors::ERROR_IO_RETRY) { return repeat(); } else if(result > 0){ m_stream->m_pos += (v_bufferSize) result; diff --git a/network/Connection.cpp b/network/Connection.cpp index 3a481edd..982c7de4 100644 --- a/network/Connection.cpp +++ b/network/Connection.cpp @@ -45,11 +45,11 @@ Connection::Library::v_size Connection::write(const void *buff, Library::v_size if(result <= 0) { auto e = errno; if(e == EAGAIN || e == EWOULDBLOCK){ - return ERROR_IO_WAIT_RETRY; // For async io. In case socket is non_blocking + return oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY; // For async io. In case socket is non_blocking } else if(e == EINTR) { - return ERROR_IO_RETRY; + return oatpp::data::stream::Errors::ERROR_IO_RETRY; } else if(e == EPIPE) { - return ERROR_IO_PIPE; + return oatpp::data::stream::Errors::ERROR_IO_PIPE; } else { //OATPP_LOGD("Connection", "write errno=%d", e); } @@ -63,11 +63,11 @@ Connection::Library::v_size Connection::read(void *buff, Library::v_size count){ if(result <= 0) { auto e = errno; if(e == EAGAIN || e == EWOULDBLOCK){ - return ERROR_IO_WAIT_RETRY; // For async io. In case socket is non_blocking + return oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY; // For async io. In case socket is non_blocking } else if(e == EINTR) { - return ERROR_IO_RETRY; + return oatpp::data::stream::Errors::ERROR_IO_RETRY; } else if(e == ECONNRESET) { - return ERROR_IO_PIPE; + return oatpp::data::stream::Errors::ERROR_IO_PIPE; } else { //OATPP_LOGD("Connection", "write errno=%d", e); } diff --git a/network/virtual_/Pipe.cpp b/network/virtual_/Pipe.cpp index b0efae13..e61a5298 100644 --- a/network/virtual_/Pipe.cpp +++ b/network/virtual_/Pipe.cpp @@ -30,7 +30,7 @@ os::io::Library::v_size Pipe::Reader::read(void *data, os::io::Library::v_size c Pipe& pipe = *m_pipe; if(!pipe.m_alive) { - return oatpp::data::stream::IOStream::ERROR_IO_PIPE; + return oatpp::data::stream::Errors::ERROR_IO_PIPE; } if(m_nonBlocking) { @@ -40,7 +40,7 @@ os::io::Library::v_size Pipe::Reader::read(void *data, os::io::Library::v_size c pipe.m_writeCondition.notify_one(); return result; } else { - return oatpp::data::stream::IOStream::ERROR_IO_WAIT_RETRY; + return oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY; } } @@ -53,11 +53,11 @@ os::io::Library::v_size Pipe::Reader::read(void *data, os::io::Library::v_size c lock.unlock(); pipe.m_writeCondition.notify_all(); pipe.m_readCondition.notify_all(); - return oatpp::data::stream::IOStream::ERROR_IO_PIPE; + return oatpp::data::stream::Errors::ERROR_IO_PIPE; } if(pipe.m_buffer.availableToRead() == 0) { - return oatpp::data::stream::IOStream::ERROR_IO_RETRY; + return oatpp::data::stream::Errors::ERROR_IO_RETRY; } auto result = pipe.m_buffer.read(data, count); @@ -72,7 +72,7 @@ os::io::Library::v_size Pipe::Writer::write(const void *data, os::io::Library::v Pipe& pipe = *m_pipe; if(!pipe.m_alive) { - return oatpp::data::stream::IOStream::ERROR_IO_PIPE; + return oatpp::data::stream::Errors::ERROR_IO_PIPE; } if(m_nonBlocking) { @@ -82,7 +82,7 @@ os::io::Library::v_size Pipe::Writer::write(const void *data, os::io::Library::v pipe.m_readCondition.notify_one(); return result; } else { - return oatpp::data::stream::IOStream::ERROR_IO_WAIT_RETRY; + return oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY; } } @@ -95,11 +95,11 @@ os::io::Library::v_size Pipe::Writer::write(const void *data, os::io::Library::v lock.unlock(); pipe.m_writeCondition.notify_all(); pipe.m_readCondition.notify_all(); - return oatpp::data::stream::IOStream::ERROR_IO_PIPE; + return oatpp::data::stream::Errors::ERROR_IO_PIPE; } if(pipe.m_buffer.availableToWrite() == 0) { - return oatpp::data::stream::IOStream::ERROR_IO_RETRY; + return oatpp::data::stream::Errors::ERROR_IO_RETRY; } auto result = pipe.m_buffer.write(data, count); diff --git a/web/client/HttpRequestExecutor.cpp b/web/client/HttpRequestExecutor.cpp index f60ebaee..95dc99a3 100644 --- a/web/client/HttpRequestExecutor.cpp +++ b/web/client/HttpRequestExecutor.cpp @@ -147,7 +147,7 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor } Action readResponse() { - return oatpp::data::stream::IOStream:: + return oatpp::data::stream:: readSomeDataAsyncInline(m_connection.get(), m_bufferPointer, m_bufferBytesLeftToRead, yieldTo(&ExecutorCoroutine::parseResponse)); } diff --git a/web/protocol/http/incoming/BodyDecoder.cpp b/web/protocol/http/incoming/BodyDecoder.cpp index b83323e3..1c32e665 100644 --- a/web/protocol/http/incoming/BodyDecoder.cpp +++ b/web/protocol/http/incoming/BodyDecoder.cpp @@ -150,9 +150,9 @@ oatpp::async::Action BodyDecoder::doChunkedDecodingAsync(oatpp::async::AbstractC Action readLineChar() { auto res = m_fromStream->read(&m_lineChar, 1); - if(res == oatpp::data::stream::IOStream::ERROR_IO_WAIT_RETRY) { + if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { return oatpp::async::Action::_WAIT_RETRY; - } else if(res == oatpp::data::stream::IOStream::ERROR_IO_RETRY) { + } else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) { return oatpp::async::Action::_REPEAT; } else if( res < 0) { return error("[BodyDecoder::ChunkedDecoder] Can't read line char"); @@ -197,12 +197,12 @@ oatpp::async::Action BodyDecoder::doChunkedDecodingAsync(oatpp::async::AbstractC Action skipRN() { if(m_done) { - return oatpp::data::stream::IOStream::readExactSizeDataAsyncInline(m_fromStream.get(), + return oatpp::data::stream::readExactSizeDataAsyncInline(m_fromStream.get(), m_skipData, m_skipSize, finish()); } else { - return oatpp::data::stream::IOStream::readExactSizeDataAsyncInline(m_fromStream.get(), + return oatpp::data::stream::readExactSizeDataAsyncInline(m_fromStream.get(), m_skipData, m_skipSize, yieldTo(&ChunkedDecoder::readLineChar)); diff --git a/web/protocol/http/outgoing/Body.hpp b/web/protocol/http/outgoing/Body.hpp index 7449ea23..0a28d858 100644 --- a/web/protocol/http/outgoing/Body.hpp +++ b/web/protocol/http/outgoing/Body.hpp @@ -38,13 +38,16 @@ protected: typedef oatpp::collection::ListMap Headers; typedef oatpp::data::stream::OutputStream OutputStream; public: - virtual void declareHeaders(const std::shared_ptr& headers) = 0; /** - * Do not call this method if stream::write is non blocking! - * For fast (not network) BLOCKING streams only!!! + * declare headers describing body */ - virtual void writeToStream(const std::shared_ptr& stream) = 0; + virtual void declareHeaders(const std::shared_ptr& headers) noexcept = 0; + + /** + * write content to stream + */ + virtual void writeToStream(const std::shared_ptr& stream) noexcept = 0; virtual Action writeToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const Action& actionOnReturn, diff --git a/web/protocol/http/outgoing/BufferBody.hpp b/web/protocol/http/outgoing/BufferBody.hpp index 36dfe5c6..f488ccfb 100644 --- a/web/protocol/http/outgoing/BufferBody.hpp +++ b/web/protocol/http/outgoing/BufferBody.hpp @@ -47,23 +47,13 @@ public: return Shared_Http_Outgoing_BufferBody_Pool::allocateShared(buffer); } - void declareHeaders(const std::shared_ptr& headers) override { + void declareHeaders(const std::shared_ptr& headers) noexcept override { headers->put(oatpp::web::protocol::http::Header::CONTENT_LENGTH, oatpp::utils::conversion::int32ToStr(m_buffer->getSize())); } - void writeToStream(const std::shared_ptr& stream) override { - oatpp::os::io::Library::v_size progress = 0; - while (progress < m_buffer->getSize()) { - auto res = stream->write(&m_buffer->getData()[progress], m_buffer->getSize() - progress); - if(res < 0) { - if(res == oatpp::data::stream::IOStream::ERROR_IO_PIPE || - (res != oatpp::data::stream::IOStream::ERROR_IO_RETRY && res != oatpp::data::stream::IOStream::ERROR_IO_WAIT_RETRY)) { - return; - } - } - progress += res; - } + void writeToStream(const std::shared_ptr& stream) noexcept override { + oatpp::data::stream::writeExactSizeData(stream.get(), m_buffer->getData(), m_buffer->getSize()); } public: @@ -85,7 +75,7 @@ public: {} Action act() override { - return oatpp::data::stream::IOStream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, finish()); + return oatpp::data::stream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, finish()); } }; diff --git a/web/protocol/http/outgoing/ChunkedBufferBody.hpp b/web/protocol/http/outgoing/ChunkedBufferBody.hpp index 56e1ddd5..6828e65b 100644 --- a/web/protocol/http/outgoing/ChunkedBufferBody.hpp +++ b/web/protocol/http/outgoing/ChunkedBufferBody.hpp @@ -58,7 +58,7 @@ public: return Shared_Http_Outgoing_ChunkedBufferBody_Pool::allocateShared(buffer, chunked); } - void declareHeaders(const std::shared_ptr& headers) override { + void declareHeaders(const std::shared_ptr& headers) noexcept override { if(m_chunked){ headers->put(oatpp::web::protocol::http::Header::TRANSFER_ENCODING, oatpp::web::protocol::http::Header::Value::TRANSFER_ENCODING_CHUNKED); @@ -68,7 +68,7 @@ public: } } - void writeToStream(const std::shared_ptr& stream) override { + void writeToStream(const std::shared_ptr& stream) noexcept override { if(m_chunked){ auto chunks = m_buffer->getChunks(); auto curr = chunks->getFirstNode(); @@ -147,7 +147,7 @@ public: } Action writeCurrData() { - return oatpp::data::stream::IOStream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction); + return oatpp::data::stream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction); } }; diff --git a/web/protocol/http/outgoing/DtoBody.hpp b/web/protocol/http/outgoing/DtoBody.hpp index 391c31a4..770e81b4 100644 --- a/web/protocol/http/outgoing/DtoBody.hpp +++ b/web/protocol/http/outgoing/DtoBody.hpp @@ -60,11 +60,10 @@ public: return Shared_Http_Outgoing_DtoBody_Pool::allocateShared(dto, objectMapper, chunked); } - void declareHeaders(const std::shared_ptr& headers) override { - if(!m_dto) { - throw std::runtime_error("Sending null object"); + void declareHeaders(const std::shared_ptr& headers) noexcept override { + if(m_dto) { + m_objectMapper->write(m_buffer, m_dto); } - m_objectMapper->write(m_buffer, m_dto); ChunkedBufferBody::declareHeaders(headers); headers->putIfNotExists(Header::CONTENT_TYPE, m_objectMapper->getInfo().http_content_type); } diff --git a/web/protocol/http/outgoing/Response.cpp b/web/protocol/http/outgoing/Response.cpp index bc568b61..cf76adbd 100644 --- a/web/protocol/http/outgoing/Response.cpp +++ b/web/protocol/http/outgoing/Response.cpp @@ -28,7 +28,7 @@ namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing { -void Response::send(const std::shared_ptr& stream){ +void Response::send(const std::shared_ptr& stream) { if(body){ body->declareHeaders(headers); diff --git a/web/server/HttpProcessor.cpp b/web/server/HttpProcessor.cpp index 07c50340..32bcca61 100644 --- a/web/server/HttpProcessor.cpp +++ b/web/server/HttpProcessor.cpp @@ -121,7 +121,7 @@ HttpProcessor::processRequest(HttpRouter* router, return errorHandler->handleError(protocol::http::Status::CODE_404, "Current url has no mapping"); } - } if(readCount == oatpp::data::stream::IOStream::ERROR_IO_NOTHING_TO_READ) { + } if(readCount == oatpp::data::stream::Errors::ERROR_IO_NOTHING_TO_READ) { keepAlive = true; } @@ -178,9 +178,9 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::act() { if(readCount > 0) { m_currentResponse = nullptr; return parseRequest((v_int32)readCount); - } else if(readCount == oatpp::data::stream::IOStream::ERROR_IO_WAIT_RETRY) { + } else if(readCount == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { return waitRetry(); - } else if(readCount == oatpp::data::stream::IOStream::ERROR_IO_RETRY) { + } else if(readCount == oatpp::data::stream::Errors::ERROR_IO_RETRY) { return repeat(); } return abort();