Refactor BodyDecoder::decode method to use stream::WriteCallback for Simple API

This commit is contained in:
lganzzzo 2019-07-13 21:34:40 +03:00
parent 6b99725db5
commit ad471079fe
8 changed files with 66 additions and 31 deletions

View File

@ -225,7 +225,7 @@ ConsistentOutputStream& operator << (ConsistentOutputStream& s, bool value) {
}
oatpp::data::v_io_size transfer(const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream,
WriteCallback* writeCallback,
oatpp::data::v_io_size transferSize,
void* buffer,
oatpp::data::v_io_size bufferSize)
@ -240,11 +240,23 @@ oatpp::data::v_io_size transfer(const std::shared_ptr<InputStream>& fromStream,
}
auto readResult = fromStream->read(buffer, desiredReadCount);
if(readResult > 0) {
auto writeReasul = writeExactSizeData(toStream.get(), buffer, readResult);
if(writeReasul != readResult) {
throw std::runtime_error("[oatpp::data::stream::transfer()]: Unknown Error. Can't continue transfer.");
p_char8 data = (p_char8) buffer;
oatpp::data::v_io_size bytesLeft = readResult;
while(bytesLeft > 0) {
auto writeResult = writeCallback->write(data, bytesLeft);
if(writeResult > 0) {
data = &data[writeResult];
bytesLeft -= writeResult;
} else if (writeResult == data::IOError::RETRY || writeResult == data::IOError::WAIT_RETRY) {
continue;
} else {
throw std::runtime_error("[oatpp::data::stream::transfer()]: Unknown Error. Can't continue transfer.");
}
}
progress += readResult;
} else {
if(readResult == data::IOError::RETRY || readResult == data::IOError::WAIT_RETRY) {
continue;

View File

@ -401,12 +401,16 @@ public:
};
/**
* Read bytes from @fromStream" and write to @toStream" using @buffer of size @bufferSize
* Read bytes from `fromStream` and write to `writeCallback` using `buffer` of size `bufferSize`
* transfer up to transferSize or until error if transferSize == 0
* throws in case readCount != writeCount
* @param fromStream
* @param transferSize
* @param buffer
* @param bufferSize
* @return - amount of bytes actually transferred.
*/
oatpp::data::v_io_size transfer(const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream,
WriteCallback* writeCallback,
oatpp::data::v_io_size transferSize,
void* buffer,
oatpp::data::v_io_size bufferSize);

View File

@ -24,8 +24,6 @@
#include "BodyDecoder.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming {
BodyDecoder::ToStringDecoder::ToStringDecoder(const BodyDecoder* decoder,
@ -37,6 +35,14 @@ BodyDecoder::ToStringDecoder::ToStringDecoder(const BodyDecoder* decoder,
, m_chunkedBuffer(oatpp::data::stream::ChunkedBuffer::createShared())
{}
void BodyDecoder::decode(const Headers& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const
{
oatpp::data::stream::DefaultWriteCallback callback(toStream.get());
decode(headers, bodyStream, &callback);
}
async::Action BodyDecoder::ToStringDecoder::act() {
return m_decoder->decodeAsync(m_headers, m_bodyStream, m_chunkedBuffer).next(yieldTo(&ToStringDecoder::onDecoded));
}

View File

@ -104,11 +104,21 @@ public:
* Implement this method! Decode bodyStream and write decoded data to toStream.
* @param headers - Headers map. &id:oatpp::web::protocol::http::Headers;.
* @param bodyStream - `std::shared_ptr` to &id:oatpp::data::stream::InputStream;.
* @param toStream - `std::shared_ptr` to &id:oatpp::data::stream::OutputStream;.
* @param writeCallback - &id:oatpp::data::stream::WriteCallback;.
*/
virtual void decode(const Headers& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const = 0;
oatpp::data::stream::WriteCallback* writeCallback) const = 0;
/**
* Decode using &id:oatpp::data::stream::DefaultWriteCallback;.
* @param headers - Headers map. &id:oatpp::web::protocol::http::Headers;.
* @param bodyStream - `std::shared_ptr` to &id:oatpp::data::stream::InputStream;.
* @param toStream - `std::shared_ptr` to &id:oatpp::data::stream::OutputStream;.
*/
void decode(const Headers& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const;
/**
* Implement this method! Same as &l:BodyDecoder::decode (); but Async.

View File

@ -56,7 +56,7 @@ data::v_io_size SimpleBodyDecoder::readLine(const std::shared_ptr<oatpp::data::s
}
void SimpleBodyDecoder::doChunkedDecoding(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) {
oatpp::data::stream::WriteCallback* writeCallback) {
auto buffer = oatpp::data::buffer::IOBuffer::createShared();
@ -74,7 +74,7 @@ void SimpleBodyDecoder::doChunkedDecoding(const std::shared_ptr<oatpp::data::str
countToRead = strtol((const char*)lineBuffer, nullptr, 16);
if(countToRead > 0) {
oatpp::data::stream::transfer(fromStream, toStream, countToRead, buffer->getData(), buffer->getSize());
oatpp::data::stream::transfer(fromStream, writeCallback, countToRead, buffer->getData(), buffer->getSize());
}
fromStream->read(lineBuffer, 2); // just skip "\r\n"
@ -85,11 +85,11 @@ void SimpleBodyDecoder::doChunkedDecoding(const std::shared_ptr<oatpp::data::str
void SimpleBodyDecoder::decode(const Headers& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const {
oatpp::data::stream::WriteCallback* writeCallback) const {
auto transferEncodingIt = headers.find(Header::TRANSFER_ENCODING);
if(transferEncodingIt != headers.end() && transferEncodingIt->second == Header::Value::TRANSFER_ENCODING_CHUNKED) {
doChunkedDecoding(bodyStream, toStream);
doChunkedDecoding(bodyStream, writeCallback);
} else {
oatpp::data::v_io_size contentLength = 0;
auto contentLengthStrIt = headers.find(Header::CONTENT_LENGTH);
@ -103,7 +103,7 @@ void SimpleBodyDecoder::decode(const Headers& headers,
}
auto buffer = oatpp::data::buffer::IOBuffer::createShared();
if(contentLength > 0) {
oatpp::data::stream::transfer(bodyStream, toStream, contentLength, buffer->getData(), buffer->getSize());
oatpp::data::stream::transfer(bodyStream, writeCallback, contentLength, buffer->getData(), buffer->getSize());
}
}
}

View File

@ -38,21 +38,21 @@ private:
p_char8 buffer,
data::v_io_size maxLineSize);
static void doChunkedDecoding(const std::shared_ptr<oatpp::data::stream::InputStream>& from,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream);
oatpp::data::stream::WriteCallback* writeCallback);
static oatpp::async::CoroutineStarter doChunkedDecodingAsync(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream);
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream);
public:
/**
* Decode bodyStream and write decoded data to toStream.
* @param headers - Headers map. &id:oatpp::web::protocol::http::Headers;.
* @param bodyStream - `std::shared_ptr` to &id:oatpp::data::stream::InputStream;.
* @param toStream - `std::shared_ptr` to &id:oatpp::data::stream::OutputStream;.
* @param writeCallback - &id:oatpp::data::stream::WriteCallback;.
*/
void decode(const Headers& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const override;
oatpp::data::stream::WriteCallback* writeCallback) const override;
/**
* Same as &l:SimpleBodyDecoder::decode (); but Async.

View File

@ -59,12 +59,13 @@ namespace {
OATPP_ASSERT(res == m_dataSample->getSize());
v_char8 buffer[100];
auto stream = oatpp::data::stream::ChunkedBuffer::createShared();
res = oatpp::data::stream::transfer(socket, stream, 2, buffer, 100);
oatpp::data::stream::ChunkedBuffer stream;
oatpp::data::stream::DefaultWriteCallback writeCallback(&stream);
res = oatpp::data::stream::transfer(socket, &writeCallback, 2, buffer, 100);
OATPP_ASSERT(res == 2);
OATPP_ASSERT(stream->getSize() == res);
OATPP_ASSERT(stream->toString() == "OK");
OATPP_ASSERT(stream.getSize() == res);
OATPP_ASSERT(stream.toString() == "OK");
//OATPP_LOGV("client", "finished - OK");
@ -86,12 +87,13 @@ namespace {
void run() {
v_char8 buffer[100];
auto stream = oatpp::data::stream::ChunkedBuffer::createShared();
auto res = oatpp::data::stream::transfer(m_socket, stream, m_dataSample->getSize(), buffer, 100);
oatpp::data::stream::ChunkedBuffer stream;
oatpp::data::stream::DefaultWriteCallback writeCallback(&stream);
auto res = oatpp::data::stream::transfer(m_socket, &writeCallback, m_dataSample->getSize(), buffer, 100);
OATPP_ASSERT(res == m_dataSample->getSize());
OATPP_ASSERT(stream->getSize() == res);
OATPP_ASSERT(stream->toString() == m_dataSample);
OATPP_ASSERT(stream.getSize() == res);
OATPP_ASSERT(stream.toString() == m_dataSample);
res = oatpp::data::stream::writeExactSizeData(m_socket.get(), "OK", 2);

View File

@ -110,10 +110,11 @@ namespace {
v_int32 bufferSize = 16;
v_char8 buffer[bufferSize];
auto stream = oatpp::data::stream::ChunkedBuffer::createShared();
oatpp::data::stream::transfer(part->getInputStream(), stream, 0, buffer, bufferSize);
oatpp::data::stream::ChunkedBuffer stream;
oatpp::data::stream::DefaultWriteCallback writeCallback(&stream);
oatpp::data::stream::transfer(part->getInputStream(), &writeCallback, 0, buffer, bufferSize);
oatpp::String readData = stream->toString();
oatpp::String readData = stream.toString();
OATPP_ASSERT(readData == part->getInMemoryData());