extend stream interfaces to suggest async actions

This commit is contained in:
lganzzzo 2019-04-23 03:05:10 +03:00
parent d8200cd28b
commit 48b217ede1
16 changed files with 224 additions and 25 deletions

View File

@ -138,14 +138,14 @@ public:
* @param ioHandle - &id:oatpp::data::v_io_handle;. * @param ioHandle - &id:oatpp::data::v_io_handle;.
* @return - Action. * @return - Action.
*/ */
static Action createIOWaitAction(data::v_io_handle ioHandle = -1); static Action createIOWaitAction(data::v_io_handle ioHandle);
/** /**
* Create TYPE_IO_REPEAT Action * Create TYPE_IO_REPEAT Action
* @param ioHandle - &id:oatpp::data::v_io_handle;. * @param ioHandle - &id:oatpp::data::v_io_handle;.
* @return - Action. * @return - Action.
*/ */
static Action createIORepeatAction(data::v_io_handle ioHandle = -1); static Action createIORepeatAction(data::v_io_handle ioHandle);
/** /**
* Create TYPE_WAIT_REPEAT Action. * Create TYPE_WAIT_REPEAT Action.
@ -481,7 +481,7 @@ public:
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`. * Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_WAIT_FOR_IO Action. * @return - TYPE_WAIT_FOR_IO Action.
*/ */
Action ioWait(data::v_io_handle ioHandle = -1) const { Action ioWait(data::v_io_handle ioHandle) const {
return Action::createIOWaitAction(ioHandle); return Action::createIOWaitAction(ioHandle);
} }
@ -489,7 +489,7 @@ public:
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`. * Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_IO_REPEAT Action. * @return - TYPE_IO_REPEAT Action.
*/ */
Action ioRepeat(data::v_io_handle ioHandle = -1) const { Action ioRepeat(data::v_io_handle ioHandle) const {
return Action::createIORepeatAction(ioHandle); return Action::createIORepeatAction(ioHandle);
} }
@ -670,7 +670,7 @@ public:
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`. * Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_WAIT_FOR_IO Action. * @return - TYPE_WAIT_FOR_IO Action.
*/ */
Action ioWait(data::v_io_handle ioHandle = -1) const { Action ioWait(data::v_io_handle ioHandle) const {
return Action::createIOWaitAction(ioHandle); return Action::createIOWaitAction(ioHandle);
} }
@ -678,7 +678,7 @@ public:
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`. * Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_IO_REPEAT Action. * @return - TYPE_IO_REPEAT Action.
*/ */
Action ioRepeat(data::v_io_handle ioHandle = -1) const { Action ioRepeat(data::v_io_handle ioHandle) const {
return Action::createIORepeatAction(ioHandle); return Action::createIORepeatAction(ioHandle);
} }

View File

@ -127,8 +127,10 @@ public:
} else if(entry->_ref == nullptr) { } else if(entry->_ref == nullptr) {
prevEntry->_ref = nullptr; prevEntry->_ref = nullptr;
last = prevEntry; last = prevEntry;
-- count;
} else { } else {
prevEntry->_ref = entry->_ref; prevEntry->_ref = entry->_ref;
-- count;
} }
} }

View File

@ -292,12 +292,32 @@ oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr<InputStream>&
namespace { namespace {
oatpp::async::Action asyncActionOnIOError(oatpp::async::AbstractCoroutine* coroutine, data::v_io_size res) { oatpp::async::Action asyncOutputStreamActionOnIOError(oatpp::async::AbstractCoroutine* coroutine,
oatpp::data::stream::OutputStream* stream,
data::v_io_size res)
{
switch (res) { switch (res) {
case IOError::WAIT_RETRY: case IOError::WAIT_RETRY:
return oatpp::async::Action::createIOWaitAction(); return stream->suggestOutputStreamAction(res);
case IOError::RETRY: case IOError::RETRY:
return oatpp::async::Action::createIORepeatAction(); return stream->suggestOutputStreamAction(res);
case IOError::BROKEN_PIPE:
return coroutine->error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE);
case IOError::ZERO_VALUE:
return coroutine->error(oatpp::data::AsyncIOError::ERROR_ZERO_VALUE);
}
return coroutine->error<AsyncIOError>("Unknown IO Error result", res);
}
oatpp::async::Action asyncInputStreamActionOnIOError(oatpp::async::AbstractCoroutine* coroutine,
oatpp::data::stream::InputStream* stream,
data::v_io_size res)
{
switch (res) {
case IOError::WAIT_RETRY:
return stream->suggestInputStreamAction(res);
case IOError::RETRY:
return stream->suggestInputStreamAction(res);
case IOError::BROKEN_PIPE: case IOError::BROKEN_PIPE:
return coroutine->error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE); return coroutine->error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE);
case IOError::ZERO_VALUE: case IOError::ZERO_VALUE:
@ -319,10 +339,10 @@ oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::async::AbstractCorouti
data = &((p_char8) data)[res]; data = &((p_char8) data)[res];
size -= res; size -= res;
if (size > 0) { if (size > 0) {
return oatpp::async::Action::createIORepeatAction(); return stream->suggestOutputStreamAction(size);
} }
} else { } else {
return asyncActionOnIOError(coroutine, res); return asyncOutputStreamActionOnIOError(coroutine, stream, res);
} }
} }
return std::forward<oatpp::async::Action>(nextAction); return std::forward<oatpp::async::Action>(nextAction);
@ -341,7 +361,7 @@ oatpp::async::Action readSomeDataAsyncInline(oatpp::async::AbstractCoroutine* co
data = &((p_char8) data)[res]; data = &((p_char8) data)[res];
size -= res; size -= res;
} else { } else {
return asyncActionOnIOError(coroutine, res); return asyncInputStreamActionOnIOError(coroutine, stream, res);
} }
} }
return std::forward<oatpp::async::Action>(nextAction); return std::forward<oatpp::async::Action>(nextAction);
@ -359,10 +379,10 @@ oatpp::async::Action readExactSizeDataAsyncInline(oatpp::async::AbstractCoroutin
data = &((p_char8) data)[res]; data = &((p_char8) data)[res];
size -= res; size -= res;
if (size > 0) { if (size > 0) {
return oatpp::async::Action::createIORepeatAction(); return stream->suggestInputStreamAction(res);
} }
} else { } else {
return asyncActionOnIOError(coroutine, res); return asyncInputStreamActionOnIOError(coroutine, stream, res);
} }
} }
return std::forward<oatpp::async::Action>(nextAction); return std::forward<oatpp::async::Action>(nextAction);

View File

@ -57,6 +57,14 @@ public:
*/ */
virtual data::v_io_size write(const void *data, data::v_io_size count) = 0; virtual data::v_io_size write(const void *data, data::v_io_size count) = 0;
/**
* Implementation of OutputStream must suggest async actions for I/O results.
* Suggested Action is used for scheduling coroutines in async::Executor.
* @param ioResult - result of the call to &l:OutputStream::write ();.
* @return - &id:oatpp::async::Action;.
*/
virtual oatpp::async::Action suggestOutputStreamAction(data::v_io_size ioResult) = 0;
/** /**
* Set stream I/O mode. * Set stream I/O mode.
* @throws * @throws
@ -118,6 +126,14 @@ public:
*/ */
virtual data::v_io_size read(void *data, data::v_io_size count) = 0; virtual data::v_io_size read(void *data, data::v_io_size count) = 0;
/**
* Implementation of InputStream must suggest async actions for I/O results.
* Suggested Action is used for scheduling coroutines in async::Executor.
* @param ioResult - result of the call to &l:InputStream::read ();.
* @return - &id:oatpp::async::Action;.
*/
virtual oatpp::async::Action suggestInputStreamAction(data::v_io_size ioResult) = 0;
/** /**
* Set stream I/O mode. * Set stream I/O mode.
* @throws * @throws
@ -168,6 +184,14 @@ public:
return m_inputStream->read(data, count); return m_inputStream->read(data, count);
} }
oatpp::async::Action suggestOutputStreamAction(data::v_io_size ioResult) override {
return m_outputStream->suggestOutputStreamAction(ioResult);
}
oatpp::async::Action suggestInputStreamAction(data::v_io_size ioResult) override {
return m_inputStream->suggestInputStreamAction(ioResult);
}
void setOutputStreamIOMode(IOMode ioMode) override { void setOutputStreamIOMode(IOMode ioMode) override {
m_outputStream->setOutputStreamIOMode(ioMode); m_outputStream->setOutputStreamIOMode(ioMode);
} }
@ -192,6 +216,22 @@ public:
class ConsistentOutputStream : public OutputStream { class ConsistentOutputStream : public OutputStream {
public: public:
/**
* This should never be called. Call to implementation of this particular method will throw `std::runtime_error`.<br>
* No suggestions for ConsistentOutputStream async I/O operations are needed.<br>
* ConsistentOutputStream always fully satisfies call to write() method.<br>
* @param ioResult - result of call to write() method.
* @return - &id:oatpp::async::Action;.
* @throws - `std::runtime_error`
*/
oatpp::async::Action suggestOutputStreamAction(data::v_io_size ioResult) override {
const char* message =
"Error. ConsistentOutputStream::suggestOutputStreamAction() method is called.\n"
"No suggestions for ConsistentOutputStream async I/O operations are needed.\n "
"ConsistentOutputStream always fully satisfies call to write() method.";
throw std::runtime_error(message);
}
/** /**
* Convert value to string and write to stream. * Convert value to string and write to stream.
* @param value * @param value

View File

@ -38,6 +38,10 @@ data::v_io_size OutputStreamBufferedProxy::write(const void *data, data::v_io_si
} }
} }
oatpp::async::Action OutputStreamBufferedProxy::suggestOutputStreamAction(data::v_io_size ioResult) {
return m_outputStream->suggestOutputStreamAction(ioResult);
}
void OutputStreamBufferedProxy::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) { void OutputStreamBufferedProxy::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
m_outputStream->setOutputStreamIOMode(ioMode); m_outputStream->setOutputStreamIOMode(ioMode);
} }
@ -68,6 +72,10 @@ data::v_io_size InputStreamBufferedProxy::read(void *data, data::v_io_size count
} }
oatpp::async::Action InputStreamBufferedProxy::suggestInputStreamAction(data::v_io_size ioResult) {
return m_inputStream->suggestInputStreamAction(ioResult);
}
void InputStreamBufferedProxy::setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) { void InputStreamBufferedProxy::setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
m_inputStream->setInputStreamIOMode(ioMode); m_inputStream->setInputStreamIOMode(ioMode);
} }

View File

@ -74,6 +74,8 @@ public:
data::v_io_size write(const void *data, data::v_io_size count) override; data::v_io_size write(const void *data, data::v_io_size count) override;
oatpp::async::Action suggestOutputStreamAction(data::v_io_size ioResult) override;
/** /**
* Set OutputStream I/O mode. * Set OutputStream I/O mode.
* @param ioMode * @param ioMode
@ -173,6 +175,8 @@ public:
data::v_io_size read(void *data, data::v_io_size count) override; data::v_io_size read(void *data, data::v_io_size count) override;
oatpp::async::Action suggestInputStreamAction(data::v_io_size ioResult) override;
/** /**
* Set InputStream I/O mode. * Set InputStream I/O mode.
* @param ioMode * @param ioMode

View File

@ -125,6 +125,41 @@ oatpp::data::stream::IOMode Connection::getStreamIOMode() {
} }
oatpp::async::Action Connection::suggestOutputStreamAction(data::v_io_size ioResult) {
if(ioResult > 0) {
return oatpp::async::Action::createIORepeatAction(m_handle);
}
switch (ioResult) {
case oatpp::data::IOError::WAIT_RETRY:
return oatpp::async::Action::createIOWaitAction(m_handle);
case oatpp::data::IOError::RETRY:
return oatpp::async::Action::createIORepeatAction(m_handle);
}
throw std::runtime_error("[oatpp::network::virtual_::Pipe::Reader::suggestInputStreamAction()]: Error. Unable to suggest async action for I/O result.");
}
oatpp::async::Action Connection::suggestInputStreamAction(data::v_io_size ioResult) {
if(ioResult > 0) {
return oatpp::async::Action::createIORepeatAction(m_handle);
}
switch (ioResult) {
case oatpp::data::IOError::WAIT_RETRY:
return oatpp::async::Action::createIOWaitAction(m_handle);
case oatpp::data::IOError::RETRY:
return oatpp::async::Action::createIORepeatAction(m_handle);
}
throw std::runtime_error("[oatpp::network::virtual_::Pipe::Reader::suggestInputStreamAction()]: Error. Unable to suggest async action for I/O result.");
}
void Connection::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) { void Connection::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
setStreamIOMode(ioMode); setStreamIOMode(ioMode);
} }

View File

@ -81,6 +81,22 @@ public:
*/ */
data::v_io_size read(void *buff, data::v_io_size count) override; data::v_io_size read(void *buff, data::v_io_size count) override;
/**
* Implementation of OutputStream must suggest async actions for I/O results.
* Suggested Action is used for scheduling coroutines in async::Executor.
* @param ioResult - result of the call to &l:OutputStream::write ();.
* @return - &id:oatpp::async::Action;.
*/
oatpp::async::Action suggestOutputStreamAction(data::v_io_size ioResult) override;
/**
* Implementation of InputStream must suggest async actions for I/O results.
* Suggested Action is used for scheduling coroutines in async::Executor.
* @param ioResult - result of the call to &l:InputStream::read ();.
* @return - &id:oatpp::async::Action;.
*/
oatpp::async::Action suggestInputStreamAction(data::v_io_size ioResult) override;
/** /**
* Set OutputStream I/O mode. * Set OutputStream I/O mode.
* @param ioMode * @param ioMode

View File

@ -197,9 +197,9 @@ oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::strea
return _return(oatpp::network::Connection::createShared(m_clientHandle)); return _return(oatpp::network::Connection::createShared(m_clientHandle));
} }
if(errno == EALREADY || errno == EINPROGRESS) { if(errno == EALREADY || errno == EINPROGRESS) {
return ioWait(); return ioWait(m_clientHandle);
} else if(errno == EINTR) { } else if(errno == EINTR) {
return ioRepeat(); return ioRepeat(m_clientHandle);
} }
::close(m_clientHandle); ::close(m_clientHandle);

View File

@ -78,6 +78,22 @@ data::v_io_size Pipe::Reader::read(void *data, data::v_io_size count) {
} }
oatpp::async::Action Pipe::Reader::suggestInputStreamAction(data::v_io_size ioResult) {
if(ioResult > 0) {
return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT);
}
switch (ioResult) {
case oatpp::data::IOError::WAIT_RETRY:
return oatpp::async::Action::createWaitRepeatAction(10 * 1000 /* 10 milliseconds */);
case oatpp::data::IOError::RETRY:
return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT);
}
throw std::runtime_error("[oatpp::network::virtual_::Pipe::Reader::suggestInputStreamAction()]: Error. Unable to suggest async action for I/O result.");
}
void Pipe::Writer::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) { void Pipe::Writer::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
m_ioMode = ioMode; m_ioMode = ioMode;
@ -131,6 +147,23 @@ data::v_io_size Pipe::Writer::write(const void *data, data::v_io_size count) {
} }
oatpp::async::Action Pipe::Writer::suggestOutputStreamAction(data::v_io_size ioResult) {
if(ioResult > 0) {
return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT);
}
switch (ioResult) {
case oatpp::data::IOError::WAIT_RETRY:
return oatpp::async::Action::createWaitRepeatAction(10 * 1000 /* 10 milliseconds */);
case oatpp::data::IOError::RETRY:
return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT);
}
throw std::runtime_error("[oatpp::network::virtual_::Pipe::Writer::suggestOutputStreamAction()]: Error. Unable to suggest async action for I/O result.");
}
Pipe::Pipe() Pipe::Pipe()
: m_open(true) : m_open(true)
, m_writer(this) , m_writer(this)

View File

@ -85,6 +85,15 @@ public:
*/ */
data::v_io_size read(void *data, data::v_io_size count) override; data::v_io_size read(void *data, data::v_io_size count) override;
/**
* Implementation of InputStream must suggest async actions for I/O results.
* Suggested Action is used for scheduling coroutines in async::Executor.
* @param ioResult - result of the call to &l:InputStream::read ();.
* @return - &id:oatpp::async::Action;.
*/
oatpp::async::Action suggestInputStreamAction(data::v_io_size ioResult) override;
/** /**
* Set InputStream I/O mode. * Set InputStream I/O mode.
* @param ioMode * @param ioMode
@ -140,6 +149,14 @@ public:
*/ */
data::v_io_size write(const void *data, data::v_io_size count) override; data::v_io_size write(const void *data, data::v_io_size count) override;
/**
* Implementation of OutputStream must suggest async actions for I/O results.
* Suggested Action is used for scheduling coroutines in async::Executor.
* @param ioResult - result of the call to &l:OutputStream::write ();.
* @return - &id:oatpp::async::Action;.
*/
oatpp::async::Action suggestOutputStreamAction(data::v_io_size ioResult) override;
/** /**
* Set OutputStream I/O mode. * Set OutputStream I/O mode.
* @param ioMode * @param ioMode

View File

@ -52,6 +52,14 @@ data::v_io_size Socket::write(const void *data, data::v_io_size count) {
return m_pipeOut->getWriter()->write(data, count); return m_pipeOut->getWriter()->write(data, count);
} }
oatpp::async::Action Socket::suggestOutputStreamAction(data::v_io_size ioResult) {
return m_pipeOut->getWriter()->suggestOutputStreamAction(ioResult);
}
oatpp::async::Action Socket::suggestInputStreamAction(data::v_io_size ioResult) {
return m_pipeIn->getReader()->suggestInputStreamAction(ioResult);
}
void Socket::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) { void Socket::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
m_pipeOut->getWriter()->setOutputStreamIOMode(ioMode); m_pipeOut->getWriter()->setOutputStreamIOMode(ioMode);
} }

View File

@ -84,6 +84,22 @@ public:
*/ */
data::v_io_size write(const void *data, data::v_io_size count) override; data::v_io_size write(const void *data, data::v_io_size count) override;
/**
* Implementation of OutputStream must suggest async actions for I/O results.
* Suggested Action is used for scheduling coroutines in async::Executor.
* @param ioResult - result of the call to &l:OutputStream::write ();.
* @return - &id:oatpp::async::Action;.
*/
oatpp::async::Action suggestOutputStreamAction(data::v_io_size ioResult) override;
/**
* Implementation of InputStream must suggest async actions for I/O results.
* Suggested Action is used for scheduling coroutines in async::Executor.
* @param ioResult - result of the call to &l:InputStream::read ();.
* @return - &id:oatpp::async::Action;.
*/
oatpp::async::Action suggestInputStreamAction(data::v_io_size ioResult) override;
/** /**
* Set OutputStream I/O mode. * Set OutputStream I/O mode.
* @param ioMode * @param ioMode

View File

@ -29,8 +29,8 @@
namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming { namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming {
data::v_io_size RequestHeadersReader::readHeadersSection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection, data::v_io_size RequestHeadersReader::readHeadersSection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
oatpp::data::stream::OutputStream* bufferStream, oatpp::data::stream::OutputStream* bufferStream,
Result& result) { Result& result) {
v_word32 sectionEnd = ('\r' << 24) | ('\n' << 16) | ('\r' << 8) | ('\n'); v_word32 sectionEnd = ('\r' << 24) | ('\n' << 16) | ('\r' << 8) | ('\n');
v_word32 accumulator = 0; v_word32 accumulator = 0;
@ -146,10 +146,10 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptr<oatpp::data::stream
} }
} }
return ioRepeat(); return m_connection->suggestInputStreamAction(res);
} else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) { } else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
return ioWait(); return m_connection->suggestInputStreamAction(res);
} else if(res == data::IOError::BROKEN_PIPE){ } else if(res == data::IOError::BROKEN_PIPE){
return error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE); return error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE);
} else if(res == data::IOError::ZERO_VALUE){ } else if(res == data::IOError::ZERO_VALUE){

View File

@ -144,11 +144,11 @@ ResponseHeadersReader::readHeadersAsync(const std::shared_ptr<oatpp::data::strea
return yieldTo(&ReaderCoroutine::parseHeaders); return yieldTo(&ReaderCoroutine::parseHeaders);
} }
} }
return ioRepeat(); return m_connection->suggestInputStreamAction(res);
} else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) { } else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
return ioWait(); return m_connection->suggestInputStreamAction(res);
} else if(res == data::IOError::BROKEN_PIPE) { } else if(res == data::IOError::BROKEN_PIPE) {
return error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE); return error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE);
} else if(res == data::IOError::ZERO_VALUE) { } else if(res == data::IOError::ZERO_VALUE) {

View File

@ -151,9 +151,9 @@ oatpp::async::CoroutineStarter SimpleBodyDecoder::doChunkedDecodingAsync(const s
Action readLineChar() { Action readLineChar() {
auto res = m_fromStream->read(&m_lineChar, 1); auto res = m_fromStream->read(&m_lineChar, 1);
if(res == data::IOError::WAIT_RETRY) { if(res == data::IOError::WAIT_RETRY) {
return oatpp::async::Action::createIOWaitAction(); return m_fromStream->suggestInputStreamAction(res);
} else if(res == data::IOError::RETRY) { } else if(res == data::IOError::RETRY) {
return oatpp::async::Action::createIORepeatAction(); return m_fromStream->suggestInputStreamAction(res);
} else if( res < 0) { } else if( res < 0) {
return error<Error>("[BodyDecoder::ChunkedDecoder] Can't read line char"); return error<Error>("[BodyDecoder::ChunkedDecoder] Can't read line char");
} }