diff --git a/src/oatpp/web/protocol/http/outgoing/MultipartBody.cpp b/src/oatpp/web/protocol/http/outgoing/MultipartBody.cpp index a1d600e0..d6484e66 100644 --- a/src/oatpp/web/protocol/http/outgoing/MultipartBody.cpp +++ b/src/oatpp/web/protocol/http/outgoing/MultipartBody.cpp @@ -32,16 +32,17 @@ v_io_size MultipartBody::readBody(void *buffer, v_buff_size count, async::Action const auto& stream = part->getInputStream(); if(!stream) { OATPP_LOGW("[oatpp::web::protocol::http::outgoing::MultipartBody::MultipartReadCallback::readBody()]", "Warning. Part has no input stream", m_state); - m_iterator.inc(action); return 0; } - auto res = stream->read(buffer, count, action); - if(res == 0) { - if(action.isNone()) { - m_iterator.inc(action); - } + return stream->read(buffer, count, action); +} + +v_io_size MultipartBody::incPart(async::Action& action) { + m_iterator.inc(action); + if(action.isNone()) { + return 0; } - return res; + return oatpp::IOError::RETRY_READ; } v_io_size MultipartBody::read(void *buffer, v_buff_size count, async::Action& action) { @@ -76,6 +77,10 @@ v_io_size MultipartBody::read(void *buffer, v_buff_size count, async::Action& ac res = readBody(currBufferPtr, bytesLeft, action); break; + case STATE_INC_PART: + res = incPart(action); + break; + default: OATPP_LOGE("[oatpp::web::protocol::http::outgoing::MultipartBody::MultipartReadCallback::read()]", "Error. Invalid state %d", m_state); return 0; @@ -93,11 +98,13 @@ v_io_size MultipartBody::read(void *buffer, v_buff_size count, async::Action& ac } m_state += 1; + + if(m_state == STATE_INC_PART && m_flushParts && bytesLeft < count) { + break; + } + if(m_state == STATE_ROUND) { m_state = 0; - if(m_flushImmediately) { - break; - } } } else if(action.isNone()) { @@ -167,13 +174,13 @@ v_io_size MultipartBody::readHeaders(const std::shared_ptr& multipart return res; } -MultipartBody::MultipartBody(const std::shared_ptr& multipart, const oatpp::String& contentType, bool flushImmediately) +MultipartBody::MultipartBody(const std::shared_ptr& multipart, const oatpp::String& contentType, bool flushParts) : m_multipart(multipart) , m_contentType(contentType) , m_iterator(multipart) , m_state(STATE_BOUNDARY) , m_readStream(nullptr, nullptr, 0) - , m_flushImmediately(flushImmediately) + , m_flushParts(flushParts) {} void MultipartBody::declareHeaders(Headers& headers) { diff --git a/src/oatpp/web/protocol/http/outgoing/MultipartBody.hpp b/src/oatpp/web/protocol/http/outgoing/MultipartBody.hpp index d0b10f65..e64ff3c8 100644 --- a/src/oatpp/web/protocol/http/outgoing/MultipartBody.hpp +++ b/src/oatpp/web/protocol/http/outgoing/MultipartBody.hpp @@ -56,8 +56,9 @@ private: static constexpr v_int32 STATE_BOUNDARY = 0; static constexpr v_int32 STATE_HEADERS = 1; static constexpr v_int32 STATE_BODY = 2; - static constexpr v_int32 STATE_ROUND = 3; // number of possible states. used to round the state. - static constexpr v_int32 STATE_FINISHED = 4; + static constexpr v_int32 STATE_INC_PART = 3; + static constexpr v_int32 STATE_ROUND = 4; // number of possible states. used to round the state. + static constexpr v_int32 STATE_FINISHED = 5; private: @@ -85,9 +86,7 @@ private: void inc(async::Action& action) { m_part = m_multipart->readNextPart(action); - if(m_part) { - m_isFirst = false; - } + m_isFirst = false; } bool finished() { @@ -125,19 +124,21 @@ private: PartIterator m_iterator; v_int32 m_state; oatpp::data::stream::BufferInputStream m_readStream; - bool m_flushImmediately; + bool m_flushParts; private: v_io_size readBody(void *buffer, v_buff_size count, async::Action& action); + v_io_size incPart(async::Action& action); public: /** * Constructor. * @param multipart - multipart object. * @param contentType - type of the multipart. Default value = `"multipart/form-data"`. + * @param flushParts - flush data part by part. */ MultipartBody(const std::shared_ptr& multipart, const oatpp::String& contentType = "multipart/form-data", - bool flushImmediately = false); + bool flushParts = false); /** * Read operation callback. diff --git a/test/oatpp/web/app/Controller.hpp b/test/oatpp/web/app/Controller.hpp index ef4faea6..7c8ffcfb 100644 --- a/test/oatpp/web/app/Controller.hpp +++ b/test/oatpp/web/app/Controller.hpp @@ -300,7 +300,7 @@ public: std::shared_ptr readNextPart(async::Action& action) override { - if(counter == 5) { + if(counter == 10) { return nullptr; } @@ -343,7 +343,7 @@ public: auto body = std::make_shared( multipart, "multipart/x-mixed-replace", - true /* flush frames immediately */ + true /* flush parts */ ); return OutgoingResponse::createShared(Status::CODE_200, body); } diff --git a/test/oatpp/web/app/ControllerAsync.hpp b/test/oatpp/web/app/ControllerAsync.hpp index 363529d0..67c23eda 100644 --- a/test/oatpp/web/app/ControllerAsync.hpp +++ b/test/oatpp/web/app/ControllerAsync.hpp @@ -38,6 +38,7 @@ #include "oatpp/parser/json/mapping/ObjectMapper.hpp" +#include "oatpp/core/data/stream/FileStream.hpp" #include "oatpp/core/data/stream/Stream.hpp" #include "oatpp/core/utils/ConversionUtils.hpp" #include "oatpp/core/macro/codegen.hpp" @@ -284,6 +285,81 @@ public: }; + + class MPStream : public oatpp::web::mime::multipart::Multipart { + public: + typedef oatpp::web::mime::multipart::Part Part; + private: + v_uint32 counter = 0; + bool m_wait = false; + public: + + MPStream() + : oatpp::web::mime::multipart::Multipart(generateRandomBoundary()) + {} + + std::shared_ptr readNextPart(async::Action& action) override { + + if(counter == 10) { + return nullptr; + } + + if(m_wait) { + m_wait = false; + action = async::Action::createWaitRepeatAction(1000 * 1000 + oatpp::base::Environment::getMicroTickCount()); + return nullptr; + } + + m_wait = true; + + auto part = std::make_shared(); + part->putHeader(Header::CONTENT_TYPE, "text/html"); + + oatpp::String frameData; + +// if(counter % 2 == 0) { +// frameData = "0"; +// } else { +// frameData = "1"; +// } +// part->setDataInfo(std::make_shared(frameData)); + + if(counter % 2 == 0) { + part->setDataInfo(std::make_shared("/Users/leonid/Documents/test/frame1.jpg")); + } else { + part->setDataInfo(std::make_shared("/Users/leonid/Documents/test/frame2.jpg")); + } + + ++ counter; + + OATPP_LOGD("Multipart", "Frame sent!"); + + return part; + + } + + void writeNextPart(const std::shared_ptr& part, async::Action& action) override { + throw std::runtime_error("No writes here!!!"); + } + + }; + + ENDPOINT_ASYNC("GET", "multipart-stream", MultipartStream) { + + ENDPOINT_ASYNC_INIT(MultipartStream) + + Action act() { + auto multipart = std::make_shared(); + auto body = std::make_shared( + multipart, + "multipart/x-mixed-replace", + true /* flush parts */ + ); + return _return(OutgoingResponse::createShared(Status::CODE_200, body)); + } + + }; + #include OATPP_CODEGEN_END(ApiController) };