Refactor BodyDecoder::decode method to use stream::AsyncWriteCallback for Async API

This commit is contained in:
lganzzzo 2019-07-14 01:54:41 +03:00
parent ad471079fe
commit 0c9d53744f
6 changed files with 44 additions and 25 deletions

View File

@ -105,7 +105,7 @@ data::v_io_size DefaultWriteCallback::write(const void *data, data::v_io_size co
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// DefaultAsyncWriteCallback // DefaultAsyncWriteCallback
DefaultAsyncWriteCallback::DefaultAsyncWriteCallback(OutputStream* stream) DefaultAsyncWriteCallback::DefaultAsyncWriteCallback(const std::shared_ptr<OutputStream>& stream)
: m_stream(stream) : m_stream(stream)
{} {}
@ -114,7 +114,7 @@ oatpp::async::Action DefaultAsyncWriteCallback::writeAsyncInline(oatpp::async::A
data::v_io_size& bytesLeft, data::v_io_size& bytesLeft,
oatpp::async::Action&& nextAction) oatpp::async::Action&& nextAction)
{ {
return writeExactSizeDataAsyncInline(coroutine, m_stream, currBufferPtr, bytesLeft, std::forward<oatpp::async::Action>(nextAction)); return writeExactSizeDataAsyncInline(coroutine, m_stream.get(), currBufferPtr, bytesLeft, std::forward<oatpp::async::Action>(nextAction));
} }
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -270,7 +270,7 @@ oatpp::data::v_io_size transfer(const std::shared_ptr<InputStream>& fromStream,
} }
oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr<InputStream>& fromStream, oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream, const std::shared_ptr<AsyncWriteCallback>& writeCallback,
oatpp::data::v_io_size transferSize, oatpp::data::v_io_size transferSize,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer) const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer)
{ {
@ -278,7 +278,7 @@ oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr<InputStream>&
class TransferCoroutine : public oatpp::async::Coroutine<TransferCoroutine> { class TransferCoroutine : public oatpp::async::Coroutine<TransferCoroutine> {
private: private:
std::shared_ptr<InputStream> m_fromStream; std::shared_ptr<InputStream> m_fromStream;
std::shared_ptr<OutputStream> m_toStream; std::shared_ptr<AsyncWriteCallback> m_writeCallback;
oatpp::data::v_io_size m_transferSize; oatpp::data::v_io_size m_transferSize;
oatpp::data::v_io_size m_progress; oatpp::data::v_io_size m_progress;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_buffer; std::shared_ptr<oatpp::data::buffer::IOBuffer> m_buffer;
@ -291,11 +291,11 @@ oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr<InputStream>&
public: public:
TransferCoroutine(const std::shared_ptr<InputStream>& fromStream, TransferCoroutine(const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream, const std::shared_ptr<AsyncWriteCallback>& writeCallback,
oatpp::data::v_io_size transferSize, oatpp::data::v_io_size transferSize,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer) const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer)
: m_fromStream(fromStream) : m_fromStream(fromStream)
, m_toStream(toStream) , m_writeCallback(writeCallback)
, m_transferSize(transferSize) , m_transferSize(transferSize)
, m_progress(0) , m_progress(0)
, m_buffer(buffer) , m_buffer(buffer)
@ -336,7 +336,7 @@ oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr<InputStream>&
} }
Action doWrite() { Action doWrite() {
return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_toStream.get(), m_writeBufferPtr, m_bytesLeft, yieldTo(&TransferCoroutine::act)); return m_writeCallback->writeAsyncInline(this, m_writeBufferPtr, m_bytesLeft, yieldTo(&TransferCoroutine::act));
} }
Action handleError(const std::shared_ptr<const Error>& error) override { Action handleError(const std::shared_ptr<const Error>& error) override {
@ -348,7 +348,7 @@ oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr<InputStream>&
}; };
return TransferCoroutine::start(fromStream, toStream, transferSize, buffer); return TransferCoroutine::start(fromStream, writeCallback, transferSize, buffer);
} }

View File

@ -365,14 +365,14 @@ public:
*/ */
class DefaultAsyncWriteCallback : public AsyncWriteCallback { class DefaultAsyncWriteCallback : public AsyncWriteCallback {
private: private:
OutputStream* m_stream; std::shared_ptr<OutputStream> m_stream;
public: public:
/** /**
* Constructor. * Constructor.
* @param stream - stream to write to. * @param stream - stream to write to.
*/ */
DefaultAsyncWriteCallback(OutputStream* stream); DefaultAsyncWriteCallback(const std::shared_ptr<OutputStream>& stream);
/** /**
* Async-Inline write callback. * Async-Inline write callback.
@ -420,7 +420,7 @@ oatpp::data::v_io_size transfer(const std::shared_ptr<InputStream>& fromStream,
* Same as transfer but asynchronous * Same as transfer but asynchronous
*/ */
oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr<InputStream>& fromStream, oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream, const std::shared_ptr<AsyncWriteCallback>& writeCallback,
oatpp::data::v_io_size transferSize, oatpp::data::v_io_size transferSize,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer); const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer);

