Better mime::multipart::StatefulParser. Correct async parsing.

This commit is contained in:
lganzzzo 2019-07-29 23:22:21 +04:00
parent babec3640f
commit 678b0b837b
5 changed files with 77 additions and 39 deletions

View File

@ -52,11 +52,37 @@ void InMemoryParser::onPartData(p_char8 data, oatpp::data::v_io_size size) {
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// AsyncInMemoryParser
AsyncInMemoryParser::AsyncInMemoryParser(Multipart* multipart)
: m_multipart(multipart)
{}
async::CoroutineStarter AsyncInMemoryParser::onPartHeadersAsync(const Headers& partHeaders) {
m_currPart = std::make_shared<Part>(partHeaders);
return nullptr;
}
async::CoroutineStarter AsyncInMemoryParser::onPartDataAsync(p_char8 data, oatpp::data::v_io_size size) {
if(size > 0) {
m_buffer.write(data, size);
} else {
auto fullData = m_buffer.toString();
m_buffer.clear();
auto stream = std::make_shared<data::stream::BufferInputStream>(fullData.getPtr(), fullData->getData(), fullData->getSize());
m_currPart->setDataInfo(stream, fullData, fullData->getSize());
m_multipart->addPart(m_currPart);
m_currPart = nullptr;
}
return nullptr;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// InMemoryReader
Reader::Reader(Multipart* multipart)
: m_parser(multipart->getBoundary(), std::make_shared<InMemoryParser>(multipart))
: m_parser(multipart->getBoundary(), std::make_shared<InMemoryParser>(multipart), nullptr)
{}
data::v_io_size Reader::write(const void *data, data::v_io_size count) {
@ -68,17 +94,15 @@ data::v_io_size Reader::write(const void *data, data::v_io_size count) {
// AsyncReader
AsyncReader::AsyncReader(const std::shared_ptr<Multipart>& multipart)
: m_parser(multipart->getBoundary(), std::make_shared<InMemoryParser>(multipart.get()))
: m_parser(multipart->getBoundary(), nullptr, std::make_shared<AsyncInMemoryParser>(multipart.get()))
, m_multipart(multipart)
{}
oatpp::async::Action AsyncReader::writeAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
oatpp::data::stream::AsyncInlineWriteData& inlineData,
oatpp::async::Action&& nextAction)
oatpp::data::stream::AsyncInlineWriteData& inlineData,
oatpp::async::Action&& nextAction)
{
m_parser.parseNext((p_char8) inlineData.currBufferPtr, inlineData.bytesLeft);
inlineData.setEof();
return std::forward<async::Action>(nextAction);
return m_parser.parseNextAsyncInline(coroutine, inlineData, std::forward<async::Action>(nextAction));
}
}}}}

View File

