From 678b0b837bcb28b1a82ccbba10cd6228fed02a7f Mon Sep 17 00:00:00 2001 From: lganzzzo Date: Mon, 29 Jul 2019 23:22:21 +0400 Subject: [PATCH] Better mime::multipart::StatefulParser. Correct async parsing. --- src/oatpp/web/mime/multipart/Reader.cpp | 38 ++++++++++++++---- src/oatpp/web/mime/multipart/Reader.hpp | 24 +++++++++++ .../web/mime/multipart/StatefulParser.cpp | 40 ++++++++----------- .../web/mime/multipart/StatefulParser.hpp | 12 ++---- .../web/mime/multipart/StatefulParserTest.cpp | 2 +- 5 files changed, 77 insertions(+), 39 deletions(-) diff --git a/src/oatpp/web/mime/multipart/Reader.cpp b/src/oatpp/web/mime/multipart/Reader.cpp index 2301db09..21126f0f 100644 --- a/src/oatpp/web/mime/multipart/Reader.cpp +++ b/src/oatpp/web/mime/multipart/Reader.cpp @@ -52,11 +52,37 @@ void InMemoryParser::onPartData(p_char8 data, oatpp::data::v_io_size size) { } } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// AsyncInMemoryParser + +AsyncInMemoryParser::AsyncInMemoryParser(Multipart* multipart) + : m_multipart(multipart) +{} + +async::CoroutineStarter AsyncInMemoryParser::onPartHeadersAsync(const Headers& partHeaders) { + m_currPart = std::make_shared(partHeaders); + return nullptr; +} + +async::CoroutineStarter AsyncInMemoryParser::onPartDataAsync(p_char8 data, oatpp::data::v_io_size size) { + if(size > 0) { + m_buffer.write(data, size); + } else { + auto fullData = m_buffer.toString(); + m_buffer.clear(); + auto stream = std::make_shared(fullData.getPtr(), fullData->getData(), fullData->getSize()); + m_currPart->setDataInfo(stream, fullData, fullData->getSize()); + m_multipart->addPart(m_currPart); + m_currPart = nullptr; + } + return nullptr; +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // InMemoryReader Reader::Reader(Multipart* multipart) - : m_parser(multipart->getBoundary(), std::make_shared(multipart)) + : m_parser(multipart->getBoundary(), std::make_shared(multipart), nullptr) {} data::v_io_size Reader::write(const void *data, data::v_io_size count) { @@ -68,17 +94,15 @@ data::v_io_size Reader::write(const void *data, data::v_io_size count) { // AsyncReader AsyncReader::AsyncReader(const std::shared_ptr& multipart) - : m_parser(multipart->getBoundary(), std::make_shared(multipart.get())) + : m_parser(multipart->getBoundary(), nullptr, std::make_shared(multipart.get())) , m_multipart(multipart) {} oatpp::async::Action AsyncReader::writeAsyncInline(oatpp::async::AbstractCoroutine* coroutine, - oatpp::data::stream::AsyncInlineWriteData& inlineData, - oatpp::async::Action&& nextAction) + oatpp::data::stream::AsyncInlineWriteData& inlineData, + oatpp::async::Action&& nextAction) { - m_parser.parseNext((p_char8) inlineData.currBufferPtr, inlineData.bytesLeft); - inlineData.setEof(); - return std::forward(nextAction); + return m_parser.parseNextAsyncInline(coroutine, inlineData, std::forward(nextAction)); } }}}} diff --git a/src/oatpp/web/mime/multipart/Reader.hpp b/src/oatpp/web/mime/multipart/Reader.hpp index b9276035..ee5e73ec 100644 --- a/src/oatpp/web/mime/multipart/Reader.hpp +++ b/src/oatpp/web/mime/multipart/Reader.hpp @@ -51,6 +51,30 @@ public: void onPartHeaders(const Headers& partHeaders) override; void onPartData(p_char8 data, oatpp::data::v_io_size size) override; + +}; + +/** + * Async In memory multipart parser.
+ * Extends - &id:oatpp::web::mime::multipart::StatefulParser::AsyncListener;. + */ +class AsyncInMemoryParser : public StatefulParser::AsyncListener { +private: + Multipart* m_multipart; + std::shared_ptr m_currPart; + data::stream::ChunkedBuffer m_buffer; +public: + + /** + * Constructor. + * @param multipart - pointer to &id:oatpp::web::mime::multipart::Multipart;. + */ + AsyncInMemoryParser(Multipart* multipart); + + async::CoroutineStarter onPartHeadersAsync(const Headers& partHeaders) override; + + async::CoroutineStarter onPartDataAsync(p_char8 data, oatpp::data::v_io_size size) override; + }; /** diff --git a/src/oatpp/web/mime/multipart/StatefulParser.cpp b/src/oatpp/web/mime/multipart/StatefulParser.cpp index 9ce691e7..312520a3 100644 --- a/src/oatpp/web/mime/multipart/StatefulParser.cpp +++ b/src/oatpp/web/mime/multipart/StatefulParser.cpp @@ -100,7 +100,9 @@ StatefulParser::ListenerCall::operator bool() const { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // StatefulParser -StatefulParser::StatefulParser(const oatpp::String& boundary, const std::shared_ptr& listener) +StatefulParser::StatefulParser(const oatpp::String& boundary, + const std::shared_ptr& listener, + const std::shared_ptr& asyncListener) : m_state(STATE_BOUNDARY) , m_currPartIndex(0) , m_currBoundaryCharIndex(0) @@ -112,6 +114,7 @@ StatefulParser::StatefulParser(const oatpp::String& boundary, const std::shared_ , m_nextBoundarySample("\r\n--" + boundary) , m_maxPartHeadersSize(4092) , m_listener(listener) + , m_asyncListener(asyncListener) {} void StatefulParser::parseHeaders(Headers& headers) { @@ -341,22 +344,6 @@ v_int32 StatefulParser::parseNext(p_char8 data, v_int32 size) { } -async::CoroutineStarter StatefulParser::parseNext_BoundaryAsync(data::stream::AsyncInlineWriteData* inlineData) { - return nullptr; -} - -async::CoroutineStarter StatefulParser::parseNext_AfterBoundaryAsync(data::stream::AsyncInlineWriteData* inlineData) { - return nullptr; -} - -async::CoroutineStarter StatefulParser::parseNext_HeadersAsync(data::stream::AsyncInlineWriteData* inlineData) { - return nullptr; -} - -async::CoroutineStarter StatefulParser::parseNext_DataAsync(data::stream::AsyncInlineWriteData* inlineData) { - return nullptr; -} - async::Action StatefulParser::parseNextAsyncInline(async::AbstractCoroutine* coroutine, data::stream::AsyncInlineWriteData& inlineData, async::Action&& nextAction) @@ -377,22 +364,29 @@ async::Action StatefulParser::parseNextAsyncInline(async::AbstractCoroutine* cor if(m_inlineData->bytesLeft > 0) { + ListenerCall listenerCall; + switch (m_this->m_state) { case STATE_BOUNDARY: - return m_this->parseNext_BoundaryAsync(m_inlineData).next(yieldTo(&ParseCoroutine::act)); + listenerCall = m_this->parseNext_Boundary(*m_inlineData); + break; case STATE_AFTER_BOUNDARY: - return m_this->parseNext_AfterBoundaryAsync(m_inlineData).next(yieldTo(&ParseCoroutine::act)); + m_this->parseNext_AfterBoundary(*m_inlineData); + break; case STATE_HEADERS: - return m_this->parseNext_HeadersAsync(m_inlineData).next(yieldTo(&ParseCoroutine::act)); + listenerCall = m_this->parseNext_Headers(*m_inlineData); + break; case STATE_DATA: - return m_this->parseNext_DataAsync(m_inlineData).next(yieldTo(&ParseCoroutine::act)); + listenerCall = m_this->parseNext_Data(*m_inlineData); + break; case STATE_DONE: return finish(); default: - throw std::runtime_error( - "[oatpp::web::mime::multipart::StatefulParser::parseNext()]: Error. Invalid state."); + throw std::runtime_error("[oatpp::web::mime::multipart::StatefulParser::parseNext()]: Error. Invalid state."); } + return listenerCall.callAsync(m_this).next(yieldTo(&ParseCoroutine::act)); + } return finish(); diff --git a/src/oatpp/web/mime/multipart/StatefulParser.hpp b/src/oatpp/web/mime/multipart/StatefulParser.hpp index 1cc71769..8aeb1a98 100644 --- a/src/oatpp/web/mime/multipart/StatefulParser.hpp +++ b/src/oatpp/web/mime/multipart/StatefulParser.hpp @@ -196,21 +196,17 @@ private: ListenerCall parseNext_Headers(data::stream::AsyncInlineWriteData& inlineData); ListenerCall parseNext_Data(data::stream::AsyncInlineWriteData& inlineData); -private: - - async::CoroutineStarter parseNext_BoundaryAsync(data::stream::AsyncInlineWriteData* inlineData); - async::CoroutineStarter parseNext_AfterBoundaryAsync(data::stream::AsyncInlineWriteData* inlineData); - async::CoroutineStarter parseNext_HeadersAsync(data::stream::AsyncInlineWriteData* inlineData); - async::CoroutineStarter parseNext_DataAsync(data::stream::AsyncInlineWriteData* inlineData); - public: /** * Constructor. * @param boundary - value of multipart boundary. * @param listener - &l:StatefulParser::Listener;. + * @param asyncListener - &l:StatefulParser::AsyncListener;. */ - StatefulParser(const oatpp::String& boundary, const std::shared_ptr& listener); + StatefulParser(const oatpp::String& boundary, + const std::shared_ptr& listener, + const std::shared_ptr& asyncListener); /** * Parse next chunk of bytes. diff --git a/test/oatpp/web/mime/multipart/StatefulParserTest.cpp b/test/oatpp/web/mime/multipart/StatefulParserTest.cpp index 32ff3b8e..12facaf7 100644 --- a/test/oatpp/web/mime/multipart/StatefulParserTest.cpp +++ b/test/oatpp/web/mime/multipart/StatefulParserTest.cpp @@ -60,7 +60,7 @@ namespace { v_int32 step) { - oatpp::web::mime::multipart::StatefulParser parser(boundary, listener); + oatpp::web::mime::multipart::StatefulParser parser(boundary, listener, nullptr); oatpp::data::stream::BufferInputStream stream(text.getPtr(), text->getData(), text->getSize()); v_char8 buffer[step];