web::protocol::http::outgoing::MultipartBody. Correct Async processing of continuous multipart stream.

This commit is contained in:
lganzzzo 2020-08-03 03:48:19 +03:00
parent 678219180c
commit 6a569888ea
4 changed files with 105 additions and 21 deletions

View File

@ -32,16 +32,17 @@ v_io_size MultipartBody::readBody(void *buffer, v_buff_size count, async::Action
const auto& stream = part->getInputStream();
if(!stream) {
OATPP_LOGW("[oatpp::web::protocol::http::outgoing::MultipartBody::MultipartReadCallback::readBody()]", "Warning. Part has no input stream", m_state);
m_iterator.inc(action);
return 0;
}
auto res = stream->read(buffer, count, action);
if(res == 0) {
if(action.isNone()) {
m_iterator.inc(action);
}
return stream->read(buffer, count, action);
}
v_io_size MultipartBody::incPart(async::Action& action) {
m_iterator.inc(action);
if(action.isNone()) {
return 0;
}
return res;
return oatpp::IOError::RETRY_READ;
}
v_io_size MultipartBody::read(void *buffer, v_buff_size count, async::Action& action) {
@ -76,6 +77,10 @@ v_io_size MultipartBody::read(void *buffer, v_buff_size count, async::Action& ac
res = readBody(currBufferPtr, bytesLeft, action);
break;
case STATE_INC_PART:
res = incPart(action);
break;
default:
OATPP_LOGE("[oatpp::web::protocol::http::outgoing::MultipartBody::MultipartReadCallback::read()]", "Error. Invalid state %d", m_state);
return 0;
@ -93,11 +98,13 @@ v_io_size MultipartBody::read(void *buffer, v_buff_size count, async::Action& ac
}
m_state += 1;
if(m_state == STATE_INC_PART && m_flushParts && bytesLeft < count) {
break;
}
if(m_state == STATE_ROUND) {
m_state = 0;
if(m_flushImmediately) {
break;
}
}
} else if(action.isNone()) {
@ -167,13 +174,13 @@ v_io_size MultipartBody::readHeaders(const std::shared_ptr<Multipart>& multipart
return res;
}
MultipartBody::MultipartBody(const std::shared_ptr<Multipart>& multipart, const oatpp::String& contentType, bool flushImmediately)
MultipartBody::MultipartBody(const std::shared_ptr<Multipart>& multipart, const oatpp::String& contentType, bool flushParts)
: m_multipart(multipart)
, m_contentType(contentType)
, m_iterator(multipart)
, m_state(STATE_BOUNDARY)
, m_readStream(nullptr, nullptr, 0)
, m_flushImmediately(flushImmediately)
, m_flushParts(flushParts)
{}
void MultipartBody::declareHeaders(Headers& headers) {

View File

@ -56,8 +56,9 @@ private:
static constexpr v_int32 STATE_BOUNDARY = 0;
static constexpr v_int32 STATE_HEADERS = 1;
static constexpr v_int32 STATE_BODY = 2;
static constexpr v_int32 STATE_ROUND = 3; // number of possible states. used to round the state.
static constexpr v_int32 STATE_FINISHED = 4;
static constexpr v_int32 STATE_INC_PART = 3;
static constexpr v_int32 STATE_ROUND = 4; // number of possible states. used to round the state.
static constexpr v_int32 STATE_FINISHED = 5;
private:
@ -85,9 +86,7 @@ private:
void inc(async::Action& action) {
m_part = m_multipart->readNextPart(action);
if(m_part) {
m_isFirst = false;
}
m_isFirst = false;
}
bool finished() {
@ -125,19 +124,21 @@ private:
PartIterator m_iterator;
v_int32 m_state;
oatpp::data::stream::BufferInputStream m_readStream;
bool m_flushImmediately;
bool m_flushParts;
private:
v_io_size readBody(void *buffer, v_buff_size count, async::Action& action);
v_io_size incPart(async::Action& action);
public:
/**
* Constructor.
* @param multipart - multipart object.
* @param contentType - type of the multipart. Default value = `"multipart/form-data"`.
* @param flushParts - flush data part by part.
*/
MultipartBody(const std::shared_ptr<Multipart>& multipart,
const oatpp::String& contentType = "multipart/form-data",
bool flushImmediately = false);
bool flushParts = false);
/**
* Read operation callback.

View File

@ -300,7 +300,7 @@ public:
std::shared_ptr<Part> readNextPart(async::Action& action) override {
if(counter == 5) {
if(counter == 10) {
return nullptr;
}
@ -343,7 +343,7 @@ public:
auto body = std::make_shared<oatpp::web::protocol::http::outgoing::MultipartBody>(
multipart,
"multipart/x-mixed-replace",
true /* flush frames immediately */
true /* flush parts */
);
return OutgoingResponse::createShared(Status::CODE_200, body);
}

View File

@ -38,6 +38,7 @@
#include "oatpp/parser/json/mapping/ObjectMapper.hpp"
#include "oatpp/core/data/stream/FileStream.hpp"
#include "oatpp/core/data/stream/Stream.hpp"
#include "oatpp/core/utils/ConversionUtils.hpp"
#include "oatpp/core/macro/codegen.hpp"
@ -284,6 +285,81 @@ public:
};
class MPStream : public oatpp::web::mime::multipart::Multipart {
public:
typedef oatpp::web::mime::multipart::Part Part;
private:
v_uint32 counter = 0;
bool m_wait = false;
public:
MPStream()
: oatpp::web::mime::multipart::Multipart(generateRandomBoundary())
{}
std::shared_ptr<Part> readNextPart(async::Action& action) override {
if(counter == 10) {
return nullptr;
}
if(m_wait) {
m_wait = false;
action = async::Action::createWaitRepeatAction(1000 * 1000 + oatpp::base::Environment::getMicroTickCount());
return nullptr;
}
m_wait = true;
auto part = std::make_shared<Part>();
part->putHeader(Header::CONTENT_TYPE, "text/html");
oatpp::String frameData;
// if(counter % 2 == 0) {
// frameData = "<html><body>0</body></html>";
// } else {
// frameData = "<html><body>1</body></html>";
// }
// part->setDataInfo(std::make_shared<oatpp::data::stream::BufferInputStream>(frameData));
if(counter % 2 == 0) {
part->setDataInfo(std::make_shared<oatpp::data::stream::FileInputStream>("/Users/leonid/Documents/test/frame1.jpg"));
} else {
part->setDataInfo(std::make_shared<oatpp::data::stream::FileInputStream>("/Users/leonid/Documents/test/frame2.jpg"));
}
++ counter;
OATPP_LOGD("Multipart", "Frame sent!");
return part;
}
void writeNextPart(const std::shared_ptr<Part>& part, async::Action& action) override {
throw std::runtime_error("No writes here!!!");
}
};
ENDPOINT_ASYNC("GET", "multipart-stream", MultipartStream) {
ENDPOINT_ASYNC_INIT(MultipartStream)
Action act() {
auto multipart = std::make_shared<MPStream>();
auto body = std::make_shared<oatpp::web::protocol::http::outgoing::MultipartBody>(
multipart,
"multipart/x-mixed-replace",
true /* flush parts */
);
return _return(OutgoingResponse::createShared(Status::CODE_200, body));
}
};
#include OATPP_CODEGEN_END(ApiController)
};