From 2689284c1bfbe86a1e0923bc180aa4ae301323ed Mon Sep 17 00:00:00 2001 From: lganzzzo Date: Sun, 15 Dec 2019 22:12:15 +0200 Subject: [PATCH] Extends IO Error to Read/Write specific. --- src/oatpp/core/data/IODefinitions.hpp | 28 +++++++--- src/oatpp/core/data/buffer/FIFOBuffer.cpp | 12 ++-- src/oatpp/core/data/stream/Stream.cpp | 55 +++++++++++++++---- src/oatpp/network/Connection.cpp | 24 ++++---- src/oatpp/network/virtual_/Pipe.cpp | 16 +++--- .../http/incoming/RequestHeadersReader.cpp | 6 +- .../http/incoming/ResponseHeadersReader.cpp | 6 +- .../http/incoming/SimpleBodyDecoder.cpp | 5 +- 8 files changed, 101 insertions(+), 51 deletions(-) diff --git a/src/oatpp/core/data/IODefinitions.hpp b/src/oatpp/core/data/IODefinitions.hpp index 4da1519d..8a3b9177 100644 --- a/src/oatpp/core/data/IODefinitions.hpp +++ b/src/oatpp/core/data/IODefinitions.hpp @@ -75,22 +75,34 @@ enum IOError : v_io_size { ZERO_VALUE = 0, /** - * I/O operation is not possible any more - * Client should give up trying and free all related resources + * I/O operation is not possible any more. + * Client should give up trying and free all related resources. */ BROKEN_PIPE = -1001, /** - * I/O operation was interrupted because of some reason - * Client may retry immediately + * I/O operation was interrupted because of some reason. + * Client may retry read immediately. */ - RETRY = -1002, + RETRY_READ = -1002, /** - * I/O operation is not currently available due to some reason - * Client should wait then retry + * I/O operation was interrupted because of some reason. + * Client may retry immediately. */ - WAIT_RETRY = -1003 + RETRY_WRITE = -1003, + + /** + * I/O operation is not currently available due to some reason. + * Client should wait then retry read. + */ + WAIT_RETRY_READ = -1004, + + /** + * I/O operation is not currently available due to some reason. + * Client should wait then retry write. + */ + WAIT_RETRY_WRITE = -1005 }; diff --git a/src/oatpp/core/data/buffer/FIFOBuffer.cpp b/src/oatpp/core/data/buffer/FIFOBuffer.cpp index c8b13e8e..16ce3eed 100644 --- a/src/oatpp/core/data/buffer/FIFOBuffer.cpp +++ b/src/oatpp/core/data/buffer/FIFOBuffer.cpp @@ -70,7 +70,7 @@ v_buff_size FIFOBuffer::getBufferSize() const { data::v_io_size FIFOBuffer::read(void *data, v_buff_size count) { if(!m_canRead) { - return data::IOError::WAIT_RETRY; + return data::IOError::WAIT_RETRY_READ; } if(count == 0) { @@ -119,7 +119,7 @@ data::v_io_size FIFOBuffer::read(void *data, v_buff_size count) { data::v_io_size FIFOBuffer::peek(void *data, v_buff_size count) { if(!m_canRead) { - return data::IOError::WAIT_RETRY; + return data::IOError::WAIT_RETRY_READ; } if(count == 0) { @@ -159,7 +159,7 @@ data::v_io_size FIFOBuffer::peek(void *data, v_buff_size count) { data::v_io_size FIFOBuffer::commitReadOffset(v_buff_size count) { if(!m_canRead) { - return data::IOError::WAIT_RETRY; + return data::IOError::WAIT_RETRY_READ; } if(count == 0) { @@ -204,7 +204,7 @@ data::v_io_size FIFOBuffer::commitReadOffset(v_buff_size count) { data::v_io_size FIFOBuffer::write(const void *data, v_buff_size count) { if(m_canRead && m_writePosition == m_readPosition) { - return data::IOError::WAIT_RETRY; + return data::IOError::WAIT_RETRY_WRITE; } if(count == 0) { @@ -249,7 +249,7 @@ data::v_io_size FIFOBuffer::write(const void *data, v_buff_size count) { data::v_io_size FIFOBuffer::readAndWriteToStream(data::stream::OutputStream* stream, v_buff_size count) { if(!m_canRead) { - return data::IOError::WAIT_RETRY; + return data::IOError::WAIT_RETRY_READ; } if(count == 0) { @@ -302,7 +302,7 @@ data::v_io_size FIFOBuffer::readAndWriteToStream(data::stream::OutputStream* str data::v_io_size FIFOBuffer::readFromStreamAndWrite(data::stream::InputStream* stream, v_buff_size count) { if(m_canRead && m_writePosition == m_readPosition) { - return data::IOError::WAIT_RETRY; + return data::IOError::WAIT_RETRY_WRITE; } if(count == 0) { diff --git a/src/oatpp/core/data/stream/Stream.cpp b/src/oatpp/core/data/stream/Stream.cpp index 19964607..55327bb3 100644 --- a/src/oatpp/core/data/stream/Stream.cpp +++ b/src/oatpp/core/data/stream/Stream.cpp @@ -390,7 +390,10 @@ oatpp::data::v_io_size transfer(InputStream* fromStream, if(writeResult > 0) { data = &data[writeResult]; bytesLeft -= writeResult; - } else if (writeResult == data::IOError::RETRY || writeResult == data::IOError::WAIT_RETRY) { + } else if (// In general case `OutputStream` may return `RETRY_READ`, `WAIT_RETRY_READ` on + // because of the underlying transport. Check all retry values here! + writeResult == data::IOError::RETRY_READ || writeResult == data::IOError::WAIT_RETRY_READ || + writeResult == data::IOError::RETRY_WRITE || writeResult == data::IOError::WAIT_RETRY_WRITE) { continue; } else { throw std::runtime_error("[oatpp::data::stream::transfer()]: Unknown Error. Can't continue transfer."); @@ -400,7 +403,10 @@ oatpp::data::v_io_size transfer(InputStream* fromStream, progress += readResult; } else { - if(readResult == data::IOError::RETRY || readResult == data::IOError::WAIT_RETRY) { + if(// In general case `InputStream` may return `RETRY_WRITE`, `WAIT_RETRY_WRITE` on + // because of the underlying transport. Check all retry values here! + readResult == data::IOError::RETRY_READ || readResult == data::IOError::WAIT_RETRY_READ || + readResult == data::IOError::RETRY_WRITE || readResult == data::IOError::WAIT_RETRY_WRITE) { continue; } return progress; @@ -499,31 +505,58 @@ oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr& namespace { oatpp::async::Action asyncOutputStreamActionOnIOError(oatpp::data::stream::OutputStream* stream, data::v_io_size res) { + switch (res) { - case IOError::WAIT_RETRY: + + // In general case `OutputStream` may return `RETRY_READ`, `WAIT_RETRY_READ` on + // because of the underlying transport. Check all retry values here! + + case IOError::WAIT_RETRY_READ: return stream->suggestOutputStreamAction(res); - case IOError::RETRY: + case IOError::RETRY_READ: return stream->suggestOutputStreamAction(res); + + case IOError::WAIT_RETRY_WRITE: + return stream->suggestOutputStreamAction(res); + case IOError::RETRY_WRITE: + return stream->suggestOutputStreamAction(res); + case IOError::BROKEN_PIPE: return new AsyncIOError(IOError::BROKEN_PIPE); case IOError::ZERO_VALUE: return new AsyncIOError(IOError::ZERO_VALUE); + } + return new AsyncIOError("Unknown IO Error result", res); + } oatpp::async::Action asyncInputStreamActionOnIOError(oatpp::data::stream::InputStream* stream, data::v_io_size res) { + switch (res) { - case IOError::WAIT_RETRY: + + // In general case `InputStream` may return `RETRY_WRITE`, `WAIT_RETRY_WRITE` on + // because of the underlying transport. Check all retry values here! + + case IOError::WAIT_RETRY_READ: return stream->suggestInputStreamAction(res); - case IOError::RETRY: + case IOError::RETRY_READ: return stream->suggestInputStreamAction(res); + + case IOError::WAIT_RETRY_WRITE: + return stream->suggestInputStreamAction(res); + case IOError::RETRY_WRITE: + return stream->suggestInputStreamAction(res); + case IOError::BROKEN_PIPE: return new AsyncIOError(IOError::BROKEN_PIPE); case IOError::ZERO_VALUE: return new AsyncIOError(IOError::ZERO_VALUE); } + return new AsyncIOError("Unknown IO Error result", res); + } } @@ -619,8 +652,9 @@ oatpp::data::v_io_size readExactSizeData(oatpp::data::stream::InputStream* strea if(res > 0) { progress += res; - } else { // if res == 0 then probably stream handles read() error incorrectly. return. - if(res == data::IOError::RETRY || res == data::IOError::WAIT_RETRY) { + } else { + if(res == data::IOError::RETRY_READ || res == data::IOError::WAIT_RETRY_READ || + res == data::IOError::RETRY_WRITE || res == data::IOError::WAIT_RETRY_WRITE) { continue; } return progress; @@ -643,8 +677,9 @@ oatpp::data::v_io_size writeExactSizeData(oatpp::data::stream::OutputStream* str if(res > 0) { progress += res; - } else { // if res == 0 then probably stream handles write() error incorrectly. return. - if(res == data::IOError::RETRY || res == data::IOError::WAIT_RETRY) { + } else { + if(res == data::IOError::RETRY_READ || res == data::IOError::WAIT_RETRY_READ || + res == data::IOError::RETRY_WRITE || res == data::IOError::WAIT_RETRY_WRITE) { continue; } return progress; diff --git a/src/oatpp/network/Connection.cpp b/src/oatpp/network/Connection.cpp index 2b670160..c441a50a 100644 --- a/src/oatpp/network/Connection.cpp +++ b/src/oatpp/network/Connection.cpp @@ -68,9 +68,9 @@ data::v_io_size Connection::write(const void *buff, v_buff_size count){ auto e = WSAGetLastError(); if(e == WSAEWOULDBLOCK){ - return data::IOError::WAIT_RETRY; // For async io. In case socket is non_blocking + return data::IOError::WAIT_RETRY_WRITE; // For async io. In case socket is non_blocking } else if(e == WSAEINTR) { - return data::IOError::RETRY; + return data::IOError::RETRY_WRITE; } else if(e == WSAECONNRESET) { return data::IOError::BROKEN_PIPE; } else { @@ -93,9 +93,9 @@ data::v_io_size Connection::write(const void *buff, v_buff_size count){ if(result <= 0) { auto e = errno; if(e == EAGAIN || e == EWOULDBLOCK){ - return data::IOError::WAIT_RETRY; // For async io. In case socket is non_blocking + return data::IOError::WAIT_RETRY_WRITE; // For async io. In case socket is non_blocking } else if(e == EINTR) { - return data::IOError::RETRY; + return data::IOError::RETRY_WRITE; } else if(e == EPIPE) { return data::IOError::BROKEN_PIPE; } else { @@ -119,9 +119,9 @@ data::v_io_size Connection::read(void *buff, v_buff_size count){ auto e = WSAGetLastError(); if(e == WSAEWOULDBLOCK){ - return data::IOError::WAIT_RETRY; // For async io. In case socket is non_blocking + return data::IOError::WAIT_RETRY_READ; // For async io. In case socket is non_blocking } else if(e == WSAEINTR) { - return data::IOError::RETRY; + return data::IOError::RETRY_READ; } else if(e == WSAECONNRESET) { return data::IOError::BROKEN_PIPE; } else { @@ -140,9 +140,9 @@ data::v_io_size Connection::read(void *buff, v_buff_size count){ if(result <= 0) { auto e = errno; if(e == EAGAIN || e == EWOULDBLOCK){ - return data::IOError::WAIT_RETRY; // For async io. In case socket is non_blocking + return data::IOError::WAIT_RETRY_READ; // For async io. In case socket is non_blocking } else if(e == EINTR) { - return data::IOError::RETRY; + return data::IOError::RETRY_READ; } else if(e == ECONNRESET) { return data::IOError::BROKEN_PIPE; } else { @@ -234,9 +234,9 @@ oatpp::async::Action Connection::suggestOutputStreamAction(data::v_io_size ioRes } switch (ioResult) { - case oatpp::data::IOError::WAIT_RETRY: + case oatpp::data::IOError::WAIT_RETRY_WRITE: return oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_WRITE); - case oatpp::data::IOError::RETRY: + case oatpp::data::IOError::RETRY_WRITE: return oatpp::async::Action::createIORepeatAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_WRITE); } @@ -251,9 +251,9 @@ oatpp::async::Action Connection::suggestInputStreamAction(data::v_io_size ioResu } switch (ioResult) { - case oatpp::data::IOError::WAIT_RETRY: + case oatpp::data::IOError::WAIT_RETRY_READ: return oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_READ); - case oatpp::data::IOError::RETRY: + case oatpp::data::IOError::RETRY_READ: return oatpp::async::Action::createIORepeatAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_READ); } diff --git a/src/oatpp/network/virtual_/Pipe.cpp b/src/oatpp/network/virtual_/Pipe.cpp index 349df1db..ac073584 100644 --- a/src/oatpp/network/virtual_/Pipe.cpp +++ b/src/oatpp/network/virtual_/Pipe.cpp @@ -55,12 +55,12 @@ data::v_io_size Pipe::Reader::read(void *data, v_buff_size count) { if (pipe.m_fifo.availableToRead() > 0) { result = pipe.m_fifo.read(data, count); } else if (pipe.m_open) { - result = data::IOError::WAIT_RETRY; + result = data::IOError::WAIT_RETRY_READ; } else { result = data::IOError::BROKEN_PIPE; } } else { - result = data::IOError::WAIT_RETRY; + result = data::IOError::WAIT_RETRY_READ; } } else { std::unique_lock lock(pipe.m_mutex); @@ -90,14 +90,14 @@ oatpp::async::Action Pipe::Reader::suggestInputStreamAction(data::v_io_size ioRe } switch (ioResult) { - case oatpp::data::IOError::WAIT_RETRY: { + case oatpp::data::IOError::WAIT_RETRY_READ: { std::unique_lock lock(m_pipe->m_mutex); if (m_pipe->m_fifo.availableToRead() > 0 || !m_pipe->m_open) { return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT); } return oatpp::async::Action::createWaitListAction(&m_waitList); } - case oatpp::data::IOError::RETRY: + case oatpp::data::IOError::RETRY_READ: return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT); } @@ -148,12 +148,12 @@ data::v_io_size Pipe::Writer::write(const void *data, v_buff_size count) { if (pipe.m_fifo.availableToWrite() > 0) { result = pipe.m_fifo.write(data, count); } else if (pipe.m_open) { - result = data::IOError::WAIT_RETRY; + result = data::IOError::WAIT_RETRY_WRITE; } else { result = data::IOError::BROKEN_PIPE; } } else { - result = data::IOError::WAIT_RETRY; + result = data::IOError::WAIT_RETRY_WRITE; } } else { std::unique_lock lock(pipe.m_mutex); @@ -183,14 +183,14 @@ oatpp::async::Action Pipe::Writer::suggestOutputStreamAction(data::v_io_size ioR } switch (ioResult) { - case oatpp::data::IOError::WAIT_RETRY: { + case oatpp::data::IOError::WAIT_RETRY_WRITE: { std::unique_lock lock(m_pipe->m_mutex); if (m_pipe->m_fifo.availableToWrite() > 0 || !m_pipe->m_open) { return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT); } return oatpp::async::Action::createWaitListAction(&m_waitList); } - case oatpp::data::IOError::RETRY: + case oatpp::data::IOError::RETRY_WRITE: return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT); } diff --git a/src/oatpp/web/protocol/http/incoming/RequestHeadersReader.cpp b/src/oatpp/web/protocol/http/incoming/RequestHeadersReader.cpp index 169b3b2b..ba25d97e 100644 --- a/src/oatpp/web/protocol/http/incoming/RequestHeadersReader.cpp +++ b/src/oatpp/web/protocol/http/incoming/RequestHeadersReader.cpp @@ -61,7 +61,8 @@ data::v_io_size RequestHeadersReader::readHeadersSection(data::stream::InputStre stream->commitReadOffset(res); - } else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) { + } else if(res == data::IOError::WAIT_RETRY_READ || res == data::IOError::RETRY_READ || + res == data::IOError::WAIT_RETRY_WRITE || res == data::IOError::RETRY_WRITE) { continue; } else { break; @@ -145,7 +146,8 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptrsuggestInputStreamAction(res); - } else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) { + } else if(res == data::IOError::WAIT_RETRY_READ || res == data::IOError::RETRY_READ || + res == data::IOError::WAIT_RETRY_WRITE || res == data::IOError::RETRY_WRITE) { return m_stream->suggestInputStreamAction(res); } else if(res == data::IOError::BROKEN_PIPE){ return error(data::IOError::BROKEN_PIPE); diff --git a/src/oatpp/web/protocol/http/incoming/ResponseHeadersReader.cpp b/src/oatpp/web/protocol/http/incoming/ResponseHeadersReader.cpp index 4ec3f643..88e1cb61 100644 --- a/src/oatpp/web/protocol/http/incoming/ResponseHeadersReader.cpp +++ b/src/oatpp/web/protocol/http/incoming/ResponseHeadersReader.cpp @@ -60,7 +60,8 @@ data::v_io_size ResponseHeadersReader::readHeadersSection(const std::shared_ptr< } } - } else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) { + } else if(res == data::IOError::WAIT_RETRY_READ || res == data::IOError::RETRY_READ || + res == data::IOError::WAIT_RETRY_WRITE || res == data::IOError::RETRY_WRITE) { continue; } else { break; @@ -145,7 +146,8 @@ ResponseHeadersReader::readHeadersAsync(const std::shared_ptrsuggestInputStreamAction(res); - } else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) { + } else if(res == data::IOError::WAIT_RETRY_READ || res == data::IOError::RETRY_READ || + res == data::IOError::WAIT_RETRY_WRITE || res == data::IOError::RETRY_WRITE) { return m_connection->suggestInputStreamAction(res); } else if(res == data::IOError::BROKEN_PIPE) { return error(data::IOError::BROKEN_PIPE); diff --git a/src/oatpp/web/protocol/http/incoming/SimpleBodyDecoder.cpp b/src/oatpp/web/protocol/http/incoming/SimpleBodyDecoder.cpp index c8ce4e47..a40e4b89 100644 --- a/src/oatpp/web/protocol/http/incoming/SimpleBodyDecoder.cpp +++ b/src/oatpp/web/protocol/http/incoming/SimpleBodyDecoder.cpp @@ -161,9 +161,8 @@ oatpp::async::CoroutineStarter SimpleBodyDecoder::doChunkedDecodingAsync(const s Action readLineChar() { auto res = m_fromStream->read(&m_lineChar, 1); - if(res == data::IOError::WAIT_RETRY) { - return m_fromStream->suggestInputStreamAction(res); - } else if(res == data::IOError::RETRY) { + if(res == data::IOError::WAIT_RETRY_READ || res == data::IOError::RETRY_READ || + res == data::IOError::WAIT_RETRY_WRITE || res == data::IOError::RETRY_WRITE) { return m_fromStream->suggestInputStreamAction(res); } else if( res < 0) { return error("[BodyDecoder::ChunkedDecoder] Can't read line char");