Introduce: AsyncInlineReadData, AsyncInlineWriteData.

This commit is contained in:
lganzzzo 2019-07-24 01:12:08 +04:00
parent 3944412fa6
commit 531030572f
16 changed files with 281 additions and 154 deletions

View File

@ -295,12 +295,8 @@ async::CoroutineStarter FIFOBuffer::flushToStreamAsync(const std::shared_ptr<dat
std::shared_ptr<FIFOBuffer> m_fifo; std::shared_ptr<FIFOBuffer> m_fifo;
std::shared_ptr<data::stream::OutputStream> m_stream; std::shared_ptr<data::stream::OutputStream> m_stream;
private: private:
data::stream::AsyncInlineWriteData m_data1;
const void* m_data1; data::stream::AsyncInlineWriteData m_data2;
data::v_io_size m_size1;
const void* m_data2;
data::v_io_size m_size2;
public: public:
FlushCoroutine(const std::shared_ptr<FIFOBuffer>& fifo, const std::shared_ptr<data::stream::OutputStream>& stream) FlushCoroutine(const std::shared_ptr<FIFOBuffer>& fifo, const std::shared_ptr<data::stream::OutputStream>& stream)
@ -316,34 +312,28 @@ async::CoroutineStarter FIFOBuffer::flushToStreamAsync(const std::shared_ptr<dat
if(m_fifo->m_readPosition < m_fifo->m_writePosition) { if(m_fifo->m_readPosition < m_fifo->m_writePosition) {
m_data1 = &m_fifo->m_buffer[m_fifo->m_readPosition]; m_data1.set(&m_fifo->m_buffer[m_fifo->m_readPosition], m_fifo->m_writePosition - m_fifo->m_readPosition);
m_size1 = m_fifo->m_writePosition - m_fifo->m_readPosition;
return yieldTo(&FlushCoroutine::fullFlush); return yieldTo(&FlushCoroutine::fullFlush);
} else { } else {
m_data1 = &m_fifo->m_buffer[m_fifo->m_readPosition]; m_data1.set(&m_fifo->m_buffer[m_fifo->m_readPosition], m_fifo->m_bufferSize - m_fifo->m_readPosition);
m_size1 = m_fifo->m_bufferSize - m_fifo->m_readPosition; m_data2.set(m_fifo->m_buffer, m_fifo->m_writePosition);
m_data2 = m_fifo->m_buffer;
m_size2 = m_fifo->m_writePosition;
return yieldTo(&FlushCoroutine::partialFlush1); return yieldTo(&FlushCoroutine::partialFlush1);
} }
} }
Action fullFlush() { Action fullFlush() {
return data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_data1, m_size1, yieldTo(&FlushCoroutine::beforeFinish)); return data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_data1, yieldTo(&FlushCoroutine::beforeFinish));
} }
Action partialFlush1() { Action partialFlush1() {
return data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_data1, m_size1, yieldTo(&FlushCoroutine::partialFlush2)); return data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_data1, yieldTo(&FlushCoroutine::partialFlush2));
} }
Action partialFlush2() { Action partialFlush2() {
return data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_data2, m_size2, yieldTo(&FlushCoroutine::beforeFinish)); return data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_data2, yieldTo(&FlushCoroutine::beforeFinish));
} }
Action beforeFinish() { Action beforeFinish() {

View File

@ -239,9 +239,8 @@ oatpp::async::CoroutineStarter ChunkedBuffer::flushToStreamAsync(const std::shar
std::shared_ptr<OutputStream> m_stream; std::shared_ptr<OutputStream> m_stream;
ChunkEntry* m_currEntry; ChunkEntry* m_currEntry;
data::v_io_size m_bytesLeft; data::v_io_size m_bytesLeft;
const void* m_currData;
data::v_io_size m_currDataSize;
Action m_nextAction; Action m_nextAction;
data::stream::AsyncInlineWriteData m_currData;
public: public:
FlushCoroutine(const std::shared_ptr<ChunkedBuffer>& chunkedBuffer, FlushCoroutine(const std::shared_ptr<ChunkedBuffer>& chunkedBuffer,
@ -250,8 +249,6 @@ oatpp::async::CoroutineStarter ChunkedBuffer::flushToStreamAsync(const std::shar
, m_stream(stream) , m_stream(stream)
, m_currEntry(chunkedBuffer->m_firstEntry) , m_currEntry(chunkedBuffer->m_firstEntry)
, m_bytesLeft(chunkedBuffer->m_size) , m_bytesLeft(chunkedBuffer->m_size)
, m_currData(nullptr)
, m_currDataSize(0)
, m_nextAction(Action::createActionByType(Action::TYPE_FINISH)) , m_nextAction(Action::createActionByType(Action::TYPE_FINISH))
{} {}
@ -262,25 +259,23 @@ oatpp::async::CoroutineStarter ChunkedBuffer::flushToStreamAsync(const std::shar
} }
if(m_bytesLeft > CHUNK_ENTRY_SIZE) { if(m_bytesLeft > CHUNK_ENTRY_SIZE) {
m_currData = m_currEntry->chunk; m_currData.set(m_currEntry->chunk, CHUNK_ENTRY_SIZE);
m_currDataSize = CHUNK_ENTRY_SIZE;
m_nextAction = yieldTo(&FlushCoroutine::act); m_nextAction = yieldTo(&FlushCoroutine::act);
m_currEntry = m_currEntry->next; m_currEntry = m_currEntry->next;
m_bytesLeft -= m_currDataSize; m_bytesLeft -= m_currData.bytesLeft;
return yieldTo(&FlushCoroutine::writeCurrData); return yieldTo(&FlushCoroutine::writeCurrData);
} else { } else {
m_currData = m_currEntry->chunk; m_currData.set(m_currEntry->chunk, m_bytesLeft);
m_currDataSize = m_bytesLeft;
m_nextAction = yieldTo(&FlushCoroutine::act); m_nextAction = yieldTo(&FlushCoroutine::act);
m_currEntry = m_currEntry->next; m_currEntry = m_currEntry->next;
m_bytesLeft -= m_currDataSize; m_bytesLeft -= m_currData.bytesLeft;
return yieldTo(&FlushCoroutine::writeCurrData); return yieldTo(&FlushCoroutine::writeCurrData);
} }
} }
Action writeCurrData() { Action writeCurrData() {
return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_currData, m_currDataSize, Action::clone(m_nextAction)); return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_currData, Action::clone(m_nextAction));
} }
}; };

