2
0
mirror of https://github.com/oatpp/oatpp.git synced 2025-03-31 18:30:22 +08:00

Fix BodyDecoder. Better FullAsyncClientTest

This commit is contained in:
lganzzzo 2019-07-26 02:35:58 +04:00
parent 464fc4da13
commit 5e045a42de
3 changed files with 90 additions and 44 deletions
src/oatpp/web/protocol/http/incoming
test/oatpp/web

@ -26,26 +26,6 @@
namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// BodyDecoder::ToStringDecoder
BodyDecoder::ToStringDecoder::ToStringDecoder(const BodyDecoder* decoder,
const Headers& headers,
const std::shared_ptr<data::stream::InputStream>& bodyStream)
: m_decoder(decoder)
, m_headers(headers)
, m_bodyStream(bodyStream)
, m_chunkedBuffer(data::stream::ChunkedBuffer::createShared())
{}
async::Action BodyDecoder::ToStringDecoder::act() {
return m_decoder->decodeToStreamAsync(m_headers, m_bodyStream, m_chunkedBuffer).next(yieldTo(&ToStringDecoder::onDecoded));
}
async::Action BodyDecoder::ToStringDecoder::onDecoded() {
return _return(m_chunkedBuffer->toString());
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// BodyDecoder
@ -58,11 +38,45 @@ void BodyDecoder::decodeToStream(const Headers& headers,
}
async::CoroutineStarter BodyDecoder::decodeToStreamAsync(const Headers& headers,
const std::shared_ptr<data::stream::InputStream>& bodyStream,
const std::shared_ptr<data::stream::OutputStream>& toStream) const
const std::shared_ptr<data::stream::InputStream>& bodyStream,
const std::shared_ptr<data::stream::OutputStream>& toStream) const
{
auto callback = std::make_shared<data::stream::DefaultAsyncWriteCallback>(toStream);
return decodeAsync(headers, bodyStream, callback);
}
oatpp::async::CoroutineStarterForResult<const oatpp::String&>
BodyDecoder::decodeToStringAsync(const Headers& headers, const std::shared_ptr<data::stream::InputStream>& bodyStream) const {
class ToStringDecoder : public oatpp::async::CoroutineWithResult<ToStringDecoder, const oatpp::String&> {
private:
const BodyDecoder* m_decoder;
Headers m_headers;
std::shared_ptr<oatpp::data::stream::InputStream> m_bodyStream;
std::shared_ptr<oatpp::data::stream::ChunkedBuffer> m_chunkedBuffer;
public:
ToStringDecoder(const BodyDecoder* decoder,
const Headers& headers,
const std::shared_ptr<data::stream::InputStream>& bodyStream)
: m_decoder(decoder)
, m_headers(headers)
, m_bodyStream(bodyStream)
, m_chunkedBuffer(data::stream::ChunkedBuffer::createShared())
{}
Action act() override {
return m_decoder->decodeToStreamAsync(m_headers, m_bodyStream, m_chunkedBuffer).next(yieldTo(&ToStringDecoder::onDecoded));
}
Action onDecoded() {
return _return(m_chunkedBuffer->toString());
}
};
return ToStringDecoder::startForResult(this, headers, bodyStream);
}
}}}}}

