Fixed data flushing for ChunkedBuffer

This commit is contained in:
lganzzzo 2018-03-29 17:02:37 +03:00
parent 6984994f2e
commit e3afc00044
2 changed files with 83 additions and 61 deletions

View File

@ -220,6 +220,9 @@ oatpp::async::Action ChunkedBuffer::flushToStreamAsync(oatpp::async::AbstractCor
std::shared_ptr<OutputStream> m_stream;
ChunkEntry* m_currEntry;
os::io::Library::v_size m_bytesLeft;
const void* m_currData;
os::io::Library::v_size m_currDataSize;
Action m_nextAction;
public:
FlushCoroutine(const std::shared_ptr<ChunkedBuffer>& chunkedBuffer,
@ -228,6 +231,9 @@ oatpp::async::Action ChunkedBuffer::flushToStreamAsync(oatpp::async::AbstractCor
, m_stream(stream)
, m_currEntry(chunkedBuffer->m_firstEntry)
, m_bytesLeft(chunkedBuffer->m_size)
, m_currData(nullptr)
, m_currDataSize(0)
, m_nextAction(Action(Action::TYPE_FINISH, nullptr, nullptr))
{}
Action act() override {
@ -237,30 +243,35 @@ oatpp::async::Action ChunkedBuffer::flushToStreamAsync(oatpp::async::AbstractCor
}
if(m_bytesLeft > CHUNK_ENTRY_SIZE) {
auto res = m_stream->write(m_currEntry->chunk, CHUNK_ENTRY_SIZE);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN){
return waitRetry();
} else if(res != CHUNK_ENTRY_SIZE) {
return error(ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA);
}
m_bytesLeft -= res;
m_currData = m_currEntry->chunk;
m_currDataSize = CHUNK_ENTRY_SIZE;
m_nextAction = yieldTo(&FlushCoroutine::act);
m_currEntry = m_currEntry->next;
return yieldTo(&FlushCoroutine::writeCurrData);
} else {
auto res = m_stream->write(m_currEntry->chunk, m_bytesLeft);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN){
return waitRetry();
} else if(res != m_bytesLeft) {
return error(ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA);
}
m_currData = m_currEntry->chunk;
m_currDataSize = m_bytesLeft;
m_nextAction = yieldTo(&FlushCoroutine::act);
m_currEntry = m_currEntry->next;
return yieldTo(&FlushCoroutine::writeCurrData);
}
}
Action writeCurrData() {
auto res = m_stream->write(m_currData, m_currDataSize);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) {
return waitRetry();
} else if( res < 0) {
return error(ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA);
} else if(res < m_currDataSize) {
m_currData = &((p_char8) m_currData)[res];
m_currDataSize = m_currDataSize - res;
m_bytesLeft -= res;
return repeat();
}
m_currEntry = m_currEntry->next;
if(m_currEntry == nullptr) {
return finish();
}
return repeat();
m_bytesLeft -= res;
return m_nextAction;
}
};

View File

@ -93,7 +93,10 @@ public:
std::shared_ptr<OutputStream> m_stream;
std::shared_ptr<oatpp::data::stream::ChunkedBuffer::Chunks> m_chunks;
oatpp::data::stream::ChunkedBuffer::Chunks::LinkedListNode* m_currChunk;
v_int32 m_whileState;
const void* m_currData;
v_int32 m_currDataSize;
Action m_nextAction;
v_char8 m_buffer[16];
public:
WriteToStreamCoroutine(const std::shared_ptr<ChunkedBufferBody>& body,
@ -102,51 +105,59 @@ public:
, m_stream(stream)
, m_chunks(m_body->m_buffer->getChunks())
, m_currChunk(m_chunks->getFirstNode())
, m_whileState(0)
, m_currData(nullptr)
, m_currDataSize(0)
, m_nextAction(Action(Action::TYPE_FINISH, nullptr, nullptr))
{}
Action act() override {
if(m_currChunk == nullptr) {
m_whileState = 3;
return yieldTo(&WriteToStreamCoroutine::writeEndOfChunks);
}
if(m_whileState == 0) {
auto res = m_stream->write(oatpp::utils::conversion::primitiveToStr(m_currChunk->getData()->size, "%X\r\n"));
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) {
return waitRetry();
} else if(res < 1) {
return error(ERROR_FAILED_TO_WRITE_DATA);
}
m_whileState = 1;
} else if(m_whileState == 1) {
auto res = m_stream->write(m_currChunk->getData()->data, m_currChunk->getData()->size);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) {
return waitRetry();
} else if(res < m_currChunk->getData()->size) {
return error(ERROR_FAILED_TO_WRITE_DATA);
}
m_whileState = 2;
} else if(m_whileState == 2) {
auto res = m_stream->write("\r\n", 2);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) {
return waitRetry();
} else if(res < 2) {
return error(ERROR_FAILED_TO_WRITE_DATA);
}
m_whileState = 3;
} else if(m_whileState == 3) {
auto res = m_stream->write("0\r\n\r\n", 5);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) {
return waitRetry();
} else if(res < 5) {
return error(ERROR_FAILED_TO_WRITE_DATA);
}
return finish();
}
return yieldTo(&WriteToStreamCoroutine::writeChunkSize);
}
Action writeChunkSize() {
m_currDataSize = oatpp::utils::conversion::primitiveToCharSequence(m_currChunk->getData()->size, m_buffer, "%X\r\n");
m_currData = m_buffer;
m_nextAction = yieldTo(&WriteToStreamCoroutine::writeChunkData);
return yieldTo(&WriteToStreamCoroutine::writeCurrData);
}
Action writeChunkData() {
m_currData = m_currChunk->getData()->data;
m_currDataSize = (v_int32) m_currChunk->getData()->size;
m_nextAction = yieldTo(&WriteToStreamCoroutine::writeChunkSeparator);
return yieldTo(&WriteToStreamCoroutine::writeCurrData);
}
Action writeChunkSeparator() {
m_currData = (void*) "\r\n";
m_currDataSize = 2;
m_currChunk = m_currChunk->getNext();
return repeat();
m_nextAction = yieldTo(&WriteToStreamCoroutine::act);
return yieldTo(&WriteToStreamCoroutine::writeCurrData);
}
Action writeEndOfChunks() {
m_currData = (void*) "0\r\n\r\n";
m_currDataSize = 5;
m_nextAction = finish();
return yieldTo(&WriteToStreamCoroutine::writeCurrData);
}
Action writeCurrData() {
auto res = m_stream->write(m_currData, m_currDataSize);
if(res == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN) {
return waitRetry();
} else if( res < 0) {
return error(ERROR_FAILED_TO_WRITE_DATA);
} else if(res < m_currDataSize) {
m_currData = &((p_char8) m_currData)[res];
m_currDataSize = m_currDataSize - (v_int32) res;
return repeat();
}
return m_nextAction;
}
};