diff --git a/core/data/stream/ChunkedBuffer.cpp b/core/data/stream/ChunkedBuffer.cpp index 0d5fe107..8196fdc6 100644 --- a/core/data/stream/ChunkedBuffer.cpp +++ b/core/data/stream/ChunkedBuffer.cpp @@ -220,6 +220,9 @@ oatpp::async::Action ChunkedBuffer::flushToStreamAsync(oatpp::async::AbstractCor std::shared_ptr m_stream; ChunkEntry* m_currEntry; os::io::Library::v_size m_bytesLeft; + const void* m_currData; + os::io::Library::v_size m_currDataSize; + Action m_nextAction; public: FlushCoroutine(const std::shared_ptr& chunkedBuffer, @@ -228,6 +231,9 @@ oatpp::async::Action ChunkedBuffer::flushToStreamAsync(oatpp::async::AbstractCor , m_stream(stream) , m_currEntry(chunkedBuffer->m_firstEntry) , m_bytesLeft(chunkedBuffer->m_size) + , m_currData(nullptr) + , m_currDataSize(0) + , m_nextAction(Action(Action::TYPE_FINISH, nullptr, nullptr)) {} Action act() override { @@ -237,30 +243,35 @@ oatpp::async::Action ChunkedBuffer::flushToStreamAsync(oatpp::async::AbstractCor } if(m_bytesLeft > CHUNK_ENTRY_SIZE) { - auto res = m_stream->write(m_currEntry->chunk, CHUNK_ENTRY_SIZE); - if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN){ - return waitRetry(); - } else if(res != CHUNK_ENTRY_SIZE) { - return error(ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA); - } - m_bytesLeft -= res; + m_currData = m_currEntry->chunk; + m_currDataSize = CHUNK_ENTRY_SIZE; + m_nextAction = yieldTo(&FlushCoroutine::act); + m_currEntry = m_currEntry->next; + return yieldTo(&FlushCoroutine::writeCurrData); } else { - auto res = m_stream->write(m_currEntry->chunk, m_bytesLeft); - if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN){ - return waitRetry(); - } else if(res != m_bytesLeft) { - return error(ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA); - } + m_currData = m_currEntry->chunk; + m_currDataSize = m_bytesLeft; + m_nextAction = yieldTo(&FlushCoroutine::act); + m_currEntry = m_currEntry->next; + return yieldTo(&FlushCoroutine::writeCurrData); + } + + } + + Action writeCurrData() { + auto res = m_stream->write(m_currData, m_currDataSize); + if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) { + return waitRetry(); + } else if( res < 0) { + return error(ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA); + } else if(res < m_currDataSize) { + m_currData = &((p_char8) m_currData)[res]; + m_currDataSize = m_currDataSize - res; m_bytesLeft -= res; + return repeat(); } - - m_currEntry = m_currEntry->next; - - if(m_currEntry == nullptr) { - return finish(); - } - return repeat(); - + m_bytesLeft -= res; + return m_nextAction; } }; diff --git a/web/protocol/http/outgoing/ChunkedBufferBody.hpp b/web/protocol/http/outgoing/ChunkedBufferBody.hpp index 8b5f75bb..e22a39cd 100644 --- a/web/protocol/http/outgoing/ChunkedBufferBody.hpp +++ b/web/protocol/http/outgoing/ChunkedBufferBody.hpp @@ -93,7 +93,10 @@ public: std::shared_ptr m_stream; std::shared_ptr m_chunks; oatpp::data::stream::ChunkedBuffer::Chunks::LinkedListNode* m_currChunk; - v_int32 m_whileState; + const void* m_currData; + v_int32 m_currDataSize; + Action m_nextAction; + v_char8 m_buffer[16]; public: WriteToStreamCoroutine(const std::shared_ptr& body, @@ -102,51 +105,59 @@ public: , m_stream(stream) , m_chunks(m_body->m_buffer->getChunks()) , m_currChunk(m_chunks->getFirstNode()) - , m_whileState(0) + , m_currData(nullptr) + , m_currDataSize(0) + , m_nextAction(Action(Action::TYPE_FINISH, nullptr, nullptr)) {} Action act() override { - if(m_currChunk == nullptr) { - m_whileState = 3; + return yieldTo(&WriteToStreamCoroutine::writeEndOfChunks); } - - if(m_whileState == 0) { - auto res = m_stream->write(oatpp::utils::conversion::primitiveToStr(m_currChunk->getData()->size, "%X\r\n")); - if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) { - return waitRetry(); - } else if(res < 1) { - return error(ERROR_FAILED_TO_WRITE_DATA); - } - m_whileState = 1; - } else if(m_whileState == 1) { - auto res = m_stream->write(m_currChunk->getData()->data, m_currChunk->getData()->size); - if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) { - return waitRetry(); - } else if(res < m_currChunk->getData()->size) { - return error(ERROR_FAILED_TO_WRITE_DATA); - } - m_whileState = 2; - } else if(m_whileState == 2) { - auto res = m_stream->write("\r\n", 2); - if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) { - return waitRetry(); - } else if(res < 2) { - return error(ERROR_FAILED_TO_WRITE_DATA); - } - m_whileState = 3; - } else if(m_whileState == 3) { - auto res = m_stream->write("0\r\n\r\n", 5); - if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) { - return waitRetry(); - } else if(res < 5) { - return error(ERROR_FAILED_TO_WRITE_DATA); - } - return finish(); - } - + return yieldTo(&WriteToStreamCoroutine::writeChunkSize); + } + + Action writeChunkSize() { + m_currDataSize = oatpp::utils::conversion::primitiveToCharSequence(m_currChunk->getData()->size, m_buffer, "%X\r\n"); + m_currData = m_buffer; + m_nextAction = yieldTo(&WriteToStreamCoroutine::writeChunkData); + return yieldTo(&WriteToStreamCoroutine::writeCurrData); + } + + Action writeChunkData() { + m_currData = m_currChunk->getData()->data; + m_currDataSize = (v_int32) m_currChunk->getData()->size; + m_nextAction = yieldTo(&WriteToStreamCoroutine::writeChunkSeparator); + return yieldTo(&WriteToStreamCoroutine::writeCurrData); + } + + Action writeChunkSeparator() { + m_currData = (void*) "\r\n"; + m_currDataSize = 2; m_currChunk = m_currChunk->getNext(); - return repeat(); + m_nextAction = yieldTo(&WriteToStreamCoroutine::act); + return yieldTo(&WriteToStreamCoroutine::writeCurrData); + } + + Action writeEndOfChunks() { + m_currData = (void*) "0\r\n\r\n"; + m_currDataSize = 5; + m_nextAction = finish(); + return yieldTo(&WriteToStreamCoroutine::writeCurrData); + } + + Action writeCurrData() { + auto res = m_stream->write(m_currData, m_currDataSize); + if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) { + return waitRetry(); + } else if( res < 0) { + return error(ERROR_FAILED_TO_WRITE_DATA); + } else if(res < m_currDataSize) { + m_currData = &((p_char8) m_currData)[res]; + m_currDataSize = m_currDataSize - (v_int32) res; + return repeat(); + } + return m_nextAction; } };