diff --git a/src/oatpp/web/protocol/http/incoming/BodyDecoder.cpp b/src/oatpp/web/protocol/http/incoming/BodyDecoder.cpp index 4d1e0275..fc6f6b99 100644 --- a/src/oatpp/web/protocol/http/incoming/BodyDecoder.cpp +++ b/src/oatpp/web/protocol/http/incoming/BodyDecoder.cpp @@ -26,26 +26,6 @@ namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming { -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// BodyDecoder::ToStringDecoder - -BodyDecoder::ToStringDecoder::ToStringDecoder(const BodyDecoder* decoder, - const Headers& headers, - const std::shared_ptr& bodyStream) - : m_decoder(decoder) - , m_headers(headers) - , m_bodyStream(bodyStream) - , m_chunkedBuffer(data::stream::ChunkedBuffer::createShared()) -{} - -async::Action BodyDecoder::ToStringDecoder::act() { - return m_decoder->decodeToStreamAsync(m_headers, m_bodyStream, m_chunkedBuffer).next(yieldTo(&ToStringDecoder::onDecoded)); -} - -async::Action BodyDecoder::ToStringDecoder::onDecoded() { - return _return(m_chunkedBuffer->toString()); -} - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // BodyDecoder @@ -58,11 +38,45 @@ void BodyDecoder::decodeToStream(const Headers& headers, } async::CoroutineStarter BodyDecoder::decodeToStreamAsync(const Headers& headers, - const std::shared_ptr& bodyStream, - const std::shared_ptr& toStream) const + const std::shared_ptr& bodyStream, + const std::shared_ptr& toStream) const { auto callback = std::make_shared(toStream); return decodeAsync(headers, bodyStream, callback); } +oatpp::async::CoroutineStarterForResult +BodyDecoder::decodeToStringAsync(const Headers& headers, const std::shared_ptr& bodyStream) const { + + class ToStringDecoder : public oatpp::async::CoroutineWithResult { + private: + const BodyDecoder* m_decoder; + Headers m_headers; + std::shared_ptr m_bodyStream; + std::shared_ptr m_chunkedBuffer; + public: + + ToStringDecoder(const BodyDecoder* decoder, + const Headers& headers, + const std::shared_ptr& bodyStream) + : m_decoder(decoder) + , m_headers(headers) + , m_bodyStream(bodyStream) + , m_chunkedBuffer(data::stream::ChunkedBuffer::createShared()) + {} + + Action act() override { + return m_decoder->decodeToStreamAsync(m_headers, m_bodyStream, m_chunkedBuffer).next(yieldTo(&ToStringDecoder::onDecoded)); + } + + Action onDecoded() { + return _return(m_chunkedBuffer->toString()); + } + + }; + + return ToStringDecoder::startForResult(this, headers, bodyStream); + +} + }}}}} diff --git a/src/oatpp/web/protocol/http/incoming/BodyDecoder.hpp b/src/oatpp/web/protocol/http/incoming/BodyDecoder.hpp index a8703385..f4b42270 100644 --- a/src/oatpp/web/protocol/http/incoming/BodyDecoder.hpp +++ b/src/oatpp/web/protocol/http/incoming/BodyDecoder.hpp @@ -39,23 +39,6 @@ namespace oatpp { namespace web { namespace protocol { namespace http { namespac */ class BodyDecoder { private: - - class ToStringDecoder : public oatpp::async::CoroutineWithResult { - private: - const BodyDecoder* m_decoder; - Headers m_headers; - std::shared_ptr m_bodyStream; - std::shared_ptr m_chunkedBuffer; - public: - - ToStringDecoder(const BodyDecoder* decoder, - const Headers& headers, - const std::shared_ptr& bodyStream); - - Action act() override; - Action onDecoded(); - - }; template class ToDtoDecoder : public oatpp::async::CoroutineWithResult, const typename Type::ObjectWrapper&> { @@ -78,7 +61,7 @@ private: {} oatpp::async::Action act() override { - return m_decoder->decodeAsync(m_headers, m_bodyStream, m_chunkedBuffer).next(this->yieldTo(&ToDtoDecoder::onDecoded)); + return m_decoder->decodeToStreamAsync(m_headers, m_bodyStream, m_chunkedBuffer).next(this->yieldTo(&ToDtoDecoder::onDecoded)); } oatpp::async::Action onDecoded() { @@ -172,9 +155,7 @@ public: * @return - &id:oatpp::async::CoroutineStarterForResult;. */ oatpp::async::CoroutineStarterForResult - decodeToStringAsync(const Headers& headers, const std::shared_ptr& bodyStream) const { - return ToStringDecoder::startForResult(this, headers, bodyStream); - } + decodeToStringAsync(const Headers& headers, const std::shared_ptr& bodyStream) const; /** * Same as &l:BodyDecoder::decodeToDto (); but Async. diff --git a/test/oatpp/web/FullAsyncClientTest.cpp b/test/oatpp/web/FullAsyncClientTest.cpp index 11ec901b..c2d37f76 100644 --- a/test/oatpp/web/FullAsyncClientTest.cpp +++ b/test/oatpp/web/FullAsyncClientTest.cpp @@ -161,6 +161,47 @@ public: std::atomic ClientCoroutine_getRootAsync::SUCCESS_COUNTER(0); +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// ClientCoroutine_postBodyAsync + +class ClientCoroutine_postBodyAsync : public oatpp::async::Coroutine { +public: + static std::atomic SUCCESS_COUNTER; +private: + OATPP_COMPONENT(std::shared_ptr, appClient); + OATPP_COMPONENT(std::shared_ptr, objectMapper); +public: + + Action act() override { + return appClient->postBodyAsync("my_test_body").callbackTo(&ClientCoroutine_postBodyAsync::onResponse); + } + + Action onResponse(const std::shared_ptr& response) { + OATPP_ASSERT(response->getStatusCode() == 200 && "ClientCoroutine_postBodyAsync"); + return response->readBodyToDtoAsync(objectMapper).callbackTo(&ClientCoroutine_postBodyAsync::onBodyRead); + } + + Action onBodyRead(const app::TestDto::ObjectWrapper& body) { + OATPP_ASSERT(body); + OATPP_ASSERT(body->testValue == "my_test_body"); + ++ SUCCESS_COUNTER; + return finish(); + } + + Action handleError(const std::shared_ptr& error) override { + if(error->is()) { + auto e = static_cast(error.get()); + OATPP_LOGE("[FullAsyncClientTest::ClientCoroutine_postBodyAsync::handleError()]", "AsyncIOError. %s, %d", e->what(), e->getCode()); + } else { + OATPP_LOGE("[FullAsyncClientTest::ClientCoroutine_postBodyAsync::handleError()]", "Error. %s", error->what()); + } + return propagateError(); + } + +}; + +std::atomic ClientCoroutine_postBodyAsync::SUCCESS_COUNTER(0); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // ClientCoroutine_echoBodyAsync @@ -223,30 +264,39 @@ void FullAsyncClientTest::onRun() { OATPP_COMPONENT(std::shared_ptr, executor); ClientCoroutine_getRootAsync::SUCCESS_COUNTER = 0; + ClientCoroutine_postBodyAsync::SUCCESS_COUNTER = 0; ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER = 0; v_int32 iterations = m_connectionsPerEndpoint; for(v_int32 i = 0; i < iterations; i++) { executor->execute(); + executor->execute(); executor->execute(); } while( ClientCoroutine_getRootAsync::SUCCESS_COUNTER != -1 || + ClientCoroutine_postBodyAsync::SUCCESS_COUNTER != -1 || ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER != -1 ) { - OATPP_LOGV("Client", "Root=%d, Body=%d", + OATPP_LOGV("Client", "Root=%d, postBody=%d, echoBody=%d", ClientCoroutine_getRootAsync::SUCCESS_COUNTER.load(), + ClientCoroutine_postBodyAsync::SUCCESS_COUNTER.load(), ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER.load() ); std::this_thread::sleep_for(std::chrono::milliseconds(100)); + if(ClientCoroutine_getRootAsync::SUCCESS_COUNTER == iterations){ ClientCoroutine_getRootAsync::SUCCESS_COUNTER = -1; OATPP_LOGV("Client", "getRootAsync - DONE!"); } + if(ClientCoroutine_postBodyAsync::SUCCESS_COUNTER == iterations){ + ClientCoroutine_postBodyAsync::SUCCESS_COUNTER = -1; + OATPP_LOGV("Client", "postBodyAsync - DONE!"); + } if(ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER == iterations){ ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER = -1; OATPP_LOGV("Client", "echoBodyAsync - DONE!"); @@ -254,6 +304,7 @@ void FullAsyncClientTest::onRun() { } OATPP_ASSERT(ClientCoroutine_getRootAsync::SUCCESS_COUNTER == -1); // -1 is success + OATPP_ASSERT(ClientCoroutine_postBodyAsync::SUCCESS_COUNTER == -1); // -1 is success OATPP_ASSERT(ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER == -1); // -1 is success executor->waitTasksFinished(); // Wait executor tasks before quit.