@ -39,23 +39,6 @@ namespace oatpp { namespace web { namespace protocol { namespace http { namespac
*/
class BodyDecoder {
private:
class ToStringDecoder : public oatpp::async::CoroutineWithResult<ToStringDecoder, const oatpp::String&> {
private:
const BodyDecoder* m_decoder;
Headers m_headers;
std::shared_ptr<oatpp::data::stream::InputStream> m_bodyStream;
std::shared_ptr<oatpp::data::stream::ChunkedBuffer> m_chunkedBuffer;
public:
ToStringDecoder(const BodyDecoder* decoder,
const Headers& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream);
Action act() override;
Action onDecoded();
};
template<class Type>
class ToDtoDecoder : public oatpp::async::CoroutineWithResult<ToDtoDecoder<Type>, const typename Type::ObjectWrapper&> {
@ -78,7 +61,7 @@ private:
{}
oatpp::async::Action act() override {
return m_decoder->decodeAsync(m_headers, m_bodyStream, m_chunkedBuffer).next(this->yieldTo(&ToDtoDecoder::onDecoded));
return m_decoder->decodeToStreamAsync(m_headers, m_bodyStream, m_chunkedBuffer).next(this->yieldTo(&ToDtoDecoder::onDecoded));
}
oatpp::async::Action onDecoded() {
@ -172,9 +155,7 @@ public:
* @return - &id:oatpp::async::CoroutineStarterForResult;.
*/
oatpp::async::CoroutineStarterForResult<const oatpp::String&>
decodeToStringAsync(const Headers& headers, const std::shared_ptr<data::stream::InputStream>& bodyStream) const {
return ToStringDecoder::startForResult(this, headers, bodyStream);
}
decodeToStringAsync(const Headers& headers, const std::shared_ptr<data::stream::InputStream>& bodyStream) const;
/**
* Same as &l:BodyDecoder::decodeToDto (); but Async.

@ -161,6 +161,47 @@ public:
std::atomic<v_int32> ClientCoroutine_getRootAsync::SUCCESS_COUNTER(0);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ClientCoroutine_postBodyAsync
class ClientCoroutine_postBodyAsync : public oatpp::async::Coroutine<ClientCoroutine_postBodyAsync> {
public:
static std::atomic<v_int32> SUCCESS_COUNTER;
private:
OATPP_COMPONENT(std::shared_ptr<app::Client>, appClient);
OATPP_COMPONENT(std::shared_ptr<oatpp::data::mapping::ObjectMapper>, objectMapper);
public:
Action act() override {
return appClient->postBodyAsync("my_test_body").callbackTo(&ClientCoroutine_postBodyAsync::onResponse);
}
Action onResponse(const std::shared_ptr<IncomingResponse>& response) {
OATPP_ASSERT(response->getStatusCode() == 200 && "ClientCoroutine_postBodyAsync");
return response->readBodyToDtoAsync<app::TestDto>(objectMapper).callbackTo(&ClientCoroutine_postBodyAsync::onBodyRead);
}
Action onBodyRead(const app::TestDto::ObjectWrapper& body) {
OATPP_ASSERT(body);
OATPP_ASSERT(body->testValue == "my_test_body");
++ SUCCESS_COUNTER;
return finish();
}
Action handleError(const std::shared_ptr<const Error>& error) override {
if(error->is<oatpp::data::AsyncIOError>()) {
auto e = static_cast<const oatpp::data::AsyncIOError*>(error.get());
OATPP_LOGE("[FullAsyncClientTest::ClientCoroutine_postBodyAsync::handleError()]", "AsyncIOError. %s, %d", e->what(), e->getCode());
} else {
OATPP_LOGE("[FullAsyncClientTest::ClientCoroutine_postBodyAsync::handleError()]", "Error. %s", error->what());
}
return propagateError();
}
};
std::atomic<v_int32> ClientCoroutine_postBodyAsync::SUCCESS_COUNTER(0);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ClientCoroutine_echoBodyAsync
@ -223,30 +264,39 @@ void FullAsyncClientTest::onRun() {
OATPP_COMPONENT(std::shared_ptr<oatpp::async::Executor>, executor);
ClientCoroutine_getRootAsync::SUCCESS_COUNTER = 0;
ClientCoroutine_postBodyAsync::SUCCESS_COUNTER = 0;
ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER = 0;
v_int32 iterations = m_connectionsPerEndpoint;
for(v_int32 i = 0; i < iterations; i++) {
executor->execute<ClientCoroutine_getRootAsync>();
executor->execute<ClientCoroutine_postBodyAsync>();
executor->execute<ClientCoroutine_echoBodyAsync>();
}
while(
ClientCoroutine_getRootAsync::SUCCESS_COUNTER != -1 ||
ClientCoroutine_postBodyAsync::SUCCESS_COUNTER != -1 ||
ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER != -1
) {
OATPP_LOGV("Client", "Root=%d, Body=%d",
OATPP_LOGV("Client", "Root=%d, postBody=%d, echoBody=%d",
ClientCoroutine_getRootAsync::SUCCESS_COUNTER.load(),
ClientCoroutine_postBodyAsync::SUCCESS_COUNTER.load(),
ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER.load()
);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if(ClientCoroutine_getRootAsync::SUCCESS_COUNTER == iterations){
ClientCoroutine_getRootAsync::SUCCESS_COUNTER = -1;
OATPP_LOGV("Client", "getRootAsync - DONE!");
}
if(ClientCoroutine_postBodyAsync::SUCCESS_COUNTER == iterations){
ClientCoroutine_postBodyAsync::SUCCESS_COUNTER = -1;
OATPP_LOGV("Client", "postBodyAsync - DONE!");
}
if(ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER == iterations){
ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER = -1;
OATPP_LOGV("Client", "echoBodyAsync - DONE!");
@ -254,6 +304,7 @@ void FullAsyncClientTest::onRun() {
}
OATPP_ASSERT(ClientCoroutine_getRootAsync::SUCCESS_COUNTER == -1); // -1 is success
OATPP_ASSERT(ClientCoroutine_postBodyAsync::SUCCESS_COUNTER == -1); // -1 is success
OATPP_ASSERT(ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER == -1); // -1 is success
executor->waitTasksFinished(); // Wait executor tasks before quit.