Optimization. Keep buffer for response headers.

This commit is contained in:
lganzzzo 2019-10-12 16:37:36 +03:00
parent 46a6ca729c
commit 869966c466
8 changed files with 118 additions and 55 deletions

View File

@ -118,6 +118,39 @@ oatpp::String BufferOutputStream::getSubstring(data::v_io_size pos, data::v_io_s
}
}
oatpp::data::v_io_size BufferOutputStream::flushToStream(OutputStream* stream) {
return oatpp::data::stream::writeExactSizeData(stream, m_data, m_position);
}
oatpp::async::CoroutineStarter BufferOutputStream::flushToStreamAsync(const std::shared_ptr<BufferOutputStream>& _this, const std::shared_ptr<OutputStream>& stream) {
class WriteDataCoroutine : public oatpp::async::Coroutine<WriteDataCoroutine> {
private:
std::shared_ptr<BufferOutputStream> m_this;
std::shared_ptr<oatpp::data::stream::OutputStream> m_stream;
AsyncInlineWriteData m_inlineData;
public:
WriteDataCoroutine(const std::shared_ptr<BufferOutputStream>& _this,
const std::shared_ptr<oatpp::data::stream::OutputStream>& stream)
: m_this(_this)
, m_stream(stream)
{}
Action act() {
if(m_inlineData.currBufferPtr == nullptr) {
m_inlineData.currBufferPtr = m_this->m_data;
m_inlineData.bytesLeft = m_this->m_position;
}
return writeExactSizeDataAsyncInline(m_stream.get(), m_inlineData, finish());
}
};
return WriteDataCoroutine::start(_this, stream);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// BufferInputStream

View File

@ -116,6 +116,21 @@ public:
*/
oatpp::String getSubstring(data::v_io_size pos, data::v_io_size count);
/**
* Write all bytes from buffer to stream.
* @param stream - stream to flush all data to.
* @return - actual amount of bytes flushed.
*/
oatpp::data::v_io_size flushToStream(OutputStream* stream);
/**
* Write all bytes from buffer to stream in async manner.
* @param _this - pointer to `this` buffer.
* @param stream - stream to flush all data to.
* @return - &id:oatpp::async::CoroutineStarter;.
*/
static oatpp::async::CoroutineStarter flushToStreamAsync(const std::shared_ptr<BufferOutputStream>& _this, const std::shared_ptr<OutputStream>& stream);
};
/**

View File

@ -24,8 +24,6 @@
#include "./Response.hpp"
#include "oatpp/core/data/stream/ChunkedBuffer.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
Response::Response(const Status& status,
@ -84,95 +82,101 @@ std::shared_ptr<const Response::ConnectionHandler::ParameterMap> Response::getCo
return m_connectionUpgradeParameters;
}
void Response::send(data::stream::OutputStream* stream) {
void Response::send(data::stream::OutputStream* stream, oatpp::data::stream::BufferOutputStream* headersWriteBuffer) {
if(m_body){
m_body->declareHeaders(m_headers);
} else {
m_headers[Header::CONTENT_LENGTH] = "0";
}
oatpp::data::stream::ChunkedBuffer buffer;
headersWriteBuffer->setCurrentPosition(0);
buffer.write("HTTP/1.1 ", 9);
buffer.writeAsString(m_status.code);
buffer.write(" ", 1);
buffer.OutputStream::write(m_status.description);
buffer.write("\r\n", 2);
headersWriteBuffer->write("HTTP/1.1 ", 9);
headersWriteBuffer->writeAsString(m_status.code);
headersWriteBuffer->write(" ", 1);
headersWriteBuffer->OutputStream::write(m_status.description);
headersWriteBuffer->write("\r\n", 2);
auto it = m_headers.begin();
while(it != m_headers.end()) {
buffer.write(it->first.getData(), it->first.getSize());
buffer.write(": ", 2);
buffer.write(it->second.getData(), it->second.getSize());
buffer.write("\r\n", 2);
headersWriteBuffer->write(it->first.getData(), it->first.getSize());
headersWriteBuffer->write(": ", 2);
headersWriteBuffer->write(it->second.getData(), it->second.getSize());
headersWriteBuffer->write("\r\n", 2);
it ++;
}
buffer.write("\r\n", 2);
headersWriteBuffer->write("\r\n", 2);
if(m_body) {
auto bodySize = m_body->getKnownSize();
if(bodySize >= 0 && bodySize + buffer.getSize() < oatpp::data::stream::ChunkedBuffer::CHUNK_ENTRY_SIZE) {
m_body->writeToStream(&buffer);
buffer.flushToStream(stream);
if(bodySize >= 0 && bodySize + headersWriteBuffer->getCurrentPosition() < headersWriteBuffer->getCapacity()) {
m_body->writeToStream(headersWriteBuffer);
headersWriteBuffer->flushToStream(stream);
} else {
buffer.flushToStream(stream);
headersWriteBuffer->flushToStream(stream);
m_body->writeToStream(stream);
}
} else {
buffer.flushToStream(stream);
headersWriteBuffer->flushToStream(stream);
}
}
oatpp::async::CoroutineStarter Response::sendAsync(const std::shared_ptr<data::stream::OutputStream>& stream){
oatpp::async::CoroutineStarter Response::sendAsync(const std::shared_ptr<Response>& _this,
const std::shared_ptr<data::stream::OutputStream>& stream,
const std::shared_ptr<oatpp::data::stream::BufferOutputStream>& headersWriteBuffer)
{
class SendAsyncCoroutine : public oatpp::async::Coroutine<SendAsyncCoroutine> {
private:
std::shared_ptr<Response> m_response;
std::shared_ptr<Response> m_this;
std::shared_ptr<data::stream::OutputStream> m_stream;
std::shared_ptr<oatpp::data::stream::ChunkedBuffer> m_buffer;
std::shared_ptr<oatpp::data::stream::BufferOutputStream> m_headersWriteBuffer;
public:
SendAsyncCoroutine(const std::shared_ptr<Response>& response,
const std::shared_ptr<data::stream::OutputStream>& stream)
: m_response(response)
SendAsyncCoroutine(const std::shared_ptr<Response>& _this,
const std::shared_ptr<data::stream::OutputStream>& stream,
const std::shared_ptr<oatpp::data::stream::BufferOutputStream>& headersWriteBuffer)
: m_this(_this)
, m_stream(stream)
, m_buffer(oatpp::data::stream::ChunkedBuffer::createShared())
, m_headersWriteBuffer(headersWriteBuffer)
{}
Action act() override {
if(m_response->m_body){
m_response->m_body->declareHeaders(m_response->m_headers);
if(m_this->m_body){
m_this->m_body->declareHeaders(m_this->m_headers);
} else {
m_response->m_headers[Header::CONTENT_LENGTH] = "0";
m_this->m_headers[Header::CONTENT_LENGTH] = "0";
}
m_buffer->write("HTTP/1.1 ", 9);
m_buffer->writeAsString(m_response->m_status.code);
m_buffer->write(" ", 1);
m_buffer->OutputStream::write(m_response->m_status.description);
m_buffer->write("\r\n", 2);
http::Utils::writeHeaders(m_response->m_headers, m_buffer.get());
m_buffer->write("\r\n", 2);
m_headersWriteBuffer->setCurrentPosition(0);
const auto& body = m_response->m_body;
m_headersWriteBuffer->write("HTTP/1.1 ", 9);
m_headersWriteBuffer->writeAsString(m_this->m_status.code);
m_headersWriteBuffer->write(" ", 1);
m_headersWriteBuffer->OutputStream::write(m_this->m_status.description);
m_headersWriteBuffer->write("\r\n", 2);
http::Utils::writeHeaders(m_this->m_headers, m_headersWriteBuffer.get());
m_headersWriteBuffer->write("\r\n", 2);
const auto& body = m_this->m_body;
if(body) {
auto bodySize = body->getKnownSize();
if(bodySize >= 0 && bodySize + m_buffer->getSize() < oatpp::data::stream::ChunkedBuffer::CHUNK_ENTRY_SIZE) {
if(bodySize >= 0 && bodySize + m_headersWriteBuffer->getCurrentPosition() < m_headersWriteBuffer->getCapacity()) {
return body->writeToStreamAsync(m_buffer)
.next(m_buffer->flushToStreamAsync(m_stream))
return body->writeToStreamAsync(m_headersWriteBuffer)
.next(oatpp::data::stream::BufferOutputStream::flushToStreamAsync(m_headersWriteBuffer, m_stream))
.next(finish());
} else {
@ -186,19 +190,19 @@ oatpp::async::CoroutineStarter Response::sendAsync(const std::shared_ptr<data::s
}
Action writeHeaders() {
return m_buffer->flushToStreamAsync(m_stream).next(yieldTo(&SendAsyncCoroutine::writeBody));
return oatpp::data::stream::BufferOutputStream::flushToStreamAsync(m_headersWriteBuffer, m_stream).next(yieldTo(&SendAsyncCoroutine::writeBody));
}
Action writeBody() {
if(m_response->m_body) {
return m_response->m_body->writeToStreamAsync(m_stream).next(finish());
if(m_this->m_body) {
return m_this->m_body->writeToStreamAsync(m_stream).next(finish());
}
return finish();
}
};
return SendAsyncCoroutine::start(shared_from_this(), stream);
return SendAsyncCoroutine::start(_this, stream, headersWriteBuffer);
}

View File

@ -29,13 +29,14 @@
#include "oatpp/web/protocol/http/Http.hpp"
#include "oatpp/network/server/ConnectionHandler.hpp"
#include "oatpp/core/async/Coroutine.hpp"
#include "oatpp/core/data/stream/BufferStream.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
/**
* Class which stores information of outgoing http Response.
*/
class Response : public oatpp::base::Countable, public std::enable_shared_from_this<Response> {
class Response : public oatpp::base::Countable {
public:
/**
* Convenience typedef for Headers. <br>
@ -137,15 +138,20 @@ public:
/**
* Write this Response to stream.
* @param stream - pointer to &id:oatpp::data::stream::OutputStream;.
* @param headersWriteBuffer - pointer to &id:oatpp::data::stream::BufferOutputStream;.
*/
void send(data::stream::OutputStream* stream);
void send(data::stream::OutputStream* stream, oatpp::data::stream::BufferOutputStream* headersWriteBuffer);
/**
* Same as &l:Response::send (); but async.
* @param _this - `this` response.
* @param stream - `std::shared_ptr` to &id:oatpp::data::stream::OutputStream;.
* @param headersWriteBuffer - `std::shared_ptr` to &id:oatpp::data::stream::BufferOutputStream;.
* @return - &id:oatpp::async::CoroutineStarter;.
*/
oatpp::async::CoroutineStarter sendAsync(const std::shared_ptr<data::stream::OutputStream>& stream);
static oatpp::async::CoroutineStarter sendAsync(const std::shared_ptr<Response>& _this,
const std::shared_ptr<data::stream::OutputStream>& stream,
const std::shared_ptr<oatpp::data::stream::BufferOutputStream>& headersWriteBuffer);
};

View File

@ -73,6 +73,7 @@ void HttpConnectionHandler::Task::run(){
std::shared_ptr<oatpp::web::protocol::http::outgoing::Response> response;
oatpp::data::stream::BufferOutputStream headersInBuffer(2048 /* initial capacity */, 2048 /* grow bytes */);
oatpp::data::stream::BufferOutputStream headersOutBuffer(2048 /* initial capacity */, 2048 /* grow bytes */);
oatpp::web::protocol::http::incoming::RequestHeadersReader headersReader(&headersInBuffer, 2048 /* read chunk size */, 4096 /* max headers size */);
do {
@ -80,7 +81,7 @@ void HttpConnectionHandler::Task::run(){
response = HttpProcessor::processRequest(m_router, headersReader, inStream, m_bodyDecoder, m_errorHandler, m_requestInterceptors, connectionState);
if(response) {
response->send(m_connection.get());
response->send(m_connection.get(), &headersOutBuffer);
} else {
return;
}

View File

@ -140,7 +140,8 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() {
m_currentResponse->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER);
m_connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionState(m_currentRequest, m_currentResponse);
return m_currentResponse->sendAsync(m_connection).next(yieldTo(&HttpProcessor::Coroutine::onRequestDone));
return protocol::http::outgoing::Response::sendAsync(m_currentResponse, m_connection, m_headersOutBuffer)
.next(yieldTo(&HttpProcessor::Coroutine::onRequestDone));
}

View File

@ -69,6 +69,7 @@ public:
HttpRouter* m_router;
oatpp::data::stream::BufferOutputStream m_headersInBuffer;
RequestHeadersReader m_headersReader;
std::shared_ptr<oatpp::data::stream::BufferOutputStream> m_headersOutBuffer;
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> m_bodyDecoder;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
RequestInterceptors* m_requestInterceptors;
@ -90,6 +91,7 @@ public:
: m_router(router)
, m_headersInBuffer(2048 /* initialCapacity */, 2048 /* growBytes */)
, m_headersReader(&m_headersInBuffer, 2048 /* read chunk size */, 4096 /* max headers size */)
, m_headersOutBuffer(std::make_shared<oatpp::data::stream::BufferOutputStream>(2048 /* initialCapacity */, 2048 /* growBytes */))
, m_bodyDecoder(bodyDecoder)
, m_errorHandler(errorHandler)
, m_requestInterceptors(requestInterceptors)

View File

@ -88,6 +88,7 @@ void ApiControllerTest::onRun() {
typedef oatpp::web::protocol::http::Status Status;
Controller controller(nullptr);
oatpp::data::stream::BufferOutputStream headersOutBuffer;
{
auto endpoint = controller.Z__ENDPOINT_root;
@ -106,7 +107,7 @@ void ApiControllerTest::onRun() {
OATPP_ASSERT(response->getStatus().code == 200);
oatpp::data::stream::ChunkedBuffer stream;
response->send(&stream);
response->send(&stream, &headersOutBuffer);
OATPP_LOGD(TAG, "response:\n---\n%s\n---\n", stream.toString()->c_str());
@ -133,7 +134,7 @@ void ApiControllerTest::onRun() {
OATPP_ASSERT(response->getStatus().code == 200);
oatpp::data::stream::ChunkedBuffer stream;
response->send(&stream);
response->send(&stream, &headersOutBuffer);
OATPP_LOGD(TAG, "response:\n---\n%s\n---\n", stream.toString()->c_str());
@ -154,7 +155,7 @@ void ApiControllerTest::onRun() {
OATPP_ASSERT(response->getStatus().code == 200);
oatpp::data::stream::ChunkedBuffer stream;
response->send(&stream);
response->send(&stream, &headersOutBuffer);
OATPP_LOGD(TAG, "response:\n---\n%s\n---\n", stream.toString()->c_str());