@ -51,6 +51,30 @@ public:
void onPartHeaders(const Headers& partHeaders) override;
void onPartData(p_char8 data, oatpp::data::v_io_size size) override;
};
/**
* Async In memory multipart parser. <br>
* Extends - &id:oatpp::web::mime::multipart::StatefulParser::AsyncListener;.
*/
class AsyncInMemoryParser : public StatefulParser::AsyncListener {
private:
Multipart* m_multipart;
std::shared_ptr<Part> m_currPart;
data::stream::ChunkedBuffer m_buffer;
public:
/**
* Constructor.
* @param multipart - pointer to &id:oatpp::web::mime::multipart::Multipart;.
*/
AsyncInMemoryParser(Multipart* multipart);
async::CoroutineStarter onPartHeadersAsync(const Headers& partHeaders) override;
async::CoroutineStarter onPartDataAsync(p_char8 data, oatpp::data::v_io_size size) override;
};
/**

View File

@ -100,7 +100,9 @@ StatefulParser::ListenerCall::operator bool() const {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// StatefulParser
StatefulParser::StatefulParser(const oatpp::String& boundary, const std::shared_ptr<Listener>& listener)
StatefulParser::StatefulParser(const oatpp::String& boundary,
const std::shared_ptr<Listener>& listener,
const std::shared_ptr<AsyncListener>& asyncListener)
: m_state(STATE_BOUNDARY)
, m_currPartIndex(0)
, m_currBoundaryCharIndex(0)
@ -112,6 +114,7 @@ StatefulParser::StatefulParser(const oatpp::String& boundary, const std::shared_
, m_nextBoundarySample("\r\n--" + boundary)
, m_maxPartHeadersSize(4092)
, m_listener(listener)
, m_asyncListener(asyncListener)
{}
void StatefulParser::parseHeaders(Headers& headers) {
@ -341,22 +344,6 @@ v_int32 StatefulParser::parseNext(p_char8 data, v_int32 size) {
}
async::CoroutineStarter StatefulParser::parseNext_BoundaryAsync(data::stream::AsyncInlineWriteData* inlineData) {
return nullptr;
}
async::CoroutineStarter StatefulParser::parseNext_AfterBoundaryAsync(data::stream::AsyncInlineWriteData* inlineData) {
return nullptr;
}
async::CoroutineStarter StatefulParser::parseNext_HeadersAsync(data::stream::AsyncInlineWriteData* inlineData) {
return nullptr;
}
async::CoroutineStarter StatefulParser::parseNext_DataAsync(data::stream::AsyncInlineWriteData* inlineData) {
return nullptr;
}
async::Action StatefulParser::parseNextAsyncInline(async::AbstractCoroutine* coroutine,
data::stream::AsyncInlineWriteData& inlineData,
async::Action&& nextAction)
@ -377,22 +364,29 @@ async::Action StatefulParser::parseNextAsyncInline(async::AbstractCoroutine* cor
if(m_inlineData->bytesLeft > 0) {
ListenerCall listenerCall;
switch (m_this->m_state) {
case STATE_BOUNDARY:
return m_this->parseNext_BoundaryAsync(m_inlineData).next(yieldTo(&ParseCoroutine::act));
listenerCall = m_this->parseNext_Boundary(*m_inlineData);
break;
case STATE_AFTER_BOUNDARY:
return m_this->parseNext_AfterBoundaryAsync(m_inlineData).next(yieldTo(&ParseCoroutine::act));
m_this->parseNext_AfterBoundary(*m_inlineData);
break;
case STATE_HEADERS:
return m_this->parseNext_HeadersAsync(m_inlineData).next(yieldTo(&ParseCoroutine::act));
listenerCall = m_this->parseNext_Headers(*m_inlineData);
break;
case STATE_DATA:
return m_this->parseNext_DataAsync(m_inlineData).next(yieldTo(&ParseCoroutine::act));
listenerCall = m_this->parseNext_Data(*m_inlineData);
break;
case STATE_DONE:
return finish();
default:
throw std::runtime_error(
"[oatpp::web::mime::multipart::StatefulParser::parseNext()]: Error. Invalid state.");
throw std::runtime_error("[oatpp::web::mime::multipart::StatefulParser::parseNext()]: Error. Invalid state.");
}
return listenerCall.callAsync(m_this).next(yieldTo(&ParseCoroutine::act));
}
return finish();

View File

@ -196,21 +196,17 @@ private:
ListenerCall parseNext_Headers(data::stream::AsyncInlineWriteData& inlineData);
ListenerCall parseNext_Data(data::stream::AsyncInlineWriteData& inlineData);
private:
async::CoroutineStarter parseNext_BoundaryAsync(data::stream::AsyncInlineWriteData* inlineData);
async::CoroutineStarter parseNext_AfterBoundaryAsync(data::stream::AsyncInlineWriteData* inlineData);
async::CoroutineStarter parseNext_HeadersAsync(data::stream::AsyncInlineWriteData* inlineData);
async::CoroutineStarter parseNext_DataAsync(data::stream::AsyncInlineWriteData* inlineData);
public:
/**
* Constructor.
* @param boundary - value of multipart boundary.
* @param listener - &l:StatefulParser::Listener;.
* @param asyncListener - &l:StatefulParser::AsyncListener;.
*/
StatefulParser(const oatpp::String& boundary, const std::shared_ptr<Listener>& listener);
StatefulParser(const oatpp::String& boundary,
const std::shared_ptr<Listener>& listener,
const std::shared_ptr<AsyncListener>& asyncListener);
/**
* Parse next chunk of bytes.

View File

@ -60,7 +60,7 @@ namespace {
v_int32 step)
{
oatpp::web::mime::multipart::StatefulParser parser(boundary, listener);
oatpp::web::mime::multipart::StatefulParser parser(boundary, listener, nullptr);
oatpp::data::stream::BufferInputStream stream(text.getPtr(), text->getData(), text->getSize());
v_char8 buffer[step];