mirror of
https://github.com/oatpp/oatpp.git
synced 2025-04-12 18:50:22 +08:00
better stream io api
This commit is contained in:
parent
b0f0013a93
commit
2eb807f1d7
@ -261,7 +261,7 @@ oatpp::async::Action ChunkedBuffer::flushToStreamAsync(oatpp::async::AbstractCor
|
||||
}
|
||||
|
||||
Action writeCurrData() {
|
||||
return IOStream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction);
|
||||
return oatpp::data::stream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction);
|
||||
}
|
||||
|
||||
};
|
||||
|
@ -71,69 +71,6 @@ os::io::Library::v_size OutputStream::writeAsString(bool value) {
|
||||
}
|
||||
}
|
||||
|
||||
oatpp::async::Action IOStream::writeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
|
||||
const void*& data,
|
||||
os::io::Library::v_size& size,
|
||||
const oatpp::async::Action& nextAction) {
|
||||
auto res = stream->write(data, size);
|
||||
if(res == oatpp::data::stream::IOStream::ERROR_IO_WAIT_RETRY) {
|
||||
return oatpp::async::Action::_WAIT_RETRY;
|
||||
} else if(res == oatpp::data::stream::IOStream::ERROR_IO_RETRY) {
|
||||
return oatpp::async::Action::_REPEAT;
|
||||
} else if(res == oatpp::data::stream::IOStream::ERROR_IO_PIPE) {
|
||||
return oatpp::async::Action::_ABORT;
|
||||
} else if( res < 0) {
|
||||
return oatpp::async::Action(oatpp::async::Error(ERROR_ASYNC_FAILED_TO_WRITE_DATA));
|
||||
} else if(res < size) {
|
||||
data = &((p_char8) data)[res];
|
||||
size = size - res;
|
||||
return oatpp::async::Action::_REPEAT;
|
||||
}
|
||||
return nextAction;
|
||||
}
|
||||
|
||||
oatpp::async::Action IOStream::readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream,
|
||||
void*& data,
|
||||
os::io::Library::v_size& bytesLeftToRead,
|
||||
const oatpp::async::Action& nextAction) {
|
||||
auto res = stream->read(data, bytesLeftToRead);
|
||||
if(res == oatpp::data::stream::IOStream::ERROR_IO_WAIT_RETRY) {
|
||||
return oatpp::async::Action::_WAIT_RETRY;
|
||||
} else if(res == oatpp::data::stream::IOStream::ERROR_IO_RETRY) {
|
||||
return oatpp::async::Action::_REPEAT;
|
||||
} else if( res < 0) {
|
||||
return oatpp::async::Action(oatpp::async::Error(ERROR_ASYNC_FAILED_TO_READ_DATA));
|
||||
} else if(res < bytesLeftToRead) {
|
||||
data = &((p_char8) data)[res];
|
||||
bytesLeftToRead -= res;
|
||||
return nextAction;
|
||||
}
|
||||
bytesLeftToRead -= res;
|
||||
return nextAction;
|
||||
}
|
||||
|
||||
oatpp::async::Action IOStream::readExactSizeDataAsyncInline(oatpp::data::stream::InputStream* stream,
|
||||
void*& data,
|
||||
os::io::Library::v_size& bytesLeftToRead,
|
||||
const oatpp::async::Action& nextAction) {
|
||||
auto res = stream->read(data, bytesLeftToRead);
|
||||
if(res == oatpp::data::stream::IOStream::ERROR_IO_WAIT_RETRY) {
|
||||
return oatpp::async::Action::_WAIT_RETRY;
|
||||
} else if(res == oatpp::data::stream::IOStream::ERROR_IO_RETRY) {
|
||||
return oatpp::async::Action::_REPEAT;
|
||||
} else if(res == oatpp::data::stream::IOStream::ERROR_IO_PIPE) {
|
||||
return oatpp::async::Action::_ABORT;
|
||||
} else if( res < 0) {
|
||||
return oatpp::async::Action(oatpp::async::Error(ERROR_ASYNC_FAILED_TO_READ_DATA));
|
||||
} else if(res < bytesLeftToRead) {
|
||||
data = &((p_char8) data)[res];
|
||||
bytesLeftToRead -= res;
|
||||
return oatpp::async::Action::_REPEAT;
|
||||
}
|
||||
bytesLeftToRead -= res;
|
||||
return nextAction;
|
||||
}
|
||||
|
||||
// Functions
|
||||
|
||||
const std::shared_ptr<OutputStream>& operator <<
|
||||
@ -247,7 +184,7 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
|
||||
}
|
||||
|
||||
Action doRead() {
|
||||
return oatpp::data::stream::IOStream::readSomeDataAsyncInline(m_fromStream.get(),
|
||||
return oatpp::data::stream::readSomeDataAsyncInline(m_fromStream.get(),
|
||||
m_readBufferPtr,
|
||||
m_bytesLeft,
|
||||
yieldTo(&TransferCoroutine::prepareWrite));
|
||||
@ -260,7 +197,7 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
|
||||
}
|
||||
|
||||
Action doWrite() {
|
||||
return oatpp::data::stream::IOStream::writeDataAsyncInline(m_toStream.get(),
|
||||
return oatpp::data::stream::writeDataAsyncInline(m_toStream.get(),
|
||||
m_writeBufferPtr,
|
||||
m_bytesLeft,
|
||||
yieldTo(&TransferCoroutine::act));
|
||||
@ -272,4 +209,83 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
|
||||
|
||||
}
|
||||
|
||||
oatpp::async::Action writeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
|
||||
const void*& data,
|
||||
os::io::Library::v_size& size,
|
||||
const oatpp::async::Action& nextAction) {
|
||||
auto res = stream->write(data, size);
|
||||
if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
|
||||
return oatpp::async::Action::_WAIT_RETRY;
|
||||
} else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) {
|
||||
return oatpp::async::Action::_REPEAT;
|
||||
} else if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE) {
|
||||
return oatpp::async::Action::_ABORT;
|
||||
} else if( res < 0) {
|
||||
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA));
|
||||
} else if(res < size) {
|
||||
data = &((p_char8) data)[res];
|
||||
size = size - res;
|
||||
return oatpp::async::Action::_REPEAT;
|
||||
}
|
||||
return nextAction;
|
||||
}
|
||||
|
||||
oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream,
|
||||
void*& data,
|
||||
os::io::Library::v_size& bytesLeftToRead,
|
||||
const oatpp::async::Action& nextAction) {
|
||||
auto res = stream->read(data, bytesLeftToRead);
|
||||
if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
|
||||
return oatpp::async::Action::_WAIT_RETRY;
|
||||
} else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) {
|
||||
return oatpp::async::Action::_REPEAT;
|
||||
} else if( res < 0) {
|
||||
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA));
|
||||
} else if(res < bytesLeftToRead) {
|
||||
data = &((p_char8) data)[res];
|
||||
bytesLeftToRead -= res;
|
||||
return nextAction;
|
||||
}
|
||||
bytesLeftToRead -= res;
|
||||
return nextAction;
|
||||
}
|
||||
|
||||
oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStream* stream,
|
||||
void*& data,
|
||||
os::io::Library::v_size& bytesLeftToRead,
|
||||
const oatpp::async::Action& nextAction) {
|
||||
auto res = stream->read(data, bytesLeftToRead);
|
||||
if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
|
||||
return oatpp::async::Action::_WAIT_RETRY;
|
||||
} else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) {
|
||||
return oatpp::async::Action::_REPEAT;
|
||||
} else if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE) {
|
||||
return oatpp::async::Action::_ABORT;
|
||||
} else if( res < 0) {
|
||||
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA));
|
||||
} else if(res < bytesLeftToRead) {
|
||||
data = &((p_char8) data)[res];
|
||||
bytesLeftToRead -= res;
|
||||
return oatpp::async::Action::_REPEAT;
|
||||
}
|
||||
bytesLeftToRead -= res;
|
||||
return nextAction;
|
||||
}
|
||||
|
||||
oatpp::os::io::Library::v_size writeExactSizeData(oatpp::data::stream::OutputStream* stream, const void* data, os::io::Library::v_size size) {
|
||||
const char* buffer = (char*)data;
|
||||
oatpp::os::io::Library::v_size progress = 0;
|
||||
while (progress < size) {
|
||||
auto res = stream->write(&buffer[progress], size - progress);
|
||||
if(res <= 0) { // if res == 0 then probably stream handles write() error incorrectly. return.
|
||||
if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE ||
|
||||
(res != oatpp::data::stream::Errors::ERROR_IO_RETRY && res != oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY)) {
|
||||
return progress;
|
||||
}
|
||||
}
|
||||
progress += res;
|
||||
}
|
||||
return progress;
|
||||
}
|
||||
|
||||
}}}
|
||||
|
@ -36,9 +36,24 @@
|
||||
|
||||
namespace oatpp { namespace data{ namespace stream {
|
||||
|
||||
class Errors {
|
||||
public:
|
||||
constexpr static os::io::Library::v_size ERROR_IO_NOTHING_TO_READ = -1001;
|
||||
constexpr static os::io::Library::v_size ERROR_IO_WAIT_RETRY = -1002;
|
||||
constexpr static os::io::Library::v_size ERROR_IO_RETRY = -1003;
|
||||
constexpr static os::io::Library::v_size ERROR_IO_PIPE = -1004;
|
||||
|
||||
constexpr static const char* const ERROR_ASYNC_FAILED_TO_WRITE_DATA = "ERROR_ASYNC_FAILED_TO_WRITE_DATA";
|
||||
constexpr static const char* const ERROR_ASYNC_FAILED_TO_READ_DATA = "ERROR_ASYNC_FAILED_TO_READ_DATA";
|
||||
};
|
||||
|
||||
class OutputStream {
|
||||
public:
|
||||
|
||||
/**
|
||||
* Write data to stream up to count bytes, and return number of bytes actually written
|
||||
* It is a legal case if return result < count. Caller should handle this!
|
||||
*/
|
||||
virtual os::io::Library::v_size write(const void *data, os::io::Library::v_size count) = 0;
|
||||
|
||||
os::io::Library::v_size write(const char* data){
|
||||
@ -63,41 +78,16 @@ public:
|
||||
|
||||
class InputStream {
|
||||
public:
|
||||
/**
|
||||
* Read data from stream up to count bytes, and return number of bytes actually read
|
||||
* It is a legal case if return result < count. Caller should handle this!
|
||||
*/
|
||||
virtual os::io::Library::v_size read(void *data, os::io::Library::v_size count) = 0;
|
||||
};
|
||||
|
||||
class IOStream : public InputStream, public OutputStream {
|
||||
public:
|
||||
constexpr static os::io::Library::v_size ERROR_IO_NOTHING_TO_READ = -1001;
|
||||
constexpr static os::io::Library::v_size ERROR_IO_WAIT_RETRY = -1002;
|
||||
constexpr static os::io::Library::v_size ERROR_IO_RETRY = -1003;
|
||||
constexpr static os::io::Library::v_size ERROR_IO_PIPE = -1004;
|
||||
|
||||
constexpr static const char* const ERROR_ASYNC_FAILED_TO_WRITE_DATA = "ERROR_ASYNC_FAILED_TO_WRITE_DATA";
|
||||
constexpr static const char* const ERROR_ASYNC_FAILED_TO_READ_DATA = "ERROR_ASYNC_FAILED_TO_READ_DATA";
|
||||
|
||||
public:
|
||||
typedef os::io::Library::v_size v_size;
|
||||
|
||||
/**
|
||||
* Async write data withot starting new Coroutine.
|
||||
* Should be called from a separate Coroutine method
|
||||
*/
|
||||
static oatpp::async::Action writeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
|
||||
const void*& data,
|
||||
os::io::Library::v_size& size,
|
||||
const oatpp::async::Action& nextAction);
|
||||
|
||||
static oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream,
|
||||
void*& data,
|
||||
os::io::Library::v_size& bytesLeftToRead,
|
||||
const oatpp::async::Action& nextAction);
|
||||
|
||||
static oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStream* stream,
|
||||
void*& data,
|
||||
os::io::Library::v_size& bytesLeftToRead,
|
||||
const oatpp::async::Action& nextAction);
|
||||
|
||||
};
|
||||
|
||||
class CompoundIOStream : public oatpp::base::Controllable, public IOStream {
|
||||
@ -145,13 +135,19 @@ const std::shared_ptr<OutputStream>& operator << (const std::shared_ptr<OutputSt
|
||||
const std::shared_ptr<OutputStream>& operator << (const std::shared_ptr<OutputStream>& s, v_float64 value);
|
||||
|
||||
const std::shared_ptr<OutputStream>& operator << (const std::shared_ptr<OutputStream>& s, bool value);
|
||||
|
||||
|
||||
/**
|
||||
* Read bytes from @fromStream" and write to @toStream" using @buffer of size @bufferSize
|
||||
*/
|
||||
void transfer(const std::shared_ptr<InputStream>& fromStream,
|
||||
const std::shared_ptr<OutputStream>& toStream,
|
||||
oatpp::os::io::Library::v_size transferSize,
|
||||
void* buffer,
|
||||
oatpp::os::io::Library::v_size bufferSize);
|
||||
|
||||
/**
|
||||
* Same as transfer but asynchronous
|
||||
*/
|
||||
oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
|
||||
const oatpp::async::Action& actionOnReturn,
|
||||
const std::shared_ptr<InputStream>& fromStream,
|
||||
@ -159,6 +155,32 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
|
||||
oatpp::os::io::Library::v_size transferSize,
|
||||
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer);
|
||||
|
||||
/**
|
||||
* Async write data withot starting new Coroutine.
|
||||
* Should be called from a separate Coroutine method
|
||||
*/
|
||||
oatpp::async::Action writeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
|
||||
const void*& data,
|
||||
os::io::Library::v_size& size,
|
||||
const oatpp::async::Action& nextAction);
|
||||
|
||||
oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream,
|
||||
void*& data,
|
||||
os::io::Library::v_size& bytesLeftToRead,
|
||||
const oatpp::async::Action& nextAction);
|
||||
|
||||
oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStream* stream,
|
||||
void*& data,
|
||||
os::io::Library::v_size& bytesLeftToRead,
|
||||
const oatpp::async::Action& nextAction);
|
||||
|
||||
/**
|
||||
* Write exact amount of bytes to stream.
|
||||
* returns exact amount of bytes was written.
|
||||
* return result can be < size only in case of some disaster like broken pipe
|
||||
*/
|
||||
oatpp::os::io::Library::v_size writeExactSizeData(oatpp::data::stream::OutputStream* stream, const void* data, os::io::Library::v_size size);
|
||||
|
||||
}}}
|
||||
|
||||
#endif /* defined(_data_Stream) */
|
||||
|
@ -123,9 +123,9 @@ oatpp::async::Action OutputStreamBufferedProxy::flushAsync(oatpp::async::Abstrac
|
||||
m_stream->m_pos = 0;
|
||||
m_stream->m_posEnd = 0;
|
||||
return finish();
|
||||
} else if(result == IOStream::ERROR_IO_WAIT_RETRY) {
|
||||
} else if(result == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
|
||||
return waitRetry();
|
||||
} else if(result == IOStream::ERROR_IO_RETRY) {
|
||||
} else if(result == oatpp::data::stream::Errors::ERROR_IO_RETRY) {
|
||||
return repeat();
|
||||
} else if(result > 0){
|
||||
m_stream->m_pos += (v_bufferSize) result;
|
||||
|
@ -45,11 +45,11 @@ Connection::Library::v_size Connection::write(const void *buff, Library::v_size
|
||||
if(result <= 0) {
|
||||
auto e = errno;
|
||||
if(e == EAGAIN || e == EWOULDBLOCK){
|
||||
return ERROR_IO_WAIT_RETRY; // For async io. In case socket is non_blocking
|
||||
return oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY; // For async io. In case socket is non_blocking
|
||||
} else if(e == EINTR) {
|
||||
return ERROR_IO_RETRY;
|
||||
return oatpp::data::stream::Errors::ERROR_IO_RETRY;
|
||||
} else if(e == EPIPE) {
|
||||
return ERROR_IO_PIPE;
|
||||
return oatpp::data::stream::Errors::ERROR_IO_PIPE;
|
||||
} else {
|
||||
//OATPP_LOGD("Connection", "write errno=%d", e);
|
||||
}
|
||||
@ -63,11 +63,11 @@ Connection::Library::v_size Connection::read(void *buff, Library::v_size count){
|
||||
if(result <= 0) {
|
||||
auto e = errno;
|
||||
if(e == EAGAIN || e == EWOULDBLOCK){
|
||||
return ERROR_IO_WAIT_RETRY; // For async io. In case socket is non_blocking
|
||||
return oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY; // For async io. In case socket is non_blocking
|
||||
} else if(e == EINTR) {
|
||||
return ERROR_IO_RETRY;
|
||||
return oatpp::data::stream::Errors::ERROR_IO_RETRY;
|
||||
} else if(e == ECONNRESET) {
|
||||
return ERROR_IO_PIPE;
|
||||
return oatpp::data::stream::Errors::ERROR_IO_PIPE;
|
||||
} else {
|
||||
//OATPP_LOGD("Connection", "write errno=%d", e);
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ os::io::Library::v_size Pipe::Reader::read(void *data, os::io::Library::v_size c
|
||||
|
||||
Pipe& pipe = *m_pipe;
|
||||
if(!pipe.m_alive) {
|
||||
return oatpp::data::stream::IOStream::ERROR_IO_PIPE;
|
||||
return oatpp::data::stream::Errors::ERROR_IO_PIPE;
|
||||
}
|
||||
|
||||
if(m_nonBlocking) {
|
||||
@ -40,7 +40,7 @@ os::io::Library::v_size Pipe::Reader::read(void *data, os::io::Library::v_size c
|
||||
pipe.m_writeCondition.notify_one();
|
||||
return result;
|
||||
} else {
|
||||
return oatpp::data::stream::IOStream::ERROR_IO_WAIT_RETRY;
|
||||
return oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY;
|
||||
}
|
||||
}
|
||||
|
||||
@ -53,11 +53,11 @@ os::io::Library::v_size Pipe::Reader::read(void *data, os::io::Library::v_size c
|
||||
lock.unlock();
|
||||
pipe.m_writeCondition.notify_all();
|
||||
pipe.m_readCondition.notify_all();
|
||||
return oatpp::data::stream::IOStream::ERROR_IO_PIPE;
|
||||
return oatpp::data::stream::Errors::ERROR_IO_PIPE;
|
||||
}
|
||||
|
||||
if(pipe.m_buffer.availableToRead() == 0) {
|
||||
return oatpp::data::stream::IOStream::ERROR_IO_RETRY;
|
||||
return oatpp::data::stream::Errors::ERROR_IO_RETRY;
|
||||
}
|
||||
|
||||
auto result = pipe.m_buffer.read(data, count);
|
||||
@ -72,7 +72,7 @@ os::io::Library::v_size Pipe::Writer::write(const void *data, os::io::Library::v
|
||||
|
||||
Pipe& pipe = *m_pipe;
|
||||
if(!pipe.m_alive) {
|
||||
return oatpp::data::stream::IOStream::ERROR_IO_PIPE;
|
||||
return oatpp::data::stream::Errors::ERROR_IO_PIPE;
|
||||
}
|
||||
|
||||
if(m_nonBlocking) {
|
||||
@ -82,7 +82,7 @@ os::io::Library::v_size Pipe::Writer::write(const void *data, os::io::Library::v
|
||||
pipe.m_readCondition.notify_one();
|
||||
return result;
|
||||
} else {
|
||||
return oatpp::data::stream::IOStream::ERROR_IO_WAIT_RETRY;
|
||||
return oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY;
|
||||
}
|
||||
}
|
||||
|
||||
@ -95,11 +95,11 @@ os::io::Library::v_size Pipe::Writer::write(const void *data, os::io::Library::v
|
||||
lock.unlock();
|
||||
pipe.m_writeCondition.notify_all();
|
||||
pipe.m_readCondition.notify_all();
|
||||
return oatpp::data::stream::IOStream::ERROR_IO_PIPE;
|
||||
return oatpp::data::stream::Errors::ERROR_IO_PIPE;
|
||||
}
|
||||
|
||||
if(pipe.m_buffer.availableToWrite() == 0) {
|
||||
return oatpp::data::stream::IOStream::ERROR_IO_RETRY;
|
||||
return oatpp::data::stream::Errors::ERROR_IO_RETRY;
|
||||
}
|
||||
|
||||
auto result = pipe.m_buffer.write(data, count);
|
||||
|
@ -147,7 +147,7 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor
|
||||
}
|
||||
|
||||
Action readResponse() {
|
||||
return oatpp::data::stream::IOStream::
|
||||
return oatpp::data::stream::
|
||||
readSomeDataAsyncInline(m_connection.get(), m_bufferPointer, m_bufferBytesLeftToRead, yieldTo(&ExecutorCoroutine::parseResponse));
|
||||
}
|
||||
|
||||
|
@ -150,9 +150,9 @@ oatpp::async::Action BodyDecoder::doChunkedDecodingAsync(oatpp::async::AbstractC
|
||||
|
||||
Action readLineChar() {
|
||||
auto res = m_fromStream->read(&m_lineChar, 1);
|
||||
if(res == oatpp::data::stream::IOStream::ERROR_IO_WAIT_RETRY) {
|
||||
if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
|
||||
return oatpp::async::Action::_WAIT_RETRY;
|
||||
} else if(res == oatpp::data::stream::IOStream::ERROR_IO_RETRY) {
|
||||
} else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) {
|
||||
return oatpp::async::Action::_REPEAT;
|
||||
} else if( res < 0) {
|
||||
return error("[BodyDecoder::ChunkedDecoder] Can't read line char");
|
||||
@ -197,12 +197,12 @@ oatpp::async::Action BodyDecoder::doChunkedDecodingAsync(oatpp::async::AbstractC
|
||||
|
||||
Action skipRN() {
|
||||
if(m_done) {
|
||||
return oatpp::data::stream::IOStream::readExactSizeDataAsyncInline(m_fromStream.get(),
|
||||
return oatpp::data::stream::readExactSizeDataAsyncInline(m_fromStream.get(),
|
||||
m_skipData,
|
||||
m_skipSize,
|
||||
finish());
|
||||
} else {
|
||||
return oatpp::data::stream::IOStream::readExactSizeDataAsyncInline(m_fromStream.get(),
|
||||
return oatpp::data::stream::readExactSizeDataAsyncInline(m_fromStream.get(),
|
||||
m_skipData,
|
||||
m_skipSize,
|
||||
yieldTo(&ChunkedDecoder::readLineChar));
|
||||
|
@ -38,13 +38,16 @@ protected:
|
||||
typedef oatpp::collection::ListMap<oatpp::String, oatpp::String> Headers;
|
||||
typedef oatpp::data::stream::OutputStream OutputStream;
|
||||
public:
|
||||
virtual void declareHeaders(const std::shared_ptr<Headers>& headers) = 0;
|
||||
|
||||
/**
|
||||
* Do not call this method if stream::write is non blocking!
|
||||
* For fast (not network) BLOCKING streams only!!!
|
||||
* declare headers describing body
|
||||
*/
|
||||
virtual void writeToStream(const std::shared_ptr<OutputStream>& stream) = 0;
|
||||
virtual void declareHeaders(const std::shared_ptr<Headers>& headers) noexcept = 0;
|
||||
|
||||
/**
|
||||
* write content to stream
|
||||
*/
|
||||
virtual void writeToStream(const std::shared_ptr<OutputStream>& stream) noexcept = 0;
|
||||
|
||||
virtual Action writeToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
|
||||
const Action& actionOnReturn,
|
||||
|
@ -47,23 +47,13 @@ public:
|
||||
return Shared_Http_Outgoing_BufferBody_Pool::allocateShared(buffer);
|
||||
}
|
||||
|
||||
void declareHeaders(const std::shared_ptr<Headers>& headers) override {
|
||||
void declareHeaders(const std::shared_ptr<Headers>& headers) noexcept override {
|
||||
headers->put(oatpp::web::protocol::http::Header::CONTENT_LENGTH,
|
||||
oatpp::utils::conversion::int32ToStr(m_buffer->getSize()));
|
||||
}
|
||||
|
||||
void writeToStream(const std::shared_ptr<OutputStream>& stream) override {
|
||||
oatpp::os::io::Library::v_size progress = 0;
|
||||
while (progress < m_buffer->getSize()) {
|
||||
auto res = stream->write(&m_buffer->getData()[progress], m_buffer->getSize() - progress);
|
||||
if(res < 0) {
|
||||
if(res == oatpp::data::stream::IOStream::ERROR_IO_PIPE ||
|
||||
(res != oatpp::data::stream::IOStream::ERROR_IO_RETRY && res != oatpp::data::stream::IOStream::ERROR_IO_WAIT_RETRY)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
progress += res;
|
||||
}
|
||||
void writeToStream(const std::shared_ptr<OutputStream>& stream) noexcept override {
|
||||
oatpp::data::stream::writeExactSizeData(stream.get(), m_buffer->getData(), m_buffer->getSize());
|
||||
}
|
||||
|
||||
public:
|
||||
@ -85,7 +75,7 @@ public:
|
||||
{}
|
||||
|
||||
Action act() override {
|
||||
return oatpp::data::stream::IOStream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, finish());
|
||||
return oatpp::data::stream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, finish());
|
||||
}
|
||||
|
||||
};
|
||||
|
@ -58,7 +58,7 @@ public:
|
||||
return Shared_Http_Outgoing_ChunkedBufferBody_Pool::allocateShared(buffer, chunked);
|
||||
}
|
||||
|
||||
void declareHeaders(const std::shared_ptr<Headers>& headers) override {
|
||||
void declareHeaders(const std::shared_ptr<Headers>& headers) noexcept override {
|
||||
if(m_chunked){
|
||||
headers->put(oatpp::web::protocol::http::Header::TRANSFER_ENCODING,
|
||||
oatpp::web::protocol::http::Header::Value::TRANSFER_ENCODING_CHUNKED);
|
||||
@ -68,7 +68,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void writeToStream(const std::shared_ptr<OutputStream>& stream) override {
|
||||
void writeToStream(const std::shared_ptr<OutputStream>& stream) noexcept override {
|
||||
if(m_chunked){
|
||||
auto chunks = m_buffer->getChunks();
|
||||
auto curr = chunks->getFirstNode();
|
||||
@ -147,7 +147,7 @@ public:
|
||||
}
|
||||
|
||||
Action writeCurrData() {
|
||||
return oatpp::data::stream::IOStream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction);
|
||||
return oatpp::data::stream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction);
|
||||
}
|
||||
|
||||
};
|
||||
|
@ -60,11 +60,10 @@ public:
|
||||
return Shared_Http_Outgoing_DtoBody_Pool::allocateShared(dto, objectMapper, chunked);
|
||||
}
|
||||
|
||||
void declareHeaders(const std::shared_ptr<Headers>& headers) override {
|
||||
if(!m_dto) {
|
||||
throw std::runtime_error("Sending null object");
|
||||
void declareHeaders(const std::shared_ptr<Headers>& headers) noexcept override {
|
||||
if(m_dto) {
|
||||
m_objectMapper->write(m_buffer, m_dto);
|
||||
}
|
||||
m_objectMapper->write(m_buffer, m_dto);
|
||||
ChunkedBufferBody::declareHeaders(headers);
|
||||
headers->putIfNotExists(Header::CONTENT_TYPE, m_objectMapper->getInfo().http_content_type);
|
||||
}
|
||||
|
@ -28,7 +28,7 @@
|
||||
|
||||
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
|
||||
|
||||
void Response::send(const std::shared_ptr<data::stream::OutputStream>& stream){
|
||||
void Response::send(const std::shared_ptr<data::stream::OutputStream>& stream) {
|
||||
|
||||
if(body){
|
||||
body->declareHeaders(headers);
|
||||
|
@ -121,7 +121,7 @@ HttpProcessor::processRequest(HttpRouter* router,
|
||||
return errorHandler->handleError(protocol::http::Status::CODE_404, "Current url has no mapping");
|
||||
}
|
||||
|
||||
} if(readCount == oatpp::data::stream::IOStream::ERROR_IO_NOTHING_TO_READ) {
|
||||
} if(readCount == oatpp::data::stream::Errors::ERROR_IO_NOTHING_TO_READ) {
|
||||
keepAlive = true;
|
||||
}
|
||||
|
||||
@ -178,9 +178,9 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::act() {
|
||||
if(readCount > 0) {
|
||||
m_currentResponse = nullptr;
|
||||
return parseRequest((v_int32)readCount);
|
||||
} else if(readCount == oatpp::data::stream::IOStream::ERROR_IO_WAIT_RETRY) {
|
||||
} else if(readCount == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
|
||||
return waitRetry();
|
||||
} else if(readCount == oatpp::data::stream::IOStream::ERROR_IO_RETRY) {
|
||||
} else if(readCount == oatpp::data::stream::Errors::ERROR_IO_RETRY) {
|
||||
return repeat();
|
||||
}
|
||||
return abort();
|
||||
|
Loading…
x
Reference in New Issue
Block a user