diff --git a/src/oatpp/core/async/Coroutine.hpp b/src/oatpp/core/async/Coroutine.hpp index b1187da5..d2154f21 100644 --- a/src/oatpp/core/async/Coroutine.hpp +++ b/src/oatpp/core/async/Coroutine.hpp @@ -138,14 +138,14 @@ public: * @param ioHandle - &id:oatpp::data::v_io_handle;. * @return - Action. */ - static Action createIOWaitAction(data::v_io_handle ioHandle = -1); + static Action createIOWaitAction(data::v_io_handle ioHandle); /** * Create TYPE_IO_REPEAT Action * @param ioHandle - &id:oatpp::data::v_io_handle;. * @return - Action. */ - static Action createIORepeatAction(data::v_io_handle ioHandle = -1); + static Action createIORepeatAction(data::v_io_handle ioHandle); /** * Create TYPE_WAIT_REPEAT Action. @@ -481,7 +481,7 @@ public: * Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`. * @return - TYPE_WAIT_FOR_IO Action. */ - Action ioWait(data::v_io_handle ioHandle = -1) const { + Action ioWait(data::v_io_handle ioHandle) const { return Action::createIOWaitAction(ioHandle); } @@ -489,7 +489,7 @@ public: * Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`. * @return - TYPE_IO_REPEAT Action. */ - Action ioRepeat(data::v_io_handle ioHandle = -1) const { + Action ioRepeat(data::v_io_handle ioHandle) const { return Action::createIORepeatAction(ioHandle); } @@ -670,7 +670,7 @@ public: * Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`. * @return - TYPE_WAIT_FOR_IO Action. */ - Action ioWait(data::v_io_handle ioHandle = -1) const { + Action ioWait(data::v_io_handle ioHandle) const { return Action::createIOWaitAction(ioHandle); } @@ -678,7 +678,7 @@ public: * Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`. * @return - TYPE_IO_REPEAT Action. */ - Action ioRepeat(data::v_io_handle ioHandle = -1) const { + Action ioRepeat(data::v_io_handle ioHandle) const { return Action::createIORepeatAction(ioHandle); } diff --git a/src/oatpp/core/collection/FastQueue.hpp b/src/oatpp/core/collection/FastQueue.hpp index ddfeb9aa..72a1ddaa 100644 --- a/src/oatpp/core/collection/FastQueue.hpp +++ b/src/oatpp/core/collection/FastQueue.hpp @@ -127,8 +127,10 @@ public: } else if(entry->_ref == nullptr) { prevEntry->_ref = nullptr; last = prevEntry; + -- count; } else { prevEntry->_ref = entry->_ref; + -- count; } } diff --git a/src/oatpp/core/data/stream/Stream.cpp b/src/oatpp/core/data/stream/Stream.cpp index 6db50207..0c5eef7f 100644 --- a/src/oatpp/core/data/stream/Stream.cpp +++ b/src/oatpp/core/data/stream/Stream.cpp @@ -292,12 +292,32 @@ oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr& namespace { - oatpp::async::Action asyncActionOnIOError(oatpp::async::AbstractCoroutine* coroutine, data::v_io_size res) { + oatpp::async::Action asyncOutputStreamActionOnIOError(oatpp::async::AbstractCoroutine* coroutine, + oatpp::data::stream::OutputStream* stream, + data::v_io_size res) + { switch (res) { case IOError::WAIT_RETRY: - return oatpp::async::Action::createIOWaitAction(); + return stream->suggestOutputStreamAction(res); case IOError::RETRY: - return oatpp::async::Action::createIORepeatAction(); + return stream->suggestOutputStreamAction(res); + case IOError::BROKEN_PIPE: + return coroutine->error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE); + case IOError::ZERO_VALUE: + return coroutine->error(oatpp::data::AsyncIOError::ERROR_ZERO_VALUE); + } + return coroutine->error("Unknown IO Error result", res); + } + + oatpp::async::Action asyncInputStreamActionOnIOError(oatpp::async::AbstractCoroutine* coroutine, + oatpp::data::stream::InputStream* stream, + data::v_io_size res) + { + switch (res) { + case IOError::WAIT_RETRY: + return stream->suggestInputStreamAction(res); + case IOError::RETRY: + return stream->suggestInputStreamAction(res); case IOError::BROKEN_PIPE: return coroutine->error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE); case IOError::ZERO_VALUE: @@ -319,10 +339,10 @@ oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::async::AbstractCorouti data = &((p_char8) data)[res]; size -= res; if (size > 0) { - return oatpp::async::Action::createIORepeatAction(); + return stream->suggestOutputStreamAction(size); } } else { - return asyncActionOnIOError(coroutine, res); + return asyncOutputStreamActionOnIOError(coroutine, stream, res); } } return std::forward(nextAction); @@ -341,7 +361,7 @@ oatpp::async::Action readSomeDataAsyncInline(oatpp::async::AbstractCoroutine* co data = &((p_char8) data)[res]; size -= res; } else { - return asyncActionOnIOError(coroutine, res); + return asyncInputStreamActionOnIOError(coroutine, stream, res); } } return std::forward(nextAction); @@ -359,10 +379,10 @@ oatpp::async::Action readExactSizeDataAsyncInline(oatpp::async::AbstractCoroutin data = &((p_char8) data)[res]; size -= res; if (size > 0) { - return oatpp::async::Action::createIORepeatAction(); + return stream->suggestInputStreamAction(res); } } else { - return asyncActionOnIOError(coroutine, res); + return asyncInputStreamActionOnIOError(coroutine, stream, res); } } return std::forward(nextAction); diff --git a/src/oatpp/core/data/stream/Stream.hpp b/src/oatpp/core/data/stream/Stream.hpp index 2bc65e21..3a293140 100644 --- a/src/oatpp/core/data/stream/Stream.hpp +++ b/src/oatpp/core/data/stream/Stream.hpp @@ -57,6 +57,14 @@ public: */ virtual data::v_io_size write(const void *data, data::v_io_size count) = 0; + /** + * Implementation of OutputStream must suggest async actions for I/O results. + * Suggested Action is used for scheduling coroutines in async::Executor. + * @param ioResult - result of the call to &l:OutputStream::write ();. + * @return - &id:oatpp::async::Action;. + */ + virtual oatpp::async::Action suggestOutputStreamAction(data::v_io_size ioResult) = 0; + /** * Set stream I/O mode. * @throws @@ -118,6 +126,14 @@ public: */ virtual data::v_io_size read(void *data, data::v_io_size count) = 0; + /** + * Implementation of InputStream must suggest async actions for I/O results. + * Suggested Action is used for scheduling coroutines in async::Executor. + * @param ioResult - result of the call to &l:InputStream::read ();. + * @return - &id:oatpp::async::Action;. + */ + virtual oatpp::async::Action suggestInputStreamAction(data::v_io_size ioResult) = 0; + /** * Set stream I/O mode. * @throws @@ -168,6 +184,14 @@ public: return m_inputStream->read(data, count); } + oatpp::async::Action suggestOutputStreamAction(data::v_io_size ioResult) override { + return m_outputStream->suggestOutputStreamAction(ioResult); + } + + oatpp::async::Action suggestInputStreamAction(data::v_io_size ioResult) override { + return m_inputStream->suggestInputStreamAction(ioResult); + } + void setOutputStreamIOMode(IOMode ioMode) override { m_outputStream->setOutputStreamIOMode(ioMode); } @@ -192,6 +216,22 @@ public: class ConsistentOutputStream : public OutputStream { public: + /** + * This should never be called. Call to implementation of this particular method will throw `std::runtime_error`.
+ * No suggestions for ConsistentOutputStream async I/O operations are needed.
+ * ConsistentOutputStream always fully satisfies call to write() method.
+ * @param ioResult - result of call to write() method. + * @return - &id:oatpp::async::Action;. + * @throws - `std::runtime_error` + */ + oatpp::async::Action suggestOutputStreamAction(data::v_io_size ioResult) override { + const char* message = + "Error. ConsistentOutputStream::suggestOutputStreamAction() method is called.\n" + "No suggestions for ConsistentOutputStream async I/O operations are needed.\n " + "ConsistentOutputStream always fully satisfies call to write() method."; + throw std::runtime_error(message); + } + /** * Convert value to string and write to stream. * @param value diff --git a/src/oatpp/core/data/stream/StreamBufferedProxy.cpp b/src/oatpp/core/data/stream/StreamBufferedProxy.cpp index 8b855d05..6947ae6a 100644 --- a/src/oatpp/core/data/stream/StreamBufferedProxy.cpp +++ b/src/oatpp/core/data/stream/StreamBufferedProxy.cpp @@ -38,6 +38,10 @@ data::v_io_size OutputStreamBufferedProxy::write(const void *data, data::v_io_si } } +oatpp::async::Action OutputStreamBufferedProxy::suggestOutputStreamAction(data::v_io_size ioResult) { + return m_outputStream->suggestOutputStreamAction(ioResult); +} + void OutputStreamBufferedProxy::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) { m_outputStream->setOutputStreamIOMode(ioMode); } @@ -68,6 +72,10 @@ data::v_io_size InputStreamBufferedProxy::read(void *data, data::v_io_size count } +oatpp::async::Action InputStreamBufferedProxy::suggestInputStreamAction(data::v_io_size ioResult) { + return m_inputStream->suggestInputStreamAction(ioResult); +} + void InputStreamBufferedProxy::setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) { m_inputStream->setInputStreamIOMode(ioMode); } diff --git a/src/oatpp/core/data/stream/StreamBufferedProxy.hpp b/src/oatpp/core/data/stream/StreamBufferedProxy.hpp index 9075fa75..c8c3eb1b 100644 --- a/src/oatpp/core/data/stream/StreamBufferedProxy.hpp +++ b/src/oatpp/core/data/stream/StreamBufferedProxy.hpp @@ -74,6 +74,8 @@ public: data::v_io_size write(const void *data, data::v_io_size count) override; + oatpp::async::Action suggestOutputStreamAction(data::v_io_size ioResult) override; + /** * Set OutputStream I/O mode. * @param ioMode @@ -173,6 +175,8 @@ public: data::v_io_size read(void *data, data::v_io_size count) override; + oatpp::async::Action suggestInputStreamAction(data::v_io_size ioResult) override; + /** * Set InputStream I/O mode. * @param ioMode diff --git a/src/oatpp/network/Connection.cpp b/src/oatpp/network/Connection.cpp index 989fc603..d90bfee4 100644 --- a/src/oatpp/network/Connection.cpp +++ b/src/oatpp/network/Connection.cpp @@ -125,6 +125,41 @@ oatpp::data::stream::IOMode Connection::getStreamIOMode() { } +oatpp::async::Action Connection::suggestOutputStreamAction(data::v_io_size ioResult) { + + if(ioResult > 0) { + return oatpp::async::Action::createIORepeatAction(m_handle); + } + + switch (ioResult) { + case oatpp::data::IOError::WAIT_RETRY: + return oatpp::async::Action::createIOWaitAction(m_handle); + case oatpp::data::IOError::RETRY: + return oatpp::async::Action::createIORepeatAction(m_handle); + } + + throw std::runtime_error("[oatpp::network::virtual_::Pipe::Reader::suggestInputStreamAction()]: Error. Unable to suggest async action for I/O result."); + +} + +oatpp::async::Action Connection::suggestInputStreamAction(data::v_io_size ioResult) { + + if(ioResult > 0) { + return oatpp::async::Action::createIORepeatAction(m_handle); + } + + switch (ioResult) { + case oatpp::data::IOError::WAIT_RETRY: + return oatpp::async::Action::createIOWaitAction(m_handle); + case oatpp::data::IOError::RETRY: + return oatpp::async::Action::createIORepeatAction(m_handle); + } + + throw std::runtime_error("[oatpp::network::virtual_::Pipe::Reader::suggestInputStreamAction()]: Error. Unable to suggest async action for I/O result."); + + +} + void Connection::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) { setStreamIOMode(ioMode); } diff --git a/src/oatpp/network/Connection.hpp b/src/oatpp/network/Connection.hpp index 84622fa7..44abdaba 100644 --- a/src/oatpp/network/Connection.hpp +++ b/src/oatpp/network/Connection.hpp @@ -81,6 +81,22 @@ public: */ data::v_io_size read(void *buff, data::v_io_size count) override; + /** + * Implementation of OutputStream must suggest async actions for I/O results. + * Suggested Action is used for scheduling coroutines in async::Executor. + * @param ioResult - result of the call to &l:OutputStream::write ();. + * @return - &id:oatpp::async::Action;. + */ + oatpp::async::Action suggestOutputStreamAction(data::v_io_size ioResult) override; + + /** + * Implementation of InputStream must suggest async actions for I/O results. + * Suggested Action is used for scheduling coroutines in async::Executor. + * @param ioResult - result of the call to &l:InputStream::read ();. + * @return - &id:oatpp::async::Action;. + */ + oatpp::async::Action suggestInputStreamAction(data::v_io_size ioResult) override; + /** * Set OutputStream I/O mode. * @param ioMode diff --git a/src/oatpp/network/client/SimpleTCPConnectionProvider.cpp b/src/oatpp/network/client/SimpleTCPConnectionProvider.cpp index 0bfaed09..59876d80 100644 --- a/src/oatpp/network/client/SimpleTCPConnectionProvider.cpp +++ b/src/oatpp/network/client/SimpleTCPConnectionProvider.cpp @@ -197,9 +197,9 @@ oatpp::async::CoroutineStarterForResult 0) { + return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT); + } + + switch (ioResult) { + case oatpp::data::IOError::WAIT_RETRY: + return oatpp::async::Action::createWaitRepeatAction(10 * 1000 /* 10 milliseconds */); + case oatpp::data::IOError::RETRY: + return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT); + } + + throw std::runtime_error("[oatpp::network::virtual_::Pipe::Reader::suggestInputStreamAction()]: Error. Unable to suggest async action for I/O result."); + +} void Pipe::Writer::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) { m_ioMode = ioMode; @@ -131,6 +147,23 @@ data::v_io_size Pipe::Writer::write(const void *data, data::v_io_size count) { } +oatpp::async::Action Pipe::Writer::suggestOutputStreamAction(data::v_io_size ioResult) { + + if(ioResult > 0) { + return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT); + } + + switch (ioResult) { + case oatpp::data::IOError::WAIT_RETRY: + return oatpp::async::Action::createWaitRepeatAction(10 * 1000 /* 10 milliseconds */); + case oatpp::data::IOError::RETRY: + return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT); + } + + throw std::runtime_error("[oatpp::network::virtual_::Pipe::Writer::suggestOutputStreamAction()]: Error. Unable to suggest async action for I/O result."); + +} + Pipe::Pipe() : m_open(true) , m_writer(this) diff --git a/src/oatpp/network/virtual_/Pipe.hpp b/src/oatpp/network/virtual_/Pipe.hpp index 7806482e..cb74ce7c 100644 --- a/src/oatpp/network/virtual_/Pipe.hpp +++ b/src/oatpp/network/virtual_/Pipe.hpp @@ -85,6 +85,15 @@ public: */ data::v_io_size read(void *data, data::v_io_size count) override; + /** + * Implementation of InputStream must suggest async actions for I/O results. + * Suggested Action is used for scheduling coroutines in async::Executor. + * @param ioResult - result of the call to &l:InputStream::read ();. + * @return - &id:oatpp::async::Action;. + */ + oatpp::async::Action suggestInputStreamAction(data::v_io_size ioResult) override; + + /** * Set InputStream I/O mode. * @param ioMode @@ -140,6 +149,14 @@ public: */ data::v_io_size write(const void *data, data::v_io_size count) override; + /** + * Implementation of OutputStream must suggest async actions for I/O results. + * Suggested Action is used for scheduling coroutines in async::Executor. + * @param ioResult - result of the call to &l:OutputStream::write ();. + * @return - &id:oatpp::async::Action;. + */ + oatpp::async::Action suggestOutputStreamAction(data::v_io_size ioResult) override; + /** * Set OutputStream I/O mode. * @param ioMode diff --git a/src/oatpp/network/virtual_/Socket.cpp b/src/oatpp/network/virtual_/Socket.cpp index 4f7ce1ac..6cb2b316 100644 --- a/src/oatpp/network/virtual_/Socket.cpp +++ b/src/oatpp/network/virtual_/Socket.cpp @@ -52,6 +52,14 @@ data::v_io_size Socket::write(const void *data, data::v_io_size count) { return m_pipeOut->getWriter()->write(data, count); } +oatpp::async::Action Socket::suggestOutputStreamAction(data::v_io_size ioResult) { + return m_pipeOut->getWriter()->suggestOutputStreamAction(ioResult); +} + +oatpp::async::Action Socket::suggestInputStreamAction(data::v_io_size ioResult) { + return m_pipeIn->getReader()->suggestInputStreamAction(ioResult); +} + void Socket::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) { m_pipeOut->getWriter()->setOutputStreamIOMode(ioMode); } diff --git a/src/oatpp/network/virtual_/Socket.hpp b/src/oatpp/network/virtual_/Socket.hpp index 346b4f22..2ad5c58d 100644 --- a/src/oatpp/network/virtual_/Socket.hpp +++ b/src/oatpp/network/virtual_/Socket.hpp @@ -84,6 +84,22 @@ public: */ data::v_io_size write(const void *data, data::v_io_size count) override; + /** + * Implementation of OutputStream must suggest async actions for I/O results. + * Suggested Action is used for scheduling coroutines in async::Executor. + * @param ioResult - result of the call to &l:OutputStream::write ();. + * @return - &id:oatpp::async::Action;. + */ + oatpp::async::Action suggestOutputStreamAction(data::v_io_size ioResult) override; + + /** + * Implementation of InputStream must suggest async actions for I/O results. + * Suggested Action is used for scheduling coroutines in async::Executor. + * @param ioResult - result of the call to &l:InputStream::read ();. + * @return - &id:oatpp::async::Action;. + */ + oatpp::async::Action suggestInputStreamAction(data::v_io_size ioResult) override; + /** * Set OutputStream I/O mode. * @param ioMode diff --git a/src/oatpp/web/protocol/http/incoming/RequestHeadersReader.cpp b/src/oatpp/web/protocol/http/incoming/RequestHeadersReader.cpp index c2fab7dd..540e8d69 100644 --- a/src/oatpp/web/protocol/http/incoming/RequestHeadersReader.cpp +++ b/src/oatpp/web/protocol/http/incoming/RequestHeadersReader.cpp @@ -29,8 +29,8 @@ namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming { data::v_io_size RequestHeadersReader::readHeadersSection(const std::shared_ptr& connection, - oatpp::data::stream::OutputStream* bufferStream, - Result& result) { + oatpp::data::stream::OutputStream* bufferStream, + Result& result) { v_word32 sectionEnd = ('\r' << 24) | ('\n' << 16) | ('\r' << 8) | ('\n'); v_word32 accumulator = 0; @@ -146,10 +146,10 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptrsuggestInputStreamAction(res); } else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) { - return ioWait(); + return m_connection->suggestInputStreamAction(res); } else if(res == data::IOError::BROKEN_PIPE){ return error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE); } else if(res == data::IOError::ZERO_VALUE){ diff --git a/src/oatpp/web/protocol/http/incoming/ResponseHeadersReader.cpp b/src/oatpp/web/protocol/http/incoming/ResponseHeadersReader.cpp index ea9c4760..a37d7f62 100644 --- a/src/oatpp/web/protocol/http/incoming/ResponseHeadersReader.cpp +++ b/src/oatpp/web/protocol/http/incoming/ResponseHeadersReader.cpp @@ -144,11 +144,11 @@ ResponseHeadersReader::readHeadersAsync(const std::shared_ptrsuggestInputStreamAction(res); } else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) { - return ioWait(); + return m_connection->suggestInputStreamAction(res); } else if(res == data::IOError::BROKEN_PIPE) { return error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE); } else if(res == data::IOError::ZERO_VALUE) { diff --git a/src/oatpp/web/protocol/http/incoming/SimpleBodyDecoder.cpp b/src/oatpp/web/protocol/http/incoming/SimpleBodyDecoder.cpp index 7daad100..81c5ba1d 100644 --- a/src/oatpp/web/protocol/http/incoming/SimpleBodyDecoder.cpp +++ b/src/oatpp/web/protocol/http/incoming/SimpleBodyDecoder.cpp @@ -151,9 +151,9 @@ oatpp::async::CoroutineStarter SimpleBodyDecoder::doChunkedDecodingAsync(const s Action readLineChar() { auto res = m_fromStream->read(&m_lineChar, 1); if(res == data::IOError::WAIT_RETRY) { - return oatpp::async::Action::createIOWaitAction(); + return m_fromStream->suggestInputStreamAction(res); } else if(res == data::IOError::RETRY) { - return oatpp::async::Action::createIORepeatAction(); + return m_fromStream->suggestInputStreamAction(res); } else if( res < 0) { return error("[BodyDecoder::ChunkedDecoder] Can't read line char"); }