Extends IO Error to Read/Write specific.

This commit is contained in:
lganzzzo 2019-12-15 22:12:15 +02:00
parent add3d799fd
commit 2689284c1b
8 changed files with 101 additions and 51 deletions

View File

@ -75,22 +75,34 @@ enum IOError : v_io_size {
ZERO_VALUE = 0,
/**
* I/O operation is not possible any more
* Client should give up trying and free all related resources
* I/O operation is not possible any more.
* Client should give up trying and free all related resources.
*/
BROKEN_PIPE = -1001,
/**
* I/O operation was interrupted because of some reason
* Client may retry immediately
* I/O operation was interrupted because of some reason.
* Client may retry read immediately.
*/
RETRY = -1002,
RETRY_READ = -1002,
/**
* I/O operation is not currently available due to some reason
* Client should wait then retry
* I/O operation was interrupted because of some reason.
* Client may retry immediately.
*/
WAIT_RETRY = -1003
RETRY_WRITE = -1003,
/**
* I/O operation is not currently available due to some reason.
* Client should wait then retry read.
*/
WAIT_RETRY_READ = -1004,
/**
* I/O operation is not currently available due to some reason.
* Client should wait then retry write.
*/
WAIT_RETRY_WRITE = -1005
};

View File

@ -70,7 +70,7 @@ v_buff_size FIFOBuffer::getBufferSize() const {
data::v_io_size FIFOBuffer::read(void *data, v_buff_size count) {
if(!m_canRead) {
return data::IOError::WAIT_RETRY;
return data::IOError::WAIT_RETRY_READ;
}
if(count == 0) {
@ -119,7 +119,7 @@ data::v_io_size FIFOBuffer::read(void *data, v_buff_size count) {
data::v_io_size FIFOBuffer::peek(void *data, v_buff_size count) {
if(!m_canRead) {
return data::IOError::WAIT_RETRY;
return data::IOError::WAIT_RETRY_READ;
}
if(count == 0) {
@ -159,7 +159,7 @@ data::v_io_size FIFOBuffer::peek(void *data, v_buff_size count) {
data::v_io_size FIFOBuffer::commitReadOffset(v_buff_size count) {
if(!m_canRead) {
return data::IOError::WAIT_RETRY;
return data::IOError::WAIT_RETRY_READ;
}
if(count == 0) {
@ -204,7 +204,7 @@ data::v_io_size FIFOBuffer::commitReadOffset(v_buff_size count) {
data::v_io_size FIFOBuffer::write(const void *data, v_buff_size count) {
if(m_canRead && m_writePosition == m_readPosition) {
return data::IOError::WAIT_RETRY;
return data::IOError::WAIT_RETRY_WRITE;
}
if(count == 0) {
@ -249,7 +249,7 @@ data::v_io_size FIFOBuffer::write(const void *data, v_buff_size count) {
data::v_io_size FIFOBuffer::readAndWriteToStream(data::stream::OutputStream* stream, v_buff_size count) {
if(!m_canRead) {
return data::IOError::WAIT_RETRY;
return data::IOError::WAIT_RETRY_READ;
}
if(count == 0) {
@ -302,7 +302,7 @@ data::v_io_size FIFOBuffer::readAndWriteToStream(data::stream::OutputStream* str
data::v_io_size FIFOBuffer::readFromStreamAndWrite(data::stream::InputStream* stream, v_buff_size count) {
if(m_canRead && m_writePosition == m_readPosition) {
return data::IOError::WAIT_RETRY;
return data::IOError::WAIT_RETRY_WRITE;
}
if(count == 0) {

View File

@ -390,7 +390,10 @@ oatpp::data::v_io_size transfer(InputStream* fromStream,
if(writeResult > 0) {
data = &data[writeResult];
bytesLeft -= writeResult;
} else if (writeResult == data::IOError::RETRY || writeResult == data::IOError::WAIT_RETRY) {
} else if (// In general case `OutputStream` may return `RETRY_READ`, `WAIT_RETRY_READ` on
// because of the underlying transport. Check all retry values here!
writeResult == data::IOError::RETRY_READ || writeResult == data::IOError::WAIT_RETRY_READ ||
writeResult == data::IOError::RETRY_WRITE || writeResult == data::IOError::WAIT_RETRY_WRITE) {
continue;
} else {
throw std::runtime_error("[oatpp::data::stream::transfer()]: Unknown Error. Can't continue transfer.");
@ -400,7 +403,10 @@ oatpp::data::v_io_size transfer(InputStream* fromStream,
progress += readResult;
} else {
if(readResult == data::IOError::RETRY || readResult == data::IOError::WAIT_RETRY) {
if(// In general case `InputStream` may return `RETRY_WRITE`, `WAIT_RETRY_WRITE` on
// because of the underlying transport. Check all retry values here!
readResult == data::IOError::RETRY_READ || readResult == data::IOError::WAIT_RETRY_READ ||
readResult == data::IOError::RETRY_WRITE || readResult == data::IOError::WAIT_RETRY_WRITE) {
continue;
}
return progress;
@ -499,31 +505,58 @@ oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr<InputStream>&
namespace {
oatpp::async::Action asyncOutputStreamActionOnIOError(oatpp::data::stream::OutputStream* stream, data::v_io_size res) {
switch (res) {
case IOError::WAIT_RETRY:
// In general case `OutputStream` may return `RETRY_READ`, `WAIT_RETRY_READ` on
// because of the underlying transport. Check all retry values here!
case IOError::WAIT_RETRY_READ:
return stream->suggestOutputStreamAction(res);
case IOError::RETRY:
case IOError::RETRY_READ:
return stream->suggestOutputStreamAction(res);
case IOError::WAIT_RETRY_WRITE:
return stream->suggestOutputStreamAction(res);
case IOError::RETRY_WRITE:
return stream->suggestOutputStreamAction(res);
case IOError::BROKEN_PIPE:
return new AsyncIOError(IOError::BROKEN_PIPE);
case IOError::ZERO_VALUE:
return new AsyncIOError(IOError::ZERO_VALUE);
}
return new AsyncIOError("Unknown IO Error result", res);
}
oatpp::async::Action asyncInputStreamActionOnIOError(oatpp::data::stream::InputStream* stream, data::v_io_size res) {
switch (res) {
case IOError::WAIT_RETRY:
// In general case `InputStream` may return `RETRY_WRITE`, `WAIT_RETRY_WRITE` on
// because of the underlying transport. Check all retry values here!
case IOError::WAIT_RETRY_READ:
return stream->suggestInputStreamAction(res);
case IOError::RETRY:
case IOError::RETRY_READ:
return stream->suggestInputStreamAction(res);
case IOError::WAIT_RETRY_WRITE:
return stream->suggestInputStreamAction(res);
case IOError::RETRY_WRITE:
return stream->suggestInputStreamAction(res);
case IOError::BROKEN_PIPE:
return new AsyncIOError(IOError::BROKEN_PIPE);
case IOError::ZERO_VALUE:
return new AsyncIOError(IOError::ZERO_VALUE);
}
return new AsyncIOError("Unknown IO Error result", res);
}
}
@ -619,8 +652,9 @@ oatpp::data::v_io_size readExactSizeData(oatpp::data::stream::InputStream* strea
if(res > 0) {
progress += res;
} else { // if res == 0 then probably stream handles read() error incorrectly. return.
if(res == data::IOError::RETRY || res == data::IOError::WAIT_RETRY) {
} else {
if(res == data::IOError::RETRY_READ || res == data::IOError::WAIT_RETRY_READ ||
res == data::IOError::RETRY_WRITE || res == data::IOError::WAIT_RETRY_WRITE) {
continue;
}
return progress;
@ -643,8 +677,9 @@ oatpp::data::v_io_size writeExactSizeData(oatpp::data::stream::OutputStream* str
if(res > 0) {
progress += res;
} else { // if res == 0 then probably stream handles write() error incorrectly. return.
if(res == data::IOError::RETRY || res == data::IOError::WAIT_RETRY) {
} else {
if(res == data::IOError::RETRY_READ || res == data::IOError::WAIT_RETRY_READ ||
res == data::IOError::RETRY_WRITE || res == data::IOError::WAIT_RETRY_WRITE) {
continue;
}
return progress;

View File

@ -68,9 +68,9 @@ data::v_io_size Connection::write(const void *buff, v_buff_size count){
auto e = WSAGetLastError();
if(e == WSAEWOULDBLOCK){
return data::IOError::WAIT_RETRY; // For async io. In case socket is non_blocking
return data::IOError::WAIT_RETRY_WRITE; // For async io. In case socket is non_blocking
} else if(e == WSAEINTR) {
return data::IOError::RETRY;
return data::IOError::RETRY_WRITE;
} else if(e == WSAECONNRESET) {
return data::IOError::BROKEN_PIPE;
} else {
@ -93,9 +93,9 @@ data::v_io_size Connection::write(const void *buff, v_buff_size count){
if(result <= 0) {
auto e = errno;
if(e == EAGAIN || e == EWOULDBLOCK){
return data::IOError::WAIT_RETRY; // For async io. In case socket is non_blocking
return data::IOError::WAIT_RETRY_WRITE; // For async io. In case socket is non_blocking
} else if(e == EINTR) {
return data::IOError::RETRY;
return data::IOError::RETRY_WRITE;
} else if(e == EPIPE) {
return data::IOError::BROKEN_PIPE;
} else {
@ -119,9 +119,9 @@ data::v_io_size Connection::read(void *buff, v_buff_size count){
auto e = WSAGetLastError();
if(e == WSAEWOULDBLOCK){
return data::IOError::WAIT_RETRY; // For async io. In case socket is non_blocking
return data::IOError::WAIT_RETRY_READ; // For async io. In case socket is non_blocking
} else if(e == WSAEINTR) {
return data::IOError::RETRY;
return data::IOError::RETRY_READ;
} else if(e == WSAECONNRESET) {
return data::IOError::BROKEN_PIPE;
} else {
@ -140,9 +140,9 @@ data::v_io_size Connection::read(void *buff, v_buff_size count){
if(result <= 0) {
auto e = errno;
if(e == EAGAIN || e == EWOULDBLOCK){
return data::IOError::WAIT_RETRY; // For async io. In case socket is non_blocking
return data::IOError::WAIT_RETRY_READ; // For async io. In case socket is non_blocking
} else if(e == EINTR) {
return data::IOError::RETRY;
return data::IOError::RETRY_READ;
} else if(e == ECONNRESET) {
return data::IOError::BROKEN_PIPE;
} else {
@ -234,9 +234,9 @@ oatpp::async::Action Connection::suggestOutputStreamAction(data::v_io_size ioRes
}
switch (ioResult) {
case oatpp::data::IOError::WAIT_RETRY:
case oatpp::data::IOError::WAIT_RETRY_WRITE:
return oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_WRITE);
case oatpp::data::IOError::RETRY:
case oatpp::data::IOError::RETRY_WRITE:
return oatpp::async::Action::createIORepeatAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_WRITE);
}
@ -251,9 +251,9 @@ oatpp::async::Action Connection::suggestInputStreamAction(data::v_io_size ioResu
}
switch (ioResult) {
case oatpp::data::IOError::WAIT_RETRY:
case oatpp::data::IOError::WAIT_RETRY_READ:
return oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_READ);
case oatpp::data::IOError::RETRY:
case oatpp::data::IOError::RETRY_READ:
return oatpp::async::Action::createIORepeatAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_READ);
}

View File

@ -55,12 +55,12 @@ data::v_io_size Pipe::Reader::read(void *data, v_buff_size count) {
if (pipe.m_fifo.availableToRead() > 0) {
result = pipe.m_fifo.read(data, count);
} else if (pipe.m_open) {
result = data::IOError::WAIT_RETRY;
result = data::IOError::WAIT_RETRY_READ;
} else {
result = data::IOError::BROKEN_PIPE;
}
} else {
result = data::IOError::WAIT_RETRY;
result = data::IOError::WAIT_RETRY_READ;
}
} else {
std::unique_lock<std::mutex> lock(pipe.m_mutex);
@ -90,14 +90,14 @@ oatpp::async::Action Pipe::Reader::suggestInputStreamAction(data::v_io_size ioRe
}
switch (ioResult) {
case oatpp::data::IOError::WAIT_RETRY: {
case oatpp::data::IOError::WAIT_RETRY_READ: {
std::unique_lock<std::mutex> lock(m_pipe->m_mutex);
if (m_pipe->m_fifo.availableToRead() > 0 || !m_pipe->m_open) {
return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT);
}
return oatpp::async::Action::createWaitListAction(&m_waitList);
}
case oatpp::data::IOError::RETRY:
case oatpp::data::IOError::RETRY_READ:
return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT);
}
@ -148,12 +148,12 @@ data::v_io_size Pipe::Writer::write(const void *data, v_buff_size count) {
if (pipe.m_fifo.availableToWrite() > 0) {
result = pipe.m_fifo.write(data, count);
} else if (pipe.m_open) {
result = data::IOError::WAIT_RETRY;
result = data::IOError::WAIT_RETRY_WRITE;
} else {
result = data::IOError::BROKEN_PIPE;
}
} else {
result = data::IOError::WAIT_RETRY;
result = data::IOError::WAIT_RETRY_WRITE;
}
} else {
std::unique_lock<std::mutex> lock(pipe.m_mutex);
@ -183,14 +183,14 @@ oatpp::async::Action Pipe::Writer::suggestOutputStreamAction(data::v_io_size ioR
}
switch (ioResult) {
case oatpp::data::IOError::WAIT_RETRY: {
case oatpp::data::IOError::WAIT_RETRY_WRITE: {
std::unique_lock<std::mutex> lock(m_pipe->m_mutex);
if (m_pipe->m_fifo.availableToWrite() > 0 || !m_pipe->m_open) {
return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT);
}
return oatpp::async::Action::createWaitListAction(&m_waitList);
}
case oatpp::data::IOError::RETRY:
case oatpp::data::IOError::RETRY_WRITE:
return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT);
}

View File

@ -61,7 +61,8 @@ data::v_io_size RequestHeadersReader::readHeadersSection(data::stream::InputStre
stream->commitReadOffset(res);
} else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
} else if(res == data::IOError::WAIT_RETRY_READ || res == data::IOError::RETRY_READ ||
res == data::IOError::WAIT_RETRY_WRITE || res == data::IOError::RETRY_WRITE) {
continue;
} else {
break;
@ -145,7 +146,8 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptr<data::stream::Input
return m_stream->suggestInputStreamAction(res);
} else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
} else if(res == data::IOError::WAIT_RETRY_READ || res == data::IOError::RETRY_READ ||
res == data::IOError::WAIT_RETRY_WRITE || res == data::IOError::RETRY_WRITE) {
return m_stream->suggestInputStreamAction(res);
} else if(res == data::IOError::BROKEN_PIPE){
return error<oatpp::data::AsyncIOError>(data::IOError::BROKEN_PIPE);

View File

@ -60,7 +60,8 @@ data::v_io_size ResponseHeadersReader::readHeadersSection(const std::shared_ptr<
}
}
} else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
} else if(res == data::IOError::WAIT_RETRY_READ || res == data::IOError::RETRY_READ ||
res == data::IOError::WAIT_RETRY_WRITE || res == data::IOError::RETRY_WRITE) {
continue;
} else {
break;
@ -145,7 +146,8 @@ ResponseHeadersReader::readHeadersAsync(const std::shared_ptr<oatpp::data::strea
return m_connection->suggestInputStreamAction(res);
} else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
} else if(res == data::IOError::WAIT_RETRY_READ || res == data::IOError::RETRY_READ ||
res == data::IOError::WAIT_RETRY_WRITE || res == data::IOError::RETRY_WRITE) {
return m_connection->suggestInputStreamAction(res);
} else if(res == data::IOError::BROKEN_PIPE) {
return error<oatpp::data::AsyncIOError>(data::IOError::BROKEN_PIPE);

View File

@ -161,9 +161,8 @@ oatpp::async::CoroutineStarter SimpleBodyDecoder::doChunkedDecodingAsync(const s
Action readLineChar() {
auto res = m_fromStream->read(&m_lineChar, 1);
if(res == data::IOError::WAIT_RETRY) {
return m_fromStream->suggestInputStreamAction(res);
} else if(res == data::IOError::RETRY) {
if(res == data::IOError::WAIT_RETRY_READ || res == data::IOError::RETRY_READ ||
res == data::IOError::WAIT_RETRY_WRITE || res == data::IOError::RETRY_WRITE) {
return m_fromStream->suggestInputStreamAction(res);
} else if( res < 0) {
return error<Error>("[BodyDecoder::ChunkedDecoder] Can't read line char");