mirror of
https://github.com/oatpp/oatpp.git
synced 2025-01-06 16:24:27 +08:00
Fix to FIFOBuffer flush async
This commit is contained in:
parent
df2306438b
commit
2086a362d1
@ -288,13 +288,13 @@ data::v_io_size FIFOBuffer::flushToStream(data::stream::OutputStream& stream) {
|
||||
|
||||
oatpp::async::Action FIFOBuffer::flushToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
|
||||
const oatpp::async::Action& actionOnFinish,
|
||||
data::stream::OutputStream& stream)
|
||||
const std::shared_ptr<data::stream::OutputStream>& stream)
|
||||
{
|
||||
|
||||
class FlushCoroutine : public oatpp::async::Coroutine<FlushCoroutine> {
|
||||
private:
|
||||
FIFOBuffer* m_fifo;
|
||||
data::stream::OutputStream* m_stream;
|
||||
std::shared_ptr<FIFOBuffer> m_fifo;
|
||||
std::shared_ptr<data::stream::OutputStream> m_stream;
|
||||
private:
|
||||
|
||||
const void* m_data1;
|
||||
@ -304,7 +304,7 @@ oatpp::async::Action FIFOBuffer::flushToStreamAsync(oatpp::async::AbstractCorout
|
||||
data::v_io_size m_size2;
|
||||
public:
|
||||
|
||||
FlushCoroutine(FIFOBuffer* fifo, data::stream::OutputStream* stream)
|
||||
FlushCoroutine(const std::shared_ptr<FIFOBuffer>& fifo, const std::shared_ptr<data::stream::OutputStream>& stream)
|
||||
: m_fifo(fifo)
|
||||
, m_stream(stream)
|
||||
{}
|
||||
@ -336,15 +336,15 @@ oatpp::async::Action FIFOBuffer::flushToStreamAsync(oatpp::async::AbstractCorout
|
||||
}
|
||||
|
||||
Action fullFlush() {
|
||||
return data::stream::writeExactSizeDataAsyncInline(m_stream, m_data1, m_size1, yieldTo(&FlushCoroutine::beforeFinish));
|
||||
return data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_data1, m_size1, yieldTo(&FlushCoroutine::beforeFinish));
|
||||
}
|
||||
|
||||
Action partialFlush1() {
|
||||
return data::stream::writeExactSizeDataAsyncInline(m_stream, m_data1, m_size1, yieldTo(&FlushCoroutine::partialFlush2));
|
||||
return data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_data1, m_size1, yieldTo(&FlushCoroutine::partialFlush2));
|
||||
}
|
||||
|
||||
Action partialFlush2() {
|
||||
return data::stream::writeExactSizeDataAsyncInline(m_stream, m_data2, m_size2, yieldTo(&FlushCoroutine::beforeFinish));
|
||||
return data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_data2, m_size2, yieldTo(&FlushCoroutine::beforeFinish));
|
||||
}
|
||||
|
||||
Action beforeFinish() {
|
||||
@ -354,7 +354,7 @@ oatpp::async::Action FIFOBuffer::flushToStreamAsync(oatpp::async::AbstractCorout
|
||||
|
||||
};
|
||||
|
||||
return parentCoroutine->startCoroutine<FlushCoroutine>(actionOnFinish, this, &stream);
|
||||
return parentCoroutine->startCoroutine<FlushCoroutine>(actionOnFinish, shared_from_this(), stream);
|
||||
|
||||
}
|
||||
|
||||
|
@ -36,7 +36,7 @@ namespace oatpp { namespace data { namespace buffer {
|
||||
* FIFO operations over the buffer
|
||||
* !FIFOBuffer is NOT an IOStream despite having similar APIs!
|
||||
*/
|
||||
class FIFOBuffer {
|
||||
class FIFOBuffer : public std::enable_shared_from_this<FIFOBuffer> {
|
||||
private:
|
||||
p_char8 m_buffer;
|
||||
v_io_size m_bufferSize;
|
||||
@ -132,7 +132,7 @@ public:
|
||||
*/
|
||||
oatpp::async::Action flushToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
|
||||
const oatpp::async::Action& actionOnFinish,
|
||||
data::stream::OutputStream& stream);
|
||||
const std::shared_ptr<data::stream::OutputStream>& stream);
|
||||
|
||||
|
||||
};
|
||||
|
@ -27,24 +27,24 @@
|
||||
namespace oatpp { namespace data{ namespace stream {
|
||||
|
||||
data::v_io_size OutputStreamBufferedProxy::write(const void *data, data::v_io_size count) {
|
||||
if(m_buffer.availableToWrite() > 0) {
|
||||
return m_buffer.write(data, count);
|
||||
if(m_buffer->availableToWrite() > 0) {
|
||||
return m_buffer->write(data, count);
|
||||
} else {
|
||||
auto bytesFlushed = m_buffer.readAndWriteToStream(*m_outputStream, m_buffer.getBufferSize());
|
||||
auto bytesFlushed = m_buffer->readAndWriteToStream(*m_outputStream, m_buffer->getBufferSize());
|
||||
if(bytesFlushed > 0) {
|
||||
return m_buffer.write(data, count);
|
||||
return m_buffer->write(data, count);
|
||||
}
|
||||
return bytesFlushed;
|
||||
}
|
||||
}
|
||||
|
||||
data::v_io_size OutputStreamBufferedProxy::flush() {
|
||||
return m_buffer.flushToStream(*m_outputStream);
|
||||
return m_buffer->flushToStream(*m_outputStream);
|
||||
}
|
||||
|
||||
oatpp::async::Action OutputStreamBufferedProxy::flushAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
|
||||
const oatpp::async::Action& actionOnFinish) {
|
||||
return m_buffer.flushToStreamAsync(parentCoroutine, actionOnFinish, *m_outputStream);
|
||||
const oatpp::async::Action& actionOnFinish) {
|
||||
return m_buffer->flushToStreamAsync(parentCoroutine, actionOnFinish, m_outputStream);
|
||||
}
|
||||
|
||||
data::v_io_size InputStreamBufferedProxy::read(void *data, data::v_io_size count) {
|
||||
|
@ -41,7 +41,7 @@ public:
|
||||
private:
|
||||
std::shared_ptr<OutputStream> m_outputStream;
|
||||
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_bufferPtr;
|
||||
buffer::FIFOBuffer m_buffer;
|
||||
std::shared_ptr<buffer::FIFOBuffer> m_buffer;
|
||||
public:
|
||||
OutputStreamBufferedProxy(const std::shared_ptr<OutputStream>& outputStream,
|
||||
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& bufferPtr,
|
||||
@ -49,12 +49,12 @@ public:
|
||||
v_bufferSize bufferSize)
|
||||
: m_outputStream(outputStream)
|
||||
, m_bufferPtr(bufferPtr)
|
||||
, m_buffer(buffer, bufferSize)
|
||||
, m_buffer(std::make_shared<buffer::FIFOBuffer>(buffer, bufferSize))
|
||||
{}
|
||||
public:
|
||||
|
||||
static std::shared_ptr<OutputStreamBufferedProxy> createShared(const std::shared_ptr<OutputStream>& outputStream,
|
||||
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer)
|
||||
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer)
|
||||
{
|
||||
return Shared_OutputStreamBufferedProxy_Pool::allocateShared(outputStream,
|
||||
buffer,
|
||||
@ -78,7 +78,7 @@ public:
|
||||
const oatpp::async::Action& actionOnFinish);
|
||||
|
||||
void setBufferPosition(data::v_io_size readPosition, data::v_io_size writePosition, bool canRead) {
|
||||
m_buffer.setBufferPosition(readPosition, writePosition, canRead);
|
||||
m_buffer->setBufferPosition(readPosition, writePosition, canRead);
|
||||
}
|
||||
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user