From 869966c466c270a153a3124b4646de9ad771ac66 Mon Sep 17 00:00:00 2001 From: lganzzzo Date: Sat, 12 Oct 2019 16:37:36 +0300 Subject: [PATCH] Optimization. Keep buffer for response headers. --- src/oatpp/core/data/stream/BufferStream.cpp | 33 +++++++ src/oatpp/core/data/stream/BufferStream.hpp | 15 +++ .../web/protocol/http/outgoing/Response.cpp | 98 ++++++++++--------- .../web/protocol/http/outgoing/Response.hpp | 12 ++- .../web/server/HttpConnectionHandler.cpp | 3 +- src/oatpp/web/server/HttpProcessor.cpp | 3 +- src/oatpp/web/server/HttpProcessor.hpp | 2 + .../web/server/api/ApiControllerTest.cpp | 7 +- 8 files changed, 118 insertions(+), 55 deletions(-) diff --git a/src/oatpp/core/data/stream/BufferStream.cpp b/src/oatpp/core/data/stream/BufferStream.cpp index cc123ef9..cf874e6c 100644 --- a/src/oatpp/core/data/stream/BufferStream.cpp +++ b/src/oatpp/core/data/stream/BufferStream.cpp @@ -118,6 +118,39 @@ oatpp::String BufferOutputStream::getSubstring(data::v_io_size pos, data::v_io_s } } +oatpp::data::v_io_size BufferOutputStream::flushToStream(OutputStream* stream) { + return oatpp::data::stream::writeExactSizeData(stream, m_data, m_position); +} + +oatpp::async::CoroutineStarter BufferOutputStream::flushToStreamAsync(const std::shared_ptr& _this, const std::shared_ptr& stream) { + + class WriteDataCoroutine : public oatpp::async::Coroutine { + private: + std::shared_ptr m_this; + std::shared_ptr m_stream; + AsyncInlineWriteData m_inlineData; + public: + + WriteDataCoroutine(const std::shared_ptr& _this, + const std::shared_ptr& stream) + : m_this(_this) + , m_stream(stream) + {} + + Action act() { + if(m_inlineData.currBufferPtr == nullptr) { + m_inlineData.currBufferPtr = m_this->m_data; + m_inlineData.bytesLeft = m_this->m_position; + } + return writeExactSizeDataAsyncInline(m_stream.get(), m_inlineData, finish()); + } + + }; + + return WriteDataCoroutine::start(_this, stream); + +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // BufferInputStream diff --git a/src/oatpp/core/data/stream/BufferStream.hpp b/src/oatpp/core/data/stream/BufferStream.hpp index a3aada0a..62d22e2c 100644 --- a/src/oatpp/core/data/stream/BufferStream.hpp +++ b/src/oatpp/core/data/stream/BufferStream.hpp @@ -116,6 +116,21 @@ public: */ oatpp::String getSubstring(data::v_io_size pos, data::v_io_size count); + /** + * Write all bytes from buffer to stream. + * @param stream - stream to flush all data to. + * @return - actual amount of bytes flushed. + */ + oatpp::data::v_io_size flushToStream(OutputStream* stream); + + /** + * Write all bytes from buffer to stream in async manner. + * @param _this - pointer to `this` buffer. + * @param stream - stream to flush all data to. + * @return - &id:oatpp::async::CoroutineStarter;. + */ + static oatpp::async::CoroutineStarter flushToStreamAsync(const std::shared_ptr& _this, const std::shared_ptr& stream); + }; /** diff --git a/src/oatpp/web/protocol/http/outgoing/Response.cpp b/src/oatpp/web/protocol/http/outgoing/Response.cpp index 4950b2d9..4fedcd67 100644 --- a/src/oatpp/web/protocol/http/outgoing/Response.cpp +++ b/src/oatpp/web/protocol/http/outgoing/Response.cpp @@ -24,8 +24,6 @@ #include "./Response.hpp" -#include "oatpp/core/data/stream/ChunkedBuffer.hpp" - namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing { Response::Response(const Status& status, @@ -84,95 +82,101 @@ std::shared_ptr Response::getCo return m_connectionUpgradeParameters; } -void Response::send(data::stream::OutputStream* stream) { - +void Response::send(data::stream::OutputStream* stream, oatpp::data::stream::BufferOutputStream* headersWriteBuffer) { + if(m_body){ m_body->declareHeaders(m_headers); } else { m_headers[Header::CONTENT_LENGTH] = "0"; } - oatpp::data::stream::ChunkedBuffer buffer; + headersWriteBuffer->setCurrentPosition(0); - buffer.write("HTTP/1.1 ", 9); - buffer.writeAsString(m_status.code); - buffer.write(" ", 1); - buffer.OutputStream::write(m_status.description); - buffer.write("\r\n", 2); + headersWriteBuffer->write("HTTP/1.1 ", 9); + headersWriteBuffer->writeAsString(m_status.code); + headersWriteBuffer->write(" ", 1); + headersWriteBuffer->OutputStream::write(m_status.description); + headersWriteBuffer->write("\r\n", 2); auto it = m_headers.begin(); while(it != m_headers.end()) { - buffer.write(it->first.getData(), it->first.getSize()); - buffer.write(": ", 2); - buffer.write(it->second.getData(), it->second.getSize()); - buffer.write("\r\n", 2); + headersWriteBuffer->write(it->first.getData(), it->first.getSize()); + headersWriteBuffer->write(": ", 2); + headersWriteBuffer->write(it->second.getData(), it->second.getSize()); + headersWriteBuffer->write("\r\n", 2); it ++; } - buffer.write("\r\n", 2); + headersWriteBuffer->write("\r\n", 2); if(m_body) { auto bodySize = m_body->getKnownSize(); - if(bodySize >= 0 && bodySize + buffer.getSize() < oatpp::data::stream::ChunkedBuffer::CHUNK_ENTRY_SIZE) { - m_body->writeToStream(&buffer); - buffer.flushToStream(stream); + if(bodySize >= 0 && bodySize + headersWriteBuffer->getCurrentPosition() < headersWriteBuffer->getCapacity()) { + m_body->writeToStream(headersWriteBuffer); + headersWriteBuffer->flushToStream(stream); } else { - buffer.flushToStream(stream); + headersWriteBuffer->flushToStream(stream); m_body->writeToStream(stream); } } else { - buffer.flushToStream(stream); + headersWriteBuffer->flushToStream(stream); } } -oatpp::async::CoroutineStarter Response::sendAsync(const std::shared_ptr& stream){ +oatpp::async::CoroutineStarter Response::sendAsync(const std::shared_ptr& _this, + const std::shared_ptr& stream, + const std::shared_ptr& headersWriteBuffer) +{ class SendAsyncCoroutine : public oatpp::async::Coroutine { private: - std::shared_ptr m_response; + std::shared_ptr m_this; std::shared_ptr m_stream; - std::shared_ptr m_buffer; + std::shared_ptr m_headersWriteBuffer; public: - SendAsyncCoroutine(const std::shared_ptr& response, - const std::shared_ptr& stream) - : m_response(response) + SendAsyncCoroutine(const std::shared_ptr& _this, + const std::shared_ptr& stream, + const std::shared_ptr& headersWriteBuffer) + : m_this(_this) , m_stream(stream) - , m_buffer(oatpp::data::stream::ChunkedBuffer::createShared()) + , m_headersWriteBuffer(headersWriteBuffer) {} Action act() override { - if(m_response->m_body){ - m_response->m_body->declareHeaders(m_response->m_headers); + if(m_this->m_body){ + m_this->m_body->declareHeaders(m_this->m_headers); } else { - m_response->m_headers[Header::CONTENT_LENGTH] = "0"; + m_this->m_headers[Header::CONTENT_LENGTH] = "0"; } - - m_buffer->write("HTTP/1.1 ", 9); - m_buffer->writeAsString(m_response->m_status.code); - m_buffer->write(" ", 1); - m_buffer->OutputStream::write(m_response->m_status.description); - m_buffer->write("\r\n", 2); - http::Utils::writeHeaders(m_response->m_headers, m_buffer.get()); - - m_buffer->write("\r\n", 2); + m_headersWriteBuffer->setCurrentPosition(0); - const auto& body = m_response->m_body; + m_headersWriteBuffer->write("HTTP/1.1 ", 9); + m_headersWriteBuffer->writeAsString(m_this->m_status.code); + m_headersWriteBuffer->write(" ", 1); + m_headersWriteBuffer->OutputStream::write(m_this->m_status.description); + m_headersWriteBuffer->write("\r\n", 2); + + http::Utils::writeHeaders(m_this->m_headers, m_headersWriteBuffer.get()); + + m_headersWriteBuffer->write("\r\n", 2); + + const auto& body = m_this->m_body; if(body) { auto bodySize = body->getKnownSize(); - if(bodySize >= 0 && bodySize + m_buffer->getSize() < oatpp::data::stream::ChunkedBuffer::CHUNK_ENTRY_SIZE) { + if(bodySize >= 0 && bodySize + m_headersWriteBuffer->getCurrentPosition() < m_headersWriteBuffer->getCapacity()) { - return body->writeToStreamAsync(m_buffer) - .next(m_buffer->flushToStreamAsync(m_stream)) + return body->writeToStreamAsync(m_headersWriteBuffer) + .next(oatpp::data::stream::BufferOutputStream::flushToStreamAsync(m_headersWriteBuffer, m_stream)) .next(finish()); } else { @@ -186,19 +190,19 @@ oatpp::async::CoroutineStarter Response::sendAsync(const std::shared_ptrflushToStreamAsync(m_stream).next(yieldTo(&SendAsyncCoroutine::writeBody)); + return oatpp::data::stream::BufferOutputStream::flushToStreamAsync(m_headersWriteBuffer, m_stream).next(yieldTo(&SendAsyncCoroutine::writeBody)); } Action writeBody() { - if(m_response->m_body) { - return m_response->m_body->writeToStreamAsync(m_stream).next(finish()); + if(m_this->m_body) { + return m_this->m_body->writeToStreamAsync(m_stream).next(finish()); } return finish(); } }; - return SendAsyncCoroutine::start(shared_from_this(), stream); + return SendAsyncCoroutine::start(_this, stream, headersWriteBuffer); } diff --git a/src/oatpp/web/protocol/http/outgoing/Response.hpp b/src/oatpp/web/protocol/http/outgoing/Response.hpp index e4099fc3..ee2f0f2c 100644 --- a/src/oatpp/web/protocol/http/outgoing/Response.hpp +++ b/src/oatpp/web/protocol/http/outgoing/Response.hpp @@ -29,13 +29,14 @@ #include "oatpp/web/protocol/http/Http.hpp" #include "oatpp/network/server/ConnectionHandler.hpp" #include "oatpp/core/async/Coroutine.hpp" +#include "oatpp/core/data/stream/BufferStream.hpp" namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing { /** * Class which stores information of outgoing http Response. */ -class Response : public oatpp::base::Countable, public std::enable_shared_from_this { +class Response : public oatpp::base::Countable { public: /** * Convenience typedef for Headers.
@@ -137,15 +138,20 @@ public: /** * Write this Response to stream. * @param stream - pointer to &id:oatpp::data::stream::OutputStream;. + * @param headersWriteBuffer - pointer to &id:oatpp::data::stream::BufferOutputStream;. */ - void send(data::stream::OutputStream* stream); + void send(data::stream::OutputStream* stream, oatpp::data::stream::BufferOutputStream* headersWriteBuffer); /** * Same as &l:Response::send (); but async. + * @param _this - `this` response. * @param stream - `std::shared_ptr` to &id:oatpp::data::stream::OutputStream;. + * @param headersWriteBuffer - `std::shared_ptr` to &id:oatpp::data::stream::BufferOutputStream;. * @return - &id:oatpp::async::CoroutineStarter;. */ - oatpp::async::CoroutineStarter sendAsync(const std::shared_ptr& stream); + static oatpp::async::CoroutineStarter sendAsync(const std::shared_ptr& _this, + const std::shared_ptr& stream, + const std::shared_ptr& headersWriteBuffer); }; diff --git a/src/oatpp/web/server/HttpConnectionHandler.cpp b/src/oatpp/web/server/HttpConnectionHandler.cpp index 1c483aaa..26ccb6d6 100644 --- a/src/oatpp/web/server/HttpConnectionHandler.cpp +++ b/src/oatpp/web/server/HttpConnectionHandler.cpp @@ -73,6 +73,7 @@ void HttpConnectionHandler::Task::run(){ std::shared_ptr response; oatpp::data::stream::BufferOutputStream headersInBuffer(2048 /* initial capacity */, 2048 /* grow bytes */); + oatpp::data::stream::BufferOutputStream headersOutBuffer(2048 /* initial capacity */, 2048 /* grow bytes */); oatpp::web::protocol::http::incoming::RequestHeadersReader headersReader(&headersInBuffer, 2048 /* read chunk size */, 4096 /* max headers size */); do { @@ -80,7 +81,7 @@ void HttpConnectionHandler::Task::run(){ response = HttpProcessor::processRequest(m_router, headersReader, inStream, m_bodyDecoder, m_errorHandler, m_requestInterceptors, connectionState); if(response) { - response->send(m_connection.get()); + response->send(m_connection.get(), &headersOutBuffer); } else { return; } diff --git a/src/oatpp/web/server/HttpProcessor.cpp b/src/oatpp/web/server/HttpProcessor.cpp index 15091fd0..0dc7dfa9 100644 --- a/src/oatpp/web/server/HttpProcessor.cpp +++ b/src/oatpp/web/server/HttpProcessor.cpp @@ -140,7 +140,8 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() { m_currentResponse->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER); m_connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionState(m_currentRequest, m_currentResponse); - return m_currentResponse->sendAsync(m_connection).next(yieldTo(&HttpProcessor::Coroutine::onRequestDone)); + return protocol::http::outgoing::Response::sendAsync(m_currentResponse, m_connection, m_headersOutBuffer) + .next(yieldTo(&HttpProcessor::Coroutine::onRequestDone)); } diff --git a/src/oatpp/web/server/HttpProcessor.hpp b/src/oatpp/web/server/HttpProcessor.hpp index 72ba69b1..7c23d667 100644 --- a/src/oatpp/web/server/HttpProcessor.hpp +++ b/src/oatpp/web/server/HttpProcessor.hpp @@ -69,6 +69,7 @@ public: HttpRouter* m_router; oatpp::data::stream::BufferOutputStream m_headersInBuffer; RequestHeadersReader m_headersReader; + std::shared_ptr m_headersOutBuffer; std::shared_ptr m_bodyDecoder; std::shared_ptr m_errorHandler; RequestInterceptors* m_requestInterceptors; @@ -90,6 +91,7 @@ public: : m_router(router) , m_headersInBuffer(2048 /* initialCapacity */, 2048 /* growBytes */) , m_headersReader(&m_headersInBuffer, 2048 /* read chunk size */, 4096 /* max headers size */) + , m_headersOutBuffer(std::make_shared(2048 /* initialCapacity */, 2048 /* growBytes */)) , m_bodyDecoder(bodyDecoder) , m_errorHandler(errorHandler) , m_requestInterceptors(requestInterceptors) diff --git a/test/oatpp/web/server/api/ApiControllerTest.cpp b/test/oatpp/web/server/api/ApiControllerTest.cpp index 0fcf4372..912d484c 100644 --- a/test/oatpp/web/server/api/ApiControllerTest.cpp +++ b/test/oatpp/web/server/api/ApiControllerTest.cpp @@ -88,6 +88,7 @@ void ApiControllerTest::onRun() { typedef oatpp::web::protocol::http::Status Status; Controller controller(nullptr); + oatpp::data::stream::BufferOutputStream headersOutBuffer; { auto endpoint = controller.Z__ENDPOINT_root; @@ -106,7 +107,7 @@ void ApiControllerTest::onRun() { OATPP_ASSERT(response->getStatus().code == 200); oatpp::data::stream::ChunkedBuffer stream; - response->send(&stream); + response->send(&stream, &headersOutBuffer); OATPP_LOGD(TAG, "response:\n---\n%s\n---\n", stream.toString()->c_str()); @@ -133,7 +134,7 @@ void ApiControllerTest::onRun() { OATPP_ASSERT(response->getStatus().code == 200); oatpp::data::stream::ChunkedBuffer stream; - response->send(&stream); + response->send(&stream, &headersOutBuffer); OATPP_LOGD(TAG, "response:\n---\n%s\n---\n", stream.toString()->c_str()); @@ -154,7 +155,7 @@ void ApiControllerTest::onRun() { OATPP_ASSERT(response->getStatus().code == 200); oatpp::data::stream::ChunkedBuffer stream; - response->send(&stream); + response->send(&stream, &headersOutBuffer); OATPP_LOGD(TAG, "response:\n---\n%s\n---\n", stream.toString()->c_str());