View File

@ -90,17 +90,71 @@ data::v_io_size ConsistentOutputStream::writeAsString(bool value) {
} }
} }
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// AsyncInlineWriteData
AsyncInlineWriteData::AsyncInlineWriteData()
: currBufferPtr(nullptr)
, bytesLeft(0)
{}
AsyncInlineWriteData::AsyncInlineWriteData(const void* data, data::v_io_size size)
: currBufferPtr(data)
, bytesLeft(size)
{}
void AsyncInlineWriteData::set(const void* data, data::v_io_size size) {
currBufferPtr = data;
bytesLeft = size;
}
void AsyncInlineWriteData::inc(data::v_io_size amount) {
currBufferPtr = &((p_char8) currBufferPtr)[amount];
bytesLeft -= amount;
}
void AsyncInlineWriteData::setEof() {
currBufferPtr = &((p_char8) currBufferPtr)[bytesLeft];
bytesLeft = 0;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// AsyncInlineReadData
AsyncInlineReadData::AsyncInlineReadData()
: currBufferPtr(nullptr)
, bytesLeft(0)
{}
AsyncInlineReadData::AsyncInlineReadData(void* data, data::v_io_size size)
: currBufferPtr(data)
, bytesLeft(size)
{}
void AsyncInlineReadData::set(void* data, data::v_io_size size) {
currBufferPtr = data;
bytesLeft = size;
}
void AsyncInlineReadData::inc(data::v_io_size amount) {
currBufferPtr = &((p_char8) currBufferPtr)[amount];
bytesLeft -= amount;
}
void AsyncInlineReadData::setEof() {
currBufferPtr = &((p_char8) currBufferPtr)[bytesLeft];
bytesLeft = 0;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// AsyncWriteCallbackWithCoroutineStarter // AsyncWriteCallbackWithCoroutineStarter
oatpp::async::Action AsyncWriteCallbackWithCoroutineStarter::writeAsyncInline(oatpp::async::AbstractCoroutine* coroutine, oatpp::async::Action AsyncWriteCallbackWithCoroutineStarter::writeAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
const void*& currBufferPtr, AsyncInlineWriteData& inlineData,
data::v_io_size& bytesLeft,
oatpp::async::Action&& nextAction) oatpp::async::Action&& nextAction)
{ {
auto coroutineStarter = writeAsync(currBufferPtr, bytesLeft); auto coroutineStarter = writeAsync(inlineData.currBufferPtr, inlineData.bytesLeft);
currBufferPtr = &((p_char8) currBufferPtr)[bytesLeft]; inlineData.setEof();
bytesLeft = 0;
return coroutineStarter.next(std::forward<async::Action>(nextAction)); return coroutineStarter.next(std::forward<async::Action>(nextAction));
} }
@ -123,11 +177,10 @@ DefaultAsyncWriteCallback::DefaultAsyncWriteCallback(const std::shared_ptr<Outpu
{} {}
oatpp::async::Action DefaultAsyncWriteCallback::writeAsyncInline(oatpp::async::AbstractCoroutine* coroutine, oatpp::async::Action DefaultAsyncWriteCallback::writeAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
const void*& currBufferPtr, AsyncInlineWriteData& inlineData,
data::v_io_size& bytesLeft,
oatpp::async::Action&& nextAction) oatpp::async::Action&& nextAction)
{ {
return writeExactSizeDataAsyncInline(coroutine, m_stream.get(), currBufferPtr, bytesLeft, std::forward<oatpp::async::Action>(nextAction)); return writeExactSizeDataAsyncInline(coroutine, m_stream.get(), inlineData, std::forward<oatpp::async::Action>(nextAction));
} }
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -297,9 +350,9 @@ oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr<InputStream>&
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_buffer; std::shared_ptr<oatpp::data::buffer::IOBuffer> m_buffer;
oatpp::data::v_io_size m_desiredReadCount; oatpp::data::v_io_size m_desiredReadCount;
void* m_readBufferPtr;
const void* m_writeBufferPtr; AsyncInlineReadData m_inlineReadData;
oatpp::data::v_io_size m_bytesLeft; AsyncInlineWriteData m_inlineWriteData;
public: public:
@ -330,26 +383,28 @@ oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr<InputStream>&
if(m_transferSize == 0 || m_desiredReadCount > m_buffer->getSize()){ if(m_transferSize == 0 || m_desiredReadCount > m_buffer->getSize()){
m_desiredReadCount = m_buffer->getSize(); m_desiredReadCount = m_buffer->getSize();
} }
m_readBufferPtr = m_buffer->getData(); m_inlineReadData.set(m_buffer->getData(), m_desiredReadCount);
m_writeBufferPtr = m_buffer->getData();
m_bytesLeft = m_desiredReadCount;
return yieldTo(&TransferCoroutine::doRead); return yieldTo(&TransferCoroutine::doRead);
} }
Action doRead() { Action doRead() {
return oatpp::data::stream::readSomeDataAsyncInline(this, m_fromStream.get(), m_readBufferPtr, m_bytesLeft, yieldTo(&TransferCoroutine::prepareWrite)); return oatpp::data::stream::readSomeDataAsyncInline(this, m_fromStream.get(), m_inlineReadData, yieldTo(&TransferCoroutine::prepareWrite));
} }
Action prepareWrite() { Action prepareWrite() {
m_bytesLeft = m_desiredReadCount - m_bytesLeft;
m_progress += m_bytesLeft; auto readCount = m_desiredReadCount - m_inlineReadData.bytesLeft;
m_inlineWriteData.set(m_buffer->getData(), readCount);
m_progress += readCount;
return yieldTo(&TransferCoroutine::doWrite); return yieldTo(&TransferCoroutine::doWrite);
} }
Action doWrite() { Action doWrite() {
return m_writeCallback->writeAsyncInline(this, m_writeBufferPtr, m_bytesLeft, yieldTo(&TransferCoroutine::act)); return m_writeCallback->writeAsyncInline(this, m_inlineWriteData, yieldTo(&TransferCoroutine::act));
} }
Action handleError(const std::shared_ptr<const Error>& error) override { Action handleError(const std::shared_ptr<const Error>& error) override {
@ -405,15 +460,13 @@ namespace {
oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine, oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
oatpp::data::stream::OutputStream* stream, oatpp::data::stream::OutputStream* stream,
const void*& data, AsyncInlineWriteData& inlineData,
data::v_io_size& size,
oatpp::async::Action&& nextAction) { oatpp::async::Action&& nextAction) {
if(size > 0) { if(inlineData.bytesLeft > 0) {
auto res = stream->write(data, size); auto res = stream->write(inlineData.currBufferPtr, inlineData.bytesLeft);
if(res > 0) { if(res > 0) {
data = &((p_char8) data)[res]; inlineData.inc(res);
size -= res; if (inlineData.bytesLeft > 0) {
if (size > 0) {
return stream->suggestOutputStreamAction(res); return stream->suggestOutputStreamAction(res);
} }
} else { } else {
@ -426,15 +479,13 @@ oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::async::AbstractCorouti
oatpp::async::Action readSomeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine, oatpp::async::Action readSomeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
oatpp::data::stream::InputStream* stream, oatpp::data::stream::InputStream* stream,
void*& data, AsyncInlineReadData& inlineData,
data::v_io_size& size,
oatpp::async::Action&& nextAction) { oatpp::async::Action&& nextAction) {
if(size > 0) { if(inlineData.bytesLeft > 0) {
auto res = stream->read(data, size); auto res = stream->read(inlineData.currBufferPtr, inlineData.bytesLeft);
if(res > 0) { if(res > 0) {
data = &((p_char8) data)[res]; inlineData.inc(res);
size -= res;
} else { } else {
return asyncInputStreamActionOnIOError(coroutine, stream, res); return asyncInputStreamActionOnIOError(coroutine, stream, res);
} }
@ -445,15 +496,13 @@ oatpp::async::Action readSomeDataAsyncInline(oatpp::async::AbstractCoroutine* co
oatpp::async::Action readExactSizeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine, oatpp::async::Action readExactSizeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
oatpp::data::stream::InputStream* stream, oatpp::data::stream::InputStream* stream,
void*& data, AsyncInlineReadData& inlineData,
data::v_io_size& size,
oatpp::async::Action&& nextAction) { oatpp::async::Action&& nextAction) {
if(size > 0) { if(inlineData.bytesLeft > 0) {
auto res = stream->read(data, size); auto res = stream->read(inlineData.currBufferPtr, inlineData.bytesLeft);
if(res > 0) { if(res > 0) {
data = &((p_char8) data)[res]; inlineData.inc(res);
size -= res; if (inlineData.bytesLeft > 0) {
if (size > 0) {
return stream->suggestInputStreamAction(res); return stream->suggestInputStreamAction(res);
} }
} else { } else {

View File

@ -290,6 +290,102 @@ ConsistentOutputStream& operator << (ConsistentOutputStream& s, v_float32 value)
ConsistentOutputStream& operator << (ConsistentOutputStream& s, v_float64 value); ConsistentOutputStream& operator << (ConsistentOutputStream& s, v_float64 value);
ConsistentOutputStream& operator << (ConsistentOutputStream& s, bool value); ConsistentOutputStream& operator << (ConsistentOutputStream& s, bool value);
/**
* Convenience structure for stream Async-Inline write operations.
*/
struct AsyncInlineWriteData {
/**
* Pointer to current position in the buffer.
*/
const void* currBufferPtr;
/**
* Bytes left to write from the buffer.
*/
data::v_io_size bytesLeft;
/**
* Default constructor.
*/
AsyncInlineWriteData();
/**
* Constructor.
* @param data
* @param size
*/
AsyncInlineWriteData(const void* data, data::v_io_size size);
/**
* Set `currBufferPtr` and `bytesLeft` values. <br>
* @param data - pointer to buffer containing data to be written.
* @param size - size in bytes of the buffer.
*/
void set(const void* data, data::v_io_size size);
/**
* Increase position in the write buffer by `amount` bytes. <br>
* This will increase `currBufferPtr` and descrease `bytesLeft` values.
* @param amount
*/
void inc(data::v_io_size amount);
/**
* Same as `inc(bytesLeft).`
*/
void setEof();
};
/**
* Convenience structure for stream Async-Inline read operations.
*/
struct AsyncInlineReadData {
/**
* Pointer to current position in the buffer.
*/
void* currBufferPtr;
/**
* Bytes left to read to the buffer.
*/
data::v_io_size bytesLeft;
/**
* Default constructor.
*/
AsyncInlineReadData();
/**
* Constructor.
* @param data
* @param size
*/
AsyncInlineReadData(void* data, data::v_io_size size);
/**
* Set `currBufferPtr` and `bytesLeft` values. <br>
* @param data - pointer to buffer to store read data.
* @param size - size in bytes of the buffer.
*/
void set(void* data, data::v_io_size size);
/**
* Increase position in the read buffer by `amount` bytes. <br>
* This will increase `currBufferPtr` and descrease `bytesLeft` values.
* @param amount
*/
void inc(data::v_io_size amount);
/**
* Same as `inc(bytesLeft).`
*/
void setEof();
};
/** /**
* Callback for stream write operation. * Callback for stream write operation.
*/ */
@ -324,14 +420,12 @@ public:
/** /**
* Async-Inline write callback. * Async-Inline write callback.
* @param coroutine - caller coroutine. * @param coroutine - caller coroutine.
* @param currBufferPtr - pointer to current data position. * @param inlineData - &id:oatpp::data::stream::AsyncInlineWriteData;.
* @param bytesLeft - how much bytes left to write.
* @param nextAction - next action when write finished. * @param nextAction - next action when write finished.
* @return - &id:oatpp::async::Action;. * @return - &id:oatpp::async::Action;.
*/ */
virtual oatpp::async::Action writeAsyncInline(oatpp::async::AbstractCoroutine* coroutine, virtual oatpp::async::Action writeAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
const void*& currBufferPtr, AsyncInlineWriteData& inlineData,
data::v_io_size& bytesLeft,
oatpp::async::Action&& nextAction) = 0; oatpp::async::Action&& nextAction) = 0;
}; };
@ -345,14 +439,12 @@ public:
* Async-Inline write callback. <br> * Async-Inline write callback. <br>
* Calls &l:AsyncWriteCallbackWithCoroutineStarter::writeAsync (); internally. * Calls &l:AsyncWriteCallbackWithCoroutineStarter::writeAsync (); internally.
* @param coroutine - caller coroutine. * @param coroutine - caller coroutine.
* @param currBufferPtr - pointer to current data position. * @param inlineData - &id:oatpp::data::stream::AsyncInlineWriteData;.
* @param bytesLeft - how much bytes left to write.
* @param nextAction - next action when write finished. * @param nextAction - next action when write finished.
* @return - &id:oatpp::async::Action;. * @return - &id:oatpp::async::Action;.
*/ */
oatpp::async::Action writeAsyncInline(oatpp::async::AbstractCoroutine* coroutine, oatpp::async::Action writeAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
const void*& currBufferPtr, AsyncInlineWriteData& inlineData,
data::v_io_size& bytesLeft,
oatpp::async::Action&& nextAction) override; oatpp::async::Action&& nextAction) override;
public: public:
@ -411,14 +503,12 @@ public:
/** /**
* Async-Inline write callback. * Async-Inline write callback.
* @param coroutine - caller coroutine. * @param coroutine - caller coroutine.
* @param currBufferPtr - pointer to current data position. * @param inlineData - &id:oatpp::data::stream::AsyncInlineWriteData;.
* @param bytesLeft - how much bytes left to write.
* @param nextAction - next action when write finished. * @param nextAction - next action when write finished.
* @return - &id:oatpp::async::Action;. * @return - &id:oatpp::async::Action;.
*/ */
oatpp::async::Action writeAsyncInline(oatpp::async::AbstractCoroutine* coroutine, oatpp::async::Action writeAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
const void*& currBufferPtr, AsyncInlineWriteData& inlineData,
data::v_io_size& bytesLeft,
oatpp::async::Action&& nextAction) override; oatpp::async::Action&& nextAction) override;
}; };
@ -459,14 +549,12 @@ public:
/** /**
* Async-Inline read callback. * Async-Inline read callback.
* @param coroutine - caller coroutine. * @param coroutine - caller coroutine.
* @param currBufferPtr - pointer to current buffer position. * @param inlineData - &id:oatpp::data::stream::AsyncInlineReadData;.
* @param bytesLeftToRead - how much bytes left to read.
* @param nextAction - next action when read finished. * @param nextAction - next action when read finished.
* @return - &id:oatpp::async::Action;. * @return - &id:oatpp::async::Action;.
*/ */
virtual oatpp::async::Action readAsyncInline(oatpp::async::AbstractCoroutine* coroutine, virtual oatpp::async::Action readAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
void*& currBufferPtr, AsyncInlineReadData& inlineData,
data::v_io_size& bytesLeftToRead,
oatpp::async::Action&& nextAction) = 0; oatpp::async::Action&& nextAction) = 0;
}; };
@ -510,20 +598,17 @@ oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr<InputStream>&
oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine, oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
oatpp::data::stream::OutputStream* stream, oatpp::data::stream::OutputStream* stream,
const void*& data, AsyncInlineWriteData& inlineData,
data::v_io_size& size,
oatpp::async::Action&& nextAction); oatpp::async::Action&& nextAction);
oatpp::async::Action readSomeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine, oatpp::async::Action readSomeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
oatpp::data::stream::InputStream* stream, oatpp::data::stream::InputStream* stream,
void*& data, AsyncInlineReadData& inlineData,
data::v_io_size& bytesLeftToRead,
oatpp::async::Action&& nextAction); oatpp::async::Action&& nextAction);
oatpp::async::Action readExactSizeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine, oatpp::async::Action readExactSizeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
oatpp::data::stream::InputStream* stream, oatpp::data::stream::InputStream* stream,
void*& data, AsyncInlineReadData& inlineData,
data::v_io_size& bytesLeftToRead,
oatpp::async::Action&& nextAction); oatpp::async::Action&& nextAction);
/** /**

View File

@ -72,13 +72,11 @@ AsyncInMemoryReader::AsyncInMemoryReader(Multipart* multipart)
{} {}
oatpp::async::Action AsyncInMemoryReader::writeAsyncInline(oatpp::async::AbstractCoroutine* coroutine, oatpp::async::Action AsyncInMemoryReader::writeAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
const void*& currBufferPtr, oatpp::data::stream::AsyncInlineWriteData& inlineData,
data::v_io_size& bytesLeft,
oatpp::async::Action&& nextAction) oatpp::async::Action&& nextAction)
{ {
m_parser.parseNext((p_char8) currBufferPtr, bytesLeft); m_parser.parseNext((p_char8) inlineData.currBufferPtr, inlineData.bytesLeft);
currBufferPtr = &((p_char8) currBufferPtr)[bytesLeft]; inlineData.setEof();
bytesLeft = 0;
return std::forward<async::Action>(nextAction); return std::forward<async::Action>(nextAction);
} }

View File

@ -88,8 +88,7 @@ public:
AsyncInMemoryReader(Multipart* multipart); AsyncInMemoryReader(Multipart* multipart);
oatpp::async::Action writeAsyncInline(oatpp::async::AbstractCoroutine* coroutine, oatpp::async::Action writeAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
const void*& currBufferPtr, oatpp::data::stream::AsyncInlineWriteData& inlineData,
data::v_io_size& bytesLeft,
oatpp::async::Action&& nextAction) override; oatpp::async::Action&& nextAction) override;
}; };

View File

@ -125,13 +125,11 @@ oatpp::async::CoroutineStarter SimpleBodyDecoder::doChunkedDecodingAsync(const s
v_char8 m_lineChar; v_char8 m_lineChar;
bool m_lineEnding; bool m_lineEnding;
v_char8 m_lineBuffer [16]; // used max 8 v_char8 m_lineBuffer [16]; // used max 8
void* m_skipData; data::stream::AsyncInlineReadData m_skipData;
data::v_io_size m_skipSize;
bool m_done = false; bool m_done = false;
private: private:
void prepareSkipRN() { void prepareSkipRN() {
m_skipData = &m_lineBuffer[0]; m_skipData.set(&m_lineBuffer[0], 2);
m_skipSize = 2;
m_currLineLength = 0; m_currLineLength = 0;
m_lineEnding = false; m_lineEnding = false;
} }
@ -198,9 +196,9 @@ oatpp::async::CoroutineStarter SimpleBodyDecoder::doChunkedDecodingAsync(const s
Action skipRN() { Action skipRN() {
if(m_done) { if(m_done) {
return oatpp::data::stream::readExactSizeDataAsyncInline(this, m_fromStream.get(), m_skipData, m_skipSize, finish()); return oatpp::data::stream::readExactSizeDataAsyncInline(this, m_fromStream.get(), m_skipData, finish());
} else { } else {
return oatpp::data::stream::readExactSizeDataAsyncInline(this, m_fromStream.get(), m_skipData, m_skipSize, yieldTo(&ChunkedDecoder::readLineChar)); return oatpp::data::stream::readExactSizeDataAsyncInline(this, m_fromStream.get(), m_skipData, yieldTo(&ChunkedDecoder::readLineChar));
} }
} }

View File

@ -32,12 +32,11 @@ BufferBody::WriteToStreamCoroutine::WriteToStreamCoroutine(const std::shared_ptr
const std::shared_ptr<OutputStream>& stream) const std::shared_ptr<OutputStream>& stream)
: m_body(body) : m_body(body)
, m_stream(stream) , m_stream(stream)
, m_currData(m_body->m_buffer->getData()) , m_inlineData(m_body->m_buffer->getData(), m_body->m_buffer->getSize())
, m_currDataSize(m_body->m_buffer->getSize())
{} {}
async::Action BufferBody::WriteToStreamCoroutine::act() { async::Action BufferBody::WriteToStreamCoroutine::act() {
return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_currData, m_currDataSize, finish()); return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_inlineData, finish());
} }
BufferBody::BufferBody(const oatpp::String& buffer) BufferBody::BufferBody(const oatpp::String& buffer)

View File

@ -72,8 +72,7 @@ public:
private: private:
std::shared_ptr<BufferBody> m_body; std::shared_ptr<BufferBody> m_body;
std::shared_ptr<OutputStream> m_stream; std::shared_ptr<OutputStream> m_stream;
const void* m_currData; oatpp::data::stream::AsyncInlineWriteData m_inlineData;
oatpp::data::v_io_size m_currDataSize;
public: public:
/** /**

View File

@ -96,13 +96,10 @@ oatpp::async::CoroutineStarter ChunkedBody::writeToStreamAsync(const std::shared
bool m_firstChunk; bool m_firstChunk;
private: private:
oatpp::String m_chunkSizeHex; oatpp::String m_chunkSizeHex;
const void* m_chunkSizeCurrDataPtr;
data::v_io_size m_chunkSizeBytesLeft;
private: private:
void* m_currDataReadPtr; data::stream::AsyncInlineReadData m_inlineReadData;
const void* m_currDataWritePtr; data::stream::AsyncInlineWriteData m_inlineWriteData;
data::v_io_size m_bytesLeft; data::stream::AsyncInlineWriteData m_chunkSizeWriteData;
data::v_io_size m_bytesRead;
public: public:
WriteCoroutine(const std::shared_ptr<ChunkedBody>& body, WriteCoroutine(const std::shared_ptr<ChunkedBody>& body,
@ -113,55 +110,50 @@ oatpp::async::CoroutineStarter ChunkedBody::writeToStreamAsync(const std::shared
{} {}
Action act() override { Action act() override {
m_currDataReadPtr = m_body->m_buffer; m_inlineReadData.set(m_body->m_buffer, m_body->m_bufferSize);
m_bytesLeft = m_body->m_bufferSize;
return yieldTo(&WriteCoroutine::readCallback); return yieldTo(&WriteCoroutine::readCallback);
} }
Action readCallback() { Action readCallback() {
return m_body->m_asyncReadCallback->readAsyncInline(this, m_currDataReadPtr, m_bytesLeft, yieldTo(&WriteCoroutine::onChunkRead)); return m_body->m_asyncReadCallback->readAsyncInline(this, m_inlineReadData, yieldTo(&WriteCoroutine::onChunkRead));
} }
Action onChunkRead() { Action onChunkRead() {
m_bytesRead = m_body->m_bufferSize - m_bytesLeft; data::v_io_size bytesRead = m_body->m_bufferSize - m_inlineReadData.bytesLeft;
if(m_bytesRead > 0) { if(bytesRead > 0) {
if(m_firstChunk) { if(m_firstChunk) {
m_chunkSizeHex = oatpp::utils::conversion::primitiveToStr(m_bytesRead, "%X") + "\r\n"; m_chunkSizeHex = oatpp::utils::conversion::primitiveToStr(bytesRead, "%X") + "\r\n";
m_firstChunk = false; m_firstChunk = false;
} else { } else {
m_chunkSizeHex = "\r\n" + oatpp::utils::conversion::primitiveToStr(m_bytesRead, "%X") + "\r\n"; m_chunkSizeHex = "\r\n" + oatpp::utils::conversion::primitiveToStr(bytesRead, "%X") + "\r\n";
} }
m_chunkSizeCurrDataPtr = m_chunkSizeHex->getData(); m_chunkSizeWriteData.set(m_chunkSizeHex->getData(), m_chunkSizeHex->getSize());
m_chunkSizeBytesLeft = m_chunkSizeHex->getSize(); m_inlineWriteData.set(m_body->m_buffer, bytesRead);
m_currDataWritePtr = m_body->m_buffer;
m_bytesLeft = m_bytesRead;
return yieldTo(&WriteCoroutine::writeChunkSize); return yieldTo(&WriteCoroutine::writeChunkSize);
} }
m_chunkSizeCurrDataPtr = "\r\n0\r\n\r\n"; m_chunkSizeWriteData.set("\r\n0\r\n\r\n", 7);
m_chunkSizeBytesLeft = 7;
return yieldTo(&WriteCoroutine::writeTrailingBytes); return yieldTo(&WriteCoroutine::writeTrailingBytes);
} }
Action writeChunkSize() { Action writeChunkSize() {
return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_chunkSizeCurrDataPtr, m_chunkSizeBytesLeft, yieldTo(&WriteCoroutine::writeChunk)); return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_chunkSizeWriteData, yieldTo(&WriteCoroutine::writeChunk));
} }
Action writeChunk() { Action writeChunk() {
return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_currDataWritePtr, m_bytesLeft, yieldTo(&WriteCoroutine::act)); return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_inlineWriteData, yieldTo(&WriteCoroutine::act));
} }
Action writeTrailingBytes() { Action writeTrailingBytes() {
return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_chunkSizeCurrDataPtr, m_chunkSizeBytesLeft, finish()); return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_chunkSizeWriteData, finish());
} }
}; };

View File

@ -68,8 +68,6 @@ ChunkedBufferBody::WriteToStreamCoroutine::WriteToStreamCoroutine(const std::sha
, m_stream(stream) , m_stream(stream)
, m_chunks(m_body->m_buffer->getChunks()) , m_chunks(m_body->m_buffer->getChunks())
, m_currChunk(m_chunks->getFirstNode()) , m_currChunk(m_chunks->getFirstNode())
, m_currData(nullptr)
, m_currDataSize(0)
, m_nextAction(Action::createActionByType(Action::TYPE_FINISH)) , m_nextAction(Action::createActionByType(Action::TYPE_FINISH))
{} {}
@ -81,36 +79,32 @@ async::Action ChunkedBufferBody::WriteToStreamCoroutine::act() {
} }
async::Action ChunkedBufferBody::WriteToStreamCoroutine::writeChunkSize() { async::Action ChunkedBufferBody::WriteToStreamCoroutine::writeChunkSize() {
m_currDataSize = oatpp::utils::conversion::primitiveToCharSequence(m_currChunk->getData()->size, m_buffer, "%X\r\n"); m_inlineWriteData.set(m_buffer, oatpp::utils::conversion::primitiveToCharSequence(m_currChunk->getData()->size, m_buffer, "%X\r\n"));
m_currData = m_buffer;
m_nextAction = yieldTo(&WriteToStreamCoroutine::writeChunkData); m_nextAction = yieldTo(&WriteToStreamCoroutine::writeChunkData);
return yieldTo(&WriteToStreamCoroutine::writeCurrData); return yieldTo(&WriteToStreamCoroutine::writeCurrData);
} }
async::Action ChunkedBufferBody::WriteToStreamCoroutine::writeChunkData() { async::Action ChunkedBufferBody::WriteToStreamCoroutine::writeChunkData() {
m_currData = m_currChunk->getData()->data; m_inlineWriteData.set(m_currChunk->getData()->data, m_currChunk->getData()->size);
m_currDataSize = (v_int32) m_currChunk->getData()->size;
m_nextAction = yieldTo(&WriteToStreamCoroutine::writeChunkSeparator); m_nextAction = yieldTo(&WriteToStreamCoroutine::writeChunkSeparator);
return yieldTo(&WriteToStreamCoroutine::writeCurrData); return yieldTo(&WriteToStreamCoroutine::writeCurrData);
} }
async::Action ChunkedBufferBody::WriteToStreamCoroutine::writeChunkSeparator() { async::Action ChunkedBufferBody::WriteToStreamCoroutine::writeChunkSeparator() {
m_currData = (void*) "\r\n"; m_inlineWriteData.set("\r\n", 2);
m_currDataSize = 2;
m_currChunk = m_currChunk->getNext(); m_currChunk = m_currChunk->getNext();
m_nextAction = yieldTo(&WriteToStreamCoroutine::act); m_nextAction = yieldTo(&WriteToStreamCoroutine::act);
return yieldTo(&WriteToStreamCoroutine::writeCurrData); return yieldTo(&WriteToStreamCoroutine::writeCurrData);
} }
async::Action ChunkedBufferBody::WriteToStreamCoroutine::writeEndOfChunks() { async::Action ChunkedBufferBody::WriteToStreamCoroutine::writeEndOfChunks() {
m_currData = (void*) "0\r\n\r\n"; m_inlineWriteData.set("0\r\n\r\n", 5);
m_currDataSize = 5;
m_nextAction = finish(); m_nextAction = finish();
return yieldTo(&WriteToStreamCoroutine::writeCurrData); return yieldTo(&WriteToStreamCoroutine::writeCurrData);
} }
async::Action ChunkedBufferBody::WriteToStreamCoroutine::writeCurrData() { async::Action ChunkedBufferBody::WriteToStreamCoroutine::writeCurrData() {
return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_currData, m_currDataSize, Action::clone(m_nextAction)); return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_inlineWriteData, Action::clone(m_nextAction));
} }
oatpp::async::CoroutineStarter ChunkedBufferBody::writeToStreamAsync(const std::shared_ptr<OutputStream>& stream) { oatpp::async::CoroutineStarter ChunkedBufferBody::writeToStreamAsync(const std::shared_ptr<OutputStream>& stream) {

View File

@ -86,9 +86,8 @@ public:
std::shared_ptr<OutputStream> m_stream; std::shared_ptr<OutputStream> m_stream;
std::shared_ptr<oatpp::data::stream::ChunkedBuffer::Chunks> m_chunks; std::shared_ptr<oatpp::data::stream::ChunkedBuffer::Chunks> m_chunks;
oatpp::data::stream::ChunkedBuffer::Chunks::LinkedListNode* m_currChunk; oatpp::data::stream::ChunkedBuffer::Chunks::LinkedListNode* m_currChunk;
const void* m_currData;
oatpp::data::v_io_size m_currDataSize;
Action m_nextAction; Action m_nextAction;
oatpp::data::stream::AsyncInlineWriteData m_inlineWriteData;
v_char8 m_buffer[16]; v_char8 m_buffer[16];
public: public:

View File

@ -166,8 +166,7 @@ MultipartBody::AsyncMultipartReadCallback::AsyncMultipartReadCallback(const std:
{} {}
oatpp::async::Action MultipartBody::AsyncMultipartReadCallback::readAsyncInline(oatpp::async::AbstractCoroutine* coroutine, oatpp::async::Action MultipartBody::AsyncMultipartReadCallback::readAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
void*& currBufferPtr, oatpp::data::stream::AsyncInlineReadData& inlineData,
data::v_io_size& bytesLeftToRead,
oatpp::async::Action&& nextAction) oatpp::async::Action&& nextAction)
{ {
return std::forward<oatpp::async::Action>(nextAction); return std::forward<oatpp::async::Action>(nextAction);

View File

@ -96,8 +96,7 @@ private:
AsyncMultipartReadCallback(const std::shared_ptr<Multipart>& multipart); AsyncMultipartReadCallback(const std::shared_ptr<Multipart>& multipart);
oatpp::async::Action readAsyncInline(oatpp::async::AbstractCoroutine* coroutine, oatpp::async::Action readAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
void*& currBufferPtr, oatpp::data::stream::AsyncInlineReadData& inlineData,
data::v_io_size& bytesLeftToRead,
oatpp::async::Action&& nextAction) override; oatpp::async::Action&& nextAction) override;
}; };

View File

@ -29,6 +29,7 @@
#include "oatpp/web/mime/multipart/InMemoryReader.hpp" #include "oatpp/web/mime/multipart/InMemoryReader.hpp"
#include "oatpp/web/protocol/http/outgoing/MultipartBody.hpp"
#include "oatpp/web/protocol/http/outgoing/ChunkedBody.hpp" #include "oatpp/web/protocol/http/outgoing/ChunkedBody.hpp"
#include "oatpp/web/server/api/ApiController.hpp" #include "oatpp/web/server/api/ApiController.hpp"
@ -163,6 +164,7 @@ public:
ENDPOINT("POST", "test/multipart", multipartTest, REQUEST(std::shared_ptr<IncomingRequest>, request)) { ENDPOINT("POST", "test/multipart", multipartTest, REQUEST(std::shared_ptr<IncomingRequest>, request)) {
/*
oatpp::web::mime::multipart::Multipart multipart(request->getHeaders()); oatpp::web::mime::multipart::Multipart multipart(request->getHeaders());
oatpp::web::mime::multipart::InMemoryReader multipartReader(&multipart); oatpp::web::mime::multipart::InMemoryReader multipartReader(&multipart);
request->transferBody(&multipartReader); request->transferBody(&multipartReader);
@ -170,8 +172,40 @@ public:
for(auto& part : multipart.getAllParts()) { for(auto& part : multipart.getAllParts()) {
OATPP_LOGD("multipart", "name='%s', value='%s'", part->getName()->getData(), part->getInMemoryData()->getData()); OATPP_LOGD("multipart", "name='%s', value='%s'", part->getName()->getData(), part->getInMemoryData()->getData());
} }
*/
return createResponse(Status::CODE_200, ""); oatpp::data::stream::ChunkedBuffer stream;
request->transferBodyToStream(&stream);
return createResponse(Status::CODE_200, stream.toString());
}
ENDPOINT("GET", "test/multipart", multipartGetTest) {
auto multipart = std::make_shared<oatpp::web::mime::multipart::Multipart>("0--qwerty1234--0");
{
oatpp::web::mime::multipart::Headers partHeaders;
auto part = std::make_shared<oatpp::web::mime::multipart::Part>(partHeaders);
multipart->addPart(part);
part->putHeader("Content-Disposition", "form-data; name=\"part1\"");
oatpp::String data = "Hello";
part->setDataInfo(std::make_shared<oatpp::data::stream::BufferInputStream>(data));
}
{
oatpp::web::mime::multipart::Headers partHeaders;
auto part = std::make_shared<oatpp::web::mime::multipart::Part>(partHeaders);
multipart->addPart(part);
part->putHeader("Content-Disposition", "form-data; filename=\"file2.txt\"");
oatpp::String data = "World";
part->setDataInfo(std::make_shared<oatpp::data::stream::BufferInputStream>(data));
}
auto body = std::make_shared<oatpp::web::protocol::http::outgoing::MultipartBody>(multipart);
return OutgoingResponse::createShared(Status::CODE_200, body);
} }

View File

@ -142,14 +142,12 @@ public:
{} {}
oatpp::async::Action readAsyncInline(oatpp::async::AbstractCoroutine* coroutine, oatpp::async::Action readAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
void*& currBufferPtr, oatpp::data::stream::AsyncInlineReadData& inlineData,
data::v_io_size& bytesLeftToRead,
oatpp::async::Action&& nextAction) override oatpp::async::Action&& nextAction) override
{ {
if(m_counter < m_iterations) { if(m_counter < m_iterations) {
std::memcpy(currBufferPtr, m_text->getData(), m_text->getSize()); std::memcpy(inlineData.currBufferPtr, m_text->getData(), m_text->getSize());
currBufferPtr = &((p_char8) currBufferPtr)[m_text->getSize()]; inlineData.inc(m_text->getSize());
bytesLeftToRead -= m_text->getSize();
} }
m_counter ++; m_counter ++;
return std::forward<oatpp::async::Action>(nextAction); return std::forward<oatpp::async::Action>(nextAction);