From ad471079fe79cf53aa3a4f504ccc1908496208b0 Mon Sep 17 00:00:00 2001 From: lganzzzo Date: Sat, 13 Jul 2019 21:34:40 +0300 Subject: [PATCH] Refactor BodyDecoder::decode method to use stream::WriteCallback for Simple API --- src/oatpp/core/data/stream/Stream.cpp | 20 +++++++++++++++---- src/oatpp/core/data/stream/Stream.hpp | 10 +++++++--- .../protocol/http/incoming/BodyDecoder.cpp | 10 ++++++++-- .../protocol/http/incoming/BodyDecoder.hpp | 14 +++++++++++-- .../http/incoming/SimpleBodyDecoder.cpp | 10 +++++----- .../http/incoming/SimpleBodyDecoder.hpp | 8 ++++---- test/oatpp/network/virtual_/InterfaceTest.cpp | 18 +++++++++-------- .../web/mime/multipart/StatefulParserTest.cpp | 7 ++++--- 8 files changed, 66 insertions(+), 31 deletions(-) diff --git a/src/oatpp/core/data/stream/Stream.cpp b/src/oatpp/core/data/stream/Stream.cpp index 2a7a4ee9..590a092e 100644 --- a/src/oatpp/core/data/stream/Stream.cpp +++ b/src/oatpp/core/data/stream/Stream.cpp @@ -225,7 +225,7 @@ ConsistentOutputStream& operator << (ConsistentOutputStream& s, bool value) { } oatpp::data::v_io_size transfer(const std::shared_ptr& fromStream, - const std::shared_ptr& toStream, + WriteCallback* writeCallback, oatpp::data::v_io_size transferSize, void* buffer, oatpp::data::v_io_size bufferSize) @@ -240,11 +240,23 @@ oatpp::data::v_io_size transfer(const std::shared_ptr& fromStream, } auto readResult = fromStream->read(buffer, desiredReadCount); if(readResult > 0) { - auto writeReasul = writeExactSizeData(toStream.get(), buffer, readResult); - if(writeReasul != readResult) { - throw std::runtime_error("[oatpp::data::stream::transfer()]: Unknown Error. Can't continue transfer."); + + p_char8 data = (p_char8) buffer; + oatpp::data::v_io_size bytesLeft = readResult; + while(bytesLeft > 0) { + auto writeResult = writeCallback->write(data, bytesLeft); + if(writeResult > 0) { + data = &data[writeResult]; + bytesLeft -= writeResult; + } else if (writeResult == data::IOError::RETRY || writeResult == data::IOError::WAIT_RETRY) { + continue; + } else { + throw std::runtime_error("[oatpp::data::stream::transfer()]: Unknown Error. Can't continue transfer."); + } } + progress += readResult; + } else { if(readResult == data::IOError::RETRY || readResult == data::IOError::WAIT_RETRY) { continue; diff --git a/src/oatpp/core/data/stream/Stream.hpp b/src/oatpp/core/data/stream/Stream.hpp index ef6021f0..4fc56bd4 100644 --- a/src/oatpp/core/data/stream/Stream.hpp +++ b/src/oatpp/core/data/stream/Stream.hpp @@ -401,12 +401,16 @@ public: }; /** - * Read bytes from @fromStream" and write to @toStream" using @buffer of size @bufferSize + * Read bytes from `fromStream` and write to `writeCallback` using `buffer` of size `bufferSize` * transfer up to transferSize or until error if transferSize == 0 - * throws in case readCount != writeCount + * @param fromStream + * @param transferSize + * @param buffer + * @param bufferSize + * @return - amount of bytes actually transferred. */ oatpp::data::v_io_size transfer(const std::shared_ptr& fromStream, - const std::shared_ptr& toStream, + WriteCallback* writeCallback, oatpp::data::v_io_size transferSize, void* buffer, oatpp::data::v_io_size bufferSize); diff --git a/src/oatpp/web/protocol/http/incoming/BodyDecoder.cpp b/src/oatpp/web/protocol/http/incoming/BodyDecoder.cpp index ca88c624..72468823 100644 --- a/src/oatpp/web/protocol/http/incoming/BodyDecoder.cpp +++ b/src/oatpp/web/protocol/http/incoming/BodyDecoder.cpp @@ -24,8 +24,6 @@ #include "BodyDecoder.hpp" - - namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming { BodyDecoder::ToStringDecoder::ToStringDecoder(const BodyDecoder* decoder, @@ -37,6 +35,14 @@ BodyDecoder::ToStringDecoder::ToStringDecoder(const BodyDecoder* decoder, , m_chunkedBuffer(oatpp::data::stream::ChunkedBuffer::createShared()) {} +void BodyDecoder::decode(const Headers& headers, + const std::shared_ptr& bodyStream, + const std::shared_ptr& toStream) const +{ + oatpp::data::stream::DefaultWriteCallback callback(toStream.get()); + decode(headers, bodyStream, &callback); +} + async::Action BodyDecoder::ToStringDecoder::act() { return m_decoder->decodeAsync(m_headers, m_bodyStream, m_chunkedBuffer).next(yieldTo(&ToStringDecoder::onDecoded)); } diff --git a/src/oatpp/web/protocol/http/incoming/BodyDecoder.hpp b/src/oatpp/web/protocol/http/incoming/BodyDecoder.hpp index 1c69a59f..7f6a9820 100644 --- a/src/oatpp/web/protocol/http/incoming/BodyDecoder.hpp +++ b/src/oatpp/web/protocol/http/incoming/BodyDecoder.hpp @@ -104,11 +104,21 @@ public: * Implement this method! Decode bodyStream and write decoded data to toStream. * @param headers - Headers map. &id:oatpp::web::protocol::http::Headers;. * @param bodyStream - `std::shared_ptr` to &id:oatpp::data::stream::InputStream;. - * @param toStream - `std::shared_ptr` to &id:oatpp::data::stream::OutputStream;. + * @param writeCallback - &id:oatpp::data::stream::WriteCallback;. */ virtual void decode(const Headers& headers, const std::shared_ptr& bodyStream, - const std::shared_ptr& toStream) const = 0; + oatpp::data::stream::WriteCallback* writeCallback) const = 0; + + /** + * Decode using &id:oatpp::data::stream::DefaultWriteCallback;. + * @param headers - Headers map. &id:oatpp::web::protocol::http::Headers;. + * @param bodyStream - `std::shared_ptr` to &id:oatpp::data::stream::InputStream;. + * @param toStream - `std::shared_ptr` to &id:oatpp::data::stream::OutputStream;. + */ + void decode(const Headers& headers, + const std::shared_ptr& bodyStream, + const std::shared_ptr& toStream) const; /** * Implement this method! Same as &l:BodyDecoder::decode (); but Async. diff --git a/src/oatpp/web/protocol/http/incoming/SimpleBodyDecoder.cpp b/src/oatpp/web/protocol/http/incoming/SimpleBodyDecoder.cpp index c790f12f..7379c1dd 100644 --- a/src/oatpp/web/protocol/http/incoming/SimpleBodyDecoder.cpp +++ b/src/oatpp/web/protocol/http/incoming/SimpleBodyDecoder.cpp @@ -56,7 +56,7 @@ data::v_io_size SimpleBodyDecoder::readLine(const std::shared_ptr& fromStream, - const std::shared_ptr& toStream) { + oatpp::data::stream::WriteCallback* writeCallback) { auto buffer = oatpp::data::buffer::IOBuffer::createShared(); @@ -74,7 +74,7 @@ void SimpleBodyDecoder::doChunkedDecoding(const std::shared_ptr 0) { - oatpp::data::stream::transfer(fromStream, toStream, countToRead, buffer->getData(), buffer->getSize()); + oatpp::data::stream::transfer(fromStream, writeCallback, countToRead, buffer->getData(), buffer->getSize()); } fromStream->read(lineBuffer, 2); // just skip "\r\n" @@ -85,11 +85,11 @@ void SimpleBodyDecoder::doChunkedDecoding(const std::shared_ptr& bodyStream, - const std::shared_ptr& toStream) const { + oatpp::data::stream::WriteCallback* writeCallback) const { auto transferEncodingIt = headers.find(Header::TRANSFER_ENCODING); if(transferEncodingIt != headers.end() && transferEncodingIt->second == Header::Value::TRANSFER_ENCODING_CHUNKED) { - doChunkedDecoding(bodyStream, toStream); + doChunkedDecoding(bodyStream, writeCallback); } else { oatpp::data::v_io_size contentLength = 0; auto contentLengthStrIt = headers.find(Header::CONTENT_LENGTH); @@ -103,7 +103,7 @@ void SimpleBodyDecoder::decode(const Headers& headers, } auto buffer = oatpp::data::buffer::IOBuffer::createShared(); if(contentLength > 0) { - oatpp::data::stream::transfer(bodyStream, toStream, contentLength, buffer->getData(), buffer->getSize()); + oatpp::data::stream::transfer(bodyStream, writeCallback, contentLength, buffer->getData(), buffer->getSize()); } } } diff --git a/src/oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp b/src/oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp index 78a29696..2a12dc41 100644 --- a/src/oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp +++ b/src/oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp @@ -38,21 +38,21 @@ private: p_char8 buffer, data::v_io_size maxLineSize); static void doChunkedDecoding(const std::shared_ptr& from, - const std::shared_ptr& toStream); + oatpp::data::stream::WriteCallback* writeCallback); static oatpp::async::CoroutineStarter doChunkedDecodingAsync(const std::shared_ptr& fromStream, - const std::shared_ptr& toStream); + const std::shared_ptr& toStream); public: /** * Decode bodyStream and write decoded data to toStream. * @param headers - Headers map. &id:oatpp::web::protocol::http::Headers;. * @param bodyStream - `std::shared_ptr` to &id:oatpp::data::stream::InputStream;. - * @param toStream - `std::shared_ptr` to &id:oatpp::data::stream::OutputStream;. + * @param writeCallback - &id:oatpp::data::stream::WriteCallback;. */ void decode(const Headers& headers, const std::shared_ptr& bodyStream, - const std::shared_ptr& toStream) const override; + oatpp::data::stream::WriteCallback* writeCallback) const override; /** * Same as &l:SimpleBodyDecoder::decode (); but Async. diff --git a/test/oatpp/network/virtual_/InterfaceTest.cpp b/test/oatpp/network/virtual_/InterfaceTest.cpp index ff0aec1c..7be8ec0b 100644 --- a/test/oatpp/network/virtual_/InterfaceTest.cpp +++ b/test/oatpp/network/virtual_/InterfaceTest.cpp @@ -59,12 +59,13 @@ namespace { OATPP_ASSERT(res == m_dataSample->getSize()); v_char8 buffer[100]; - auto stream = oatpp::data::stream::ChunkedBuffer::createShared(); - res = oatpp::data::stream::transfer(socket, stream, 2, buffer, 100); + oatpp::data::stream::ChunkedBuffer stream; + oatpp::data::stream::DefaultWriteCallback writeCallback(&stream); + res = oatpp::data::stream::transfer(socket, &writeCallback, 2, buffer, 100); OATPP_ASSERT(res == 2); - OATPP_ASSERT(stream->getSize() == res); - OATPP_ASSERT(stream->toString() == "OK"); + OATPP_ASSERT(stream.getSize() == res); + OATPP_ASSERT(stream.toString() == "OK"); //OATPP_LOGV("client", "finished - OK"); @@ -86,12 +87,13 @@ namespace { void run() { v_char8 buffer[100]; - auto stream = oatpp::data::stream::ChunkedBuffer::createShared(); - auto res = oatpp::data::stream::transfer(m_socket, stream, m_dataSample->getSize(), buffer, 100); + oatpp::data::stream::ChunkedBuffer stream; + oatpp::data::stream::DefaultWriteCallback writeCallback(&stream); + auto res = oatpp::data::stream::transfer(m_socket, &writeCallback, m_dataSample->getSize(), buffer, 100); OATPP_ASSERT(res == m_dataSample->getSize()); - OATPP_ASSERT(stream->getSize() == res); - OATPP_ASSERT(stream->toString() == m_dataSample); + OATPP_ASSERT(stream.getSize() == res); + OATPP_ASSERT(stream.toString() == m_dataSample); res = oatpp::data::stream::writeExactSizeData(m_socket.get(), "OK", 2); diff --git a/test/oatpp/web/mime/multipart/StatefulParserTest.cpp b/test/oatpp/web/mime/multipart/StatefulParserTest.cpp index 7b3e7a63..69b3ee91 100644 --- a/test/oatpp/web/mime/multipart/StatefulParserTest.cpp +++ b/test/oatpp/web/mime/multipart/StatefulParserTest.cpp @@ -110,10 +110,11 @@ namespace { v_int32 bufferSize = 16; v_char8 buffer[bufferSize]; - auto stream = oatpp::data::stream::ChunkedBuffer::createShared(); - oatpp::data::stream::transfer(part->getInputStream(), stream, 0, buffer, bufferSize); + oatpp::data::stream::ChunkedBuffer stream; + oatpp::data::stream::DefaultWriteCallback writeCallback(&stream); + oatpp::data::stream::transfer(part->getInputStream(), &writeCallback, 0, buffer, bufferSize); - oatpp::String readData = stream->toString(); + oatpp::String readData = stream.toString(); OATPP_ASSERT(readData == part->getInMemoryData());