View File

@ -43,6 +43,14 @@ void BodyDecoder::decode(const Headers& headers,
decode(headers, bodyStream, &callback); decode(headers, bodyStream, &callback);
} }
oatpp::async::CoroutineStarter BodyDecoder::decodeAsync(const Headers& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const
{
auto callback = std::make_shared<oatpp::data::stream::DefaultAsyncWriteCallback>(toStream);
return decodeAsync(headers, bodyStream, callback);
}
async::Action BodyDecoder::ToStringDecoder::act() { async::Action BodyDecoder::ToStringDecoder::act() {
return m_decoder->decodeAsync(m_headers, m_bodyStream, m_chunkedBuffer).next(yieldTo(&ToStringDecoder::onDecoded)); return m_decoder->decodeAsync(m_headers, m_bodyStream, m_chunkedBuffer).next(yieldTo(&ToStringDecoder::onDecoded));
} }

View File

@ -124,12 +124,23 @@ public:
* Implement this method! Same as &l:BodyDecoder::decode (); but Async. * Implement this method! Same as &l:BodyDecoder::decode (); but Async.
* @param headers - Headers map. &id:oatpp::web::protocol::http::Headers;. * @param headers - Headers map. &id:oatpp::web::protocol::http::Headers;.
* @param bodyStream - `std::shared_ptr` to &id:oatpp::data::stream::InputStream;. * @param bodyStream - `std::shared_ptr` to &id:oatpp::data::stream::InputStream;.
* @param toStream - `std::shared_ptr` to &id:oatpp::data::stream::OutputStream;. * @param writeCallback - `std::shared_ptr` to &id:oatpp::data::stream::AsyncWriteCallback;.
* @return - &id:oatpp::async::CoroutineStarter;. * @return - &id:oatpp::async::CoroutineStarter;.
*/ */
virtual oatpp::async::CoroutineStarter decodeAsync(const Headers& headers, virtual oatpp::async::CoroutineStarter decodeAsync(const Headers& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream, const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const = 0; const std::shared_ptr<oatpp::data::stream::AsyncWriteCallback>& writeCallback) const = 0;
/**
* Decode in asynchronous manner using &id:oatpp::data::stream::DefaultAsyncWriteCallback;.
* @param headers - Headers map. &id:oatpp::web::protocol::http::Headers;.
* @param bodyStream - `std::shared_ptr` to &id:oatpp::data::stream::InputStream;.
* @param toStream - `std::shared_ptr` to &id:oatpp::data::stream::OutputStream;.
* @return - &id:oatpp::async::CoroutineStarter;.
*/
oatpp::async::CoroutineStarter decodeAsync(const Headers& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const;
/** /**
* Read body stream and decode it to string. * Read body stream and decode it to string.

View File

@ -111,14 +111,14 @@ void SimpleBodyDecoder::decode(const Headers& headers,
} }
oatpp::async::CoroutineStarter SimpleBodyDecoder::doChunkedDecodingAsync(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream, oatpp::async::CoroutineStarter SimpleBodyDecoder::doChunkedDecodingAsync(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) { const std::shared_ptr<oatpp::data::stream::AsyncWriteCallback>& writeCallback) {
class ChunkedDecoder : public oatpp::async::Coroutine<ChunkedDecoder> { class ChunkedDecoder : public oatpp::async::Coroutine<ChunkedDecoder> {
private: private:
const v_int32 MAX_LINE_SIZE = 8; const v_int32 MAX_LINE_SIZE = 8;
private: private:
std::shared_ptr<oatpp::data::stream::InputStream> m_fromStream; std::shared_ptr<oatpp::data::stream::InputStream> m_fromStream;
std::shared_ptr<oatpp::data::stream::OutputStream> m_toStream; const std::shared_ptr<oatpp::data::stream::AsyncWriteCallback>& m_writeCallback;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_buffer = oatpp::data::buffer::IOBuffer::createShared(); std::shared_ptr<oatpp::data::buffer::IOBuffer> m_buffer = oatpp::data::buffer::IOBuffer::createShared();
v_int32 m_currLineLength; v_int32 m_currLineLength;
v_char8 m_lineChar; v_char8 m_lineChar;
@ -137,9 +137,9 @@ oatpp::async::CoroutineStarter SimpleBodyDecoder::doChunkedDecodingAsync(const s
public: public:
ChunkedDecoder(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream, ChunkedDecoder(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const std::shared_ptr<oatpp::data::stream::AsyncWriteCallback>& writeCallback)
: m_fromStream(fromStream) : m_fromStream(fromStream)
, m_toStream(toStream) , m_writeCallback(writeCallback)
{} {}
Action act() override { Action act() override {
@ -188,7 +188,7 @@ oatpp::async::CoroutineStarter SimpleBodyDecoder::doChunkedDecodingAsync(const s
data::v_io_size countToRead = strtol((const char*) m_lineBuffer, nullptr, 16); data::v_io_size countToRead = strtol((const char*) m_lineBuffer, nullptr, 16);
if(countToRead > 0) { if(countToRead > 0) {
prepareSkipRN(); prepareSkipRN();
return oatpp::data::stream::transferAsync(m_fromStream, m_toStream, countToRead, m_buffer).next(yieldTo(&ChunkedDecoder::skipRN)); return oatpp::data::stream::transferAsync(m_fromStream, m_writeCallback, countToRead, m_buffer).next(yieldTo(&ChunkedDecoder::skipRN));
} }
m_done = true; m_done = true;
prepareSkipRN(); prepareSkipRN();
@ -205,16 +205,16 @@ oatpp::async::CoroutineStarter SimpleBodyDecoder::doChunkedDecodingAsync(const s
}; };
return ChunkedDecoder::start(fromStream, toStream); return ChunkedDecoder::start(fromStream, writeCallback);
} }
oatpp::async::CoroutineStarter SimpleBodyDecoder::decodeAsync(const Headers& headers, oatpp::async::CoroutineStarter SimpleBodyDecoder::decodeAsync(const Headers& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream, const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const { const std::shared_ptr<oatpp::data::stream::AsyncWriteCallback>& writeCallback) const {
auto transferEncodingIt = headers.find(Header::TRANSFER_ENCODING); auto transferEncodingIt = headers.find(Header::TRANSFER_ENCODING);
if(transferEncodingIt != headers.end() && transferEncodingIt->second == Header::Value::TRANSFER_ENCODING_CHUNKED) { if(transferEncodingIt != headers.end() && transferEncodingIt->second == Header::Value::TRANSFER_ENCODING_CHUNKED) {
return doChunkedDecodingAsync(bodyStream, toStream); return doChunkedDecodingAsync(bodyStream, writeCallback);
} else { } else {
oatpp::data::v_io_size contentLength = 0; oatpp::data::v_io_size contentLength = 0;
auto contentLengthStrIt = headers.find(Header::CONTENT_LENGTH); auto contentLengthStrIt = headers.find(Header::CONTENT_LENGTH);
@ -227,7 +227,7 @@ oatpp::async::CoroutineStarter SimpleBodyDecoder::decodeAsync(const Headers& hea
throw HttpError(http::Status::CODE_400, "Invalid 'Content-Length' Header"); throw HttpError(http::Status::CODE_400, "Invalid 'Content-Length' Header");
} }
if(contentLength > 0) { if(contentLength > 0) {
return oatpp::data::stream::transferAsync(bodyStream, toStream, contentLength, oatpp::data::buffer::IOBuffer::createShared()); return oatpp::data::stream::transferAsync(bodyStream, writeCallback, contentLength, oatpp::data::buffer::IOBuffer::createShared());
} else { } else {
return nullptr; return nullptr;
} }

View File

@ -41,7 +41,7 @@ private:
oatpp::data::stream::WriteCallback* writeCallback); oatpp::data::stream::WriteCallback* writeCallback);
static oatpp::async::CoroutineStarter doChunkedDecodingAsync(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream, static oatpp::async::CoroutineStarter doChunkedDecodingAsync(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream); const std::shared_ptr<oatpp::data::stream::AsyncWriteCallback>& writeCallback);
public: public:
/** /**
@ -58,12 +58,12 @@ public:
* Same as &l:SimpleBodyDecoder::decode (); but Async. * Same as &l:SimpleBodyDecoder::decode (); but Async.
* @param headers - Headers map. &id:oatpp::web::protocol::http::Headers;. * @param headers - Headers map. &id:oatpp::web::protocol::http::Headers;.
* @param bodyStream - `std::shared_ptr` to &id:oatpp::data::stream::InputStream;. * @param bodyStream - `std::shared_ptr` to &id:oatpp::data::stream::InputStream;.
* @param toStream - `std::shared_ptr` to &id:oatpp::data::stream::OutputStream;. * @param writeCallback - `std::shared_ptr` to &id:oatpp::data::stream::AsyncWriteCallback;.
* @return - &id:oatpp::async::CoroutineStarter;. * @return - &id:oatpp::async::CoroutineStarter;.
*/ */
oatpp::async::CoroutineStarter decodeAsync(const Headers& headers, oatpp::async::CoroutineStarter decodeAsync(const Headers& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream, const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const override; const std::shared_ptr<oatpp::data::stream::AsyncWriteCallback>& writeCallback) const override;
}; };