Optimization. Use BufferOutputStream in RequestHeadersReader.

This commit is contained in:
lganzzzo 2019-10-11 03:52:25 +03:00
parent 2ea43a2867
commit 8b4dc2e3b0
15 changed files with 245 additions and 78 deletions

View File

@ -86,8 +86,8 @@ add_library(oatpp
oatpp/core/data/mapping/type/Type.hpp
oatpp/core/data/share/MemoryLabel.cpp
oatpp/core/data/share/MemoryLabel.hpp
oatpp/core/data/stream/BufferInputStream.cpp
oatpp/core/data/stream/BufferInputStream.hpp
oatpp/core/data/stream/BufferStream.cpp
oatpp/core/data/stream/BufferStream.hpp
oatpp/core/data/stream/ChunkedBuffer.cpp
oatpp/core/data/stream/ChunkedBuffer.hpp
oatpp/core/data/stream/FileStream.cpp

View File

@ -220,7 +220,9 @@ CoroutineHandle::~CoroutineHandle() {
Action CoroutineHandle::takeAction(Action&& action) {
while (true) {
v_int32 iterations = 0;
while (iterations < 10) {
switch (action.m_type) {
@ -245,7 +247,15 @@ Action CoroutineHandle::takeAction(Action&& action) {
case Action::TYPE_YIELD_TO: {
_FP = action.m_data.fptr;
return std::forward<oatpp::async::Action>(action);
break;
}
case Action::TYPE_REPEAT: {
break;
}
case Action::TYPE_IO_REPEAT: {
break;
}
case Action::TYPE_ERROR: {
@ -277,6 +287,9 @@ Action CoroutineHandle::takeAction(Action&& action) {
};
action = iterate();
++ iterations;
}
return std::forward<oatpp::async::Action>(action);

View File

@ -22,10 +22,94 @@
*
***************************************************************************/
#include "BufferInputStream.hpp"
#include "BufferStream.hpp"
namespace oatpp { namespace data{ namespace stream {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// BufferOutputStream
BufferOutputStream::BufferOutputStream(v_io_size initialCapacity, v_io_size growBytes)
: m_data(new v_char8[initialCapacity])
, m_capacity(initialCapacity)
, m_position(0)
, m_growBytes(growBytes)
, m_ioMode(IOMode::NON_BLOCKING)
{}
BufferOutputStream::~BufferOutputStream() {
delete [] m_data;
}
data::v_io_size BufferOutputStream::write(const void *data, data::v_io_size count) {
reserveBytesUpfront(count);
std::memcpy(m_data + m_position, data, count);
m_position += count;
return count;
}
void BufferOutputStream::setOutputStreamIOMode(IOMode ioMode) {
m_ioMode = ioMode;
}
IOMode BufferOutputStream::getOutputStreamIOMode() {
return m_ioMode;
}
void BufferOutputStream::reserveBytesUpfront(v_io_size count) {
if(m_position + count > m_capacity) {
if(m_growBytes <= 0) {
throw std::runtime_error("[oatpp::data::stream::BufferOutputStream::reserveBytesUpfront()]: Error. Buffer was not allowed to grow.");
}
data::v_io_size extraNeeded = m_position + count - m_capacity;
data::v_io_size extraChunks = extraNeeded / m_growBytes;
if(extraChunks * m_growBytes < extraNeeded) {
extraChunks ++;
}
data::v_io_size newCapacity = m_capacity + extraChunks * m_growBytes;
p_char8 newData = new v_char8[newCapacity];
std::memcpy(newData, m_data, m_position);
delete [] m_data;
m_data = newData;
m_capacity = newCapacity;
}
}
p_char8 BufferOutputStream::getData() {
return m_data;
}
v_io_size BufferOutputStream::getCapacity() {
return m_capacity;
}
v_io_size BufferOutputStream::getCurrentPosition() {
return m_position;
}
void BufferOutputStream::setCurrentPosition(v_io_size position) {
m_position = position;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// BufferInputStream
BufferInputStream::BufferInputStream(const std::shared_ptr<base::StrBuffer>& memoryHandle, p_char8 data, v_io_size size)
: m_memoryHandle(memoryHandle)
, m_data(data)

View File

@ -22,13 +22,91 @@
*
***************************************************************************/
#ifndef oatpp_data_stream_BufferInputStream_hpp
#define oatpp_data_stream_BufferInputStream_hpp
#ifndef oatpp_data_stream_BufferStream_hpp
#define oatpp_data_stream_BufferStream_hpp
#include "Stream.hpp"
namespace oatpp { namespace data{ namespace stream {
/**
* BufferOutputStream
*/
class BufferOutputStream : public ConsistentOutputStream {
private:
p_char8 m_data;
v_io_size m_capacity;
v_io_size m_position;
v_io_size m_growBytes;
IOMode m_ioMode;
public:
/**
* Constructor.
* @param growBytes
*/
BufferOutputStream(v_io_size initialCapacity = 2048, v_io_size growBytes = 2048);
/**
* Virtual destructor.
*/
~BufferOutputStream();
/**
* Write `count` of bytes to stream.
* @param data - data to write.
* @param count - number of bytes to write.
* @return - actual number of bytes written. &id:oatpp::data::v_io_size;.
*/
data::v_io_size write(const void *data, data::v_io_size count) override;
/**
* Set stream I/O mode.
* @throws
*/
void setOutputStreamIOMode(IOMode ioMode) override;
/**
* Get stream I/O mode.
* @return
*/
IOMode getOutputStreamIOMode() override;
/**
* Reserve bytes for future writes.
*/
void reserveBytesUpfront(v_io_size count);
/**
* Get pointer to data.
* @return - pointer to data.
*/
p_char8 getData();
/**
* Get current capacity.
* Capacity may change.
* @return
*/
v_io_size getCapacity();
/**
* Get current data write position.
* @return - current data write position.
*/
v_io_size getCurrentPosition();
/**
* Set current data write position.
* @param position - data write position.
*/
void setCurrentPosition(v_io_size position);
};
/**
* BufferInputStream
*/
class BufferInputStream : public InputStream {
private:
std::shared_ptr<base::StrBuffer> m_memoryHandle;
@ -131,4 +209,4 @@ public:
}}}
#endif // oatpp_data_stream_BufferInputStream_hpp
#endif // oatpp_data_stream_BufferStream_hpp

View File

@ -24,7 +24,7 @@
#include "InMemoryPartReader.hpp"
#include "oatpp/core/data/stream/BufferInputStream.hpp"
#include "oatpp/core/data/stream/BufferStream.hpp"
namespace oatpp { namespace web { namespace mime { namespace multipart {

View File

@ -28,29 +28,27 @@
namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming {
data::v_io_size RequestHeadersReader::readHeadersSection(data::stream::InputStreamBufferedProxy* stream,
oatpp::data::stream::ConsistentOutputStream* bufferStream,
Result& result) {
data::v_io_size RequestHeadersReader::readHeadersSection(data::stream::InputStreamBufferedProxy* stream, Result& result) {
v_word32 accumulator = 0;
v_int32 progress = 0;
m_bufferStream.setCurrentPosition(0);
data::v_io_size res;
while (true) {
v_int32 desiredToRead = m_buffer.getSize();
if(progress + desiredToRead > m_maxHeadersSize) {
desiredToRead = m_maxHeadersSize - progress;
v_int32 desiredToRead = m_readChunkSize;
if(m_bufferStream.getCurrentPosition() + desiredToRead > m_maxHeadersSize) {
desiredToRead = m_maxHeadersSize - m_bufferStream.getCurrentPosition();
if(desiredToRead <= 0) {
return -1;
}
}
auto bufferData = m_buffer.getData();
m_bufferStream.reserveBytesUpfront(desiredToRead);
auto bufferData = m_bufferStream.getData() + m_bufferStream.getCurrentPosition();
res = stream->peek(bufferData, desiredToRead);
if(res > 0) {
bufferStream->write(bufferData, res);
progress += res;
m_bufferStream.setCurrentPosition(m_bufferStream.getCurrentPosition() + res);
for(v_int32 i = 0; i < res; i ++) {
accumulator <<= 8;
@ -79,17 +77,15 @@ RequestHeadersReader::Result RequestHeadersReader::readHeaders(data::stream::Inp
http::HttpError::Info& error) {
RequestHeadersReader::Result result;
oatpp::data::stream::ChunkedBuffer buffer;
error.ioStatus = readHeadersSection(stream, &buffer, result);
error.ioStatus = readHeadersSection(stream, result);
if(error.ioStatus > 0) {
auto headersText = buffer.toString();
oatpp::parser::Caret caret (headersText);
oatpp::parser::Caret caret (m_bufferStream.getData(), m_bufferStream.getCurrentPosition());
http::Status status;
http::Parser::parseRequestStartingLine(result.startingLine, headersText.getPtr(), caret, status);
http::Parser::parseRequestStartingLine(result.startingLine, nullptr, caret, status);
if(status.code == 0) {
http::Parser::parseHeaders(result.headers, headersText.getPtr(), caret, status);
http::Parser::parseHeaders(result.headers, nullptr, caret, status);
}
}
@ -99,45 +95,42 @@ RequestHeadersReader::Result RequestHeadersReader::readHeaders(data::stream::Inp
oatpp::async::CoroutineStarterForResult<const RequestHeadersReader::Result&>
RequestHeadersReader::readHeadersAsync(const std::shared_ptr<data::stream::InputStreamBufferedProxy>& connection)
RequestHeadersReader::readHeadersAsync(const std::shared_ptr<data::stream::InputStreamBufferedProxy>& stream)
{
class ReaderCoroutine : public oatpp::async::CoroutineWithResult<ReaderCoroutine, const Result&> {
private:
std::shared_ptr<data::stream::InputStreamBufferedProxy> m_stream;
oatpp::data::share::MemoryLabel m_buffer;
v_int32 m_maxHeadersSize;
RequestHeadersReader* m_this;
v_word32 m_accumulator;
v_int32 m_progress;
RequestHeadersReader::Result m_result;
oatpp::data::stream::ChunkedBuffer m_bufferStream;
public:
ReaderCoroutine(const std::shared_ptr<data::stream::InputStreamBufferedProxy>& stream,
const oatpp::data::share::MemoryLabel& buffer, v_int32 maxHeadersSize)
: m_stream(stream)
, m_buffer(buffer)
, m_maxHeadersSize(maxHeadersSize)
ReaderCoroutine(RequestHeadersReader* _this,
const std::shared_ptr<data::stream::InputStreamBufferedProxy>& stream)
: m_this(_this)
, m_stream(stream)
, m_accumulator(0)
, m_progress(0)
{}
{
m_this->m_bufferStream.setCurrentPosition(0);
}
Action act() override {
v_int32 desiredToRead = m_buffer.getSize();
if(m_progress + desiredToRead > m_maxHeadersSize) {
desiredToRead = m_maxHeadersSize - m_progress;
v_int32 desiredToRead = m_this->m_readChunkSize;
if(m_this->m_bufferStream.getCurrentPosition() + desiredToRead > m_this->m_maxHeadersSize) {
desiredToRead = m_this->m_maxHeadersSize - m_this->m_bufferStream.getCurrentPosition();
if(desiredToRead <= 0) {
return error<Error>("[oatpp::web::protocol::http::incoming::RequestHeadersReader::readHeadersAsync()]: Error. Headers section is too large.");
}
}
auto bufferData = m_buffer.getData();
m_this->m_bufferStream.reserveBytesUpfront(desiredToRead);
auto bufferData = m_this->m_bufferStream.getData() + m_this->m_bufferStream.getCurrentPosition();
auto res = m_stream->peek(bufferData, desiredToRead);
if(res > 0) {
m_bufferStream.write(bufferData, res);
m_progress += res;
m_this->m_bufferStream.setCurrentPosition(m_this->m_bufferStream.getCurrentPosition() + res);
for(v_int32 i = 0; i < res; i ++) {
m_accumulator <<= 8;
@ -165,13 +158,12 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptr<data::stream::Input
}
Action parseHeaders() {
auto headersText = m_bufferStream.toString();
oatpp::parser::Caret caret (headersText);
oatpp::parser::Caret caret (m_this->m_bufferStream.getData(), m_this->m_bufferStream.getCurrentPosition());
http::Status status;
http::Parser::parseRequestStartingLine(m_result.startingLine, headersText.getPtr(), caret, status);
http::Parser::parseRequestStartingLine(m_result.startingLine, nullptr, caret, status);
if(status.code == 0) {
http::Parser::parseHeaders(m_result.headers, headersText.getPtr(), caret, status);
http::Parser::parseHeaders(m_result.headers, nullptr, caret, status);
if(status.code == 0) {
return _return(m_result);
} else {
@ -180,12 +172,12 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptr<data::stream::Input
} else {
return error<Error>("[oatpp::web::protocol::http::incoming::RequestHeadersReader::readHeadersAsync()]: Error. Can't parse starting line.");
}
}
};
return ReaderCoroutine::startForResult(connection, m_buffer, m_maxHeadersSize);
return ReaderCoroutine::startForResult(this, stream);
}

View File

@ -28,6 +28,7 @@
#include "oatpp/web/protocol/http/Http.hpp"
#include "oatpp/core/async/Coroutine.hpp"
#include "oatpp/core/data/stream/StreamBufferedProxy.hpp"
#include "oatpp/core/data/stream/BufferStream.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming {
@ -61,21 +62,20 @@ public:
};
private:
data::v_io_size readHeadersSection(data::stream::InputStreamBufferedProxy* stream,
oatpp::data::stream::ConsistentOutputStream* bufferStream,
Result& result);
data::v_io_size readHeadersSection(data::stream::InputStreamBufferedProxy* stream, Result& result);
private:
oatpp::data::share::MemoryLabel m_buffer;
v_int32 m_readChunkSize;
v_int32 m_maxHeadersSize;
oatpp::data::stream::BufferOutputStream m_bufferStream;
public:
/**
* Constructor.
* @param buffer - buffer to use to read data from stream. &id:oatpp::data::share::MemoryLabel;.
* @param readChunkSize
* @param maxHeadersSize
*/
RequestHeadersReader(const oatpp::data::share::MemoryLabel& buffer, v_int32 maxHeadersSize)
: m_buffer(buffer)
RequestHeadersReader(v_int32 readChunkSize = 2048, v_int32 maxHeadersSize = 4096)
: m_readChunkSize(readChunkSize)
, m_maxHeadersSize(maxHeadersSize)
{}
@ -92,7 +92,7 @@ public:
* @param stream - `std::shared_ptr` to &id:oatpp::data::stream::InputStreamBufferedProxy;.
* @return - &id:oatpp::async::CoroutineStarterForResult;.
*/
oatpp::async::CoroutineStarterForResult<const RequestHeadersReader::Result&> readHeadersAsync(const std::shared_ptr<data::stream::InputStreamBufferedProxy>& connection);
oatpp::async::CoroutineStarterForResult<const RequestHeadersReader::Result&> readHeadersAsync(const std::shared_ptr<data::stream::InputStreamBufferedProxy>& stream);
};

View File

@ -30,7 +30,7 @@
#include "oatpp/web/mime/multipart/Multipart.hpp"
#include "oatpp/core/data/stream/BufferInputStream.hpp"
#include "oatpp/core/data/stream/BufferStream.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {

View File

@ -31,6 +31,12 @@
#include "oatpp/core/concurrency/Thread.hpp"
#include "oatpp/core/data/buffer/IOBuffer.hpp"
#include "oatpp/core/data/stream/BufferStream.hpp"
#include "oatpp/core/data/stream/StreamBufferedProxy.hpp"
namespace oatpp { namespace web { namespace server {
HttpConnectionHandler::Task::Task(HttpRouter* router,
@ -66,9 +72,11 @@ void HttpConnectionHandler::Task::run(){
v_int32 connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_CLOSE;
std::shared_ptr<oatpp::web::protocol::http::outgoing::Response> response;
oatpp::web::protocol::http::incoming::RequestHeadersReader headersReader;
do {
response = HttpProcessor::processRequest(m_router, inStream, m_bodyDecoder, m_errorHandler, m_requestInterceptors, connectionState);
response = HttpProcessor::processRequest(m_router, headersReader, inStream, m_bodyDecoder, m_errorHandler, m_requestInterceptors, connectionState);
if(response) {
response->send(m_connection.get());

View File

@ -34,9 +34,6 @@
#include "oatpp/network/server/ConnectionHandler.hpp"
#include "oatpp/network/Connection.hpp"
#include "oatpp/core/data/stream/StreamBufferedProxy.hpp"
#include "oatpp/core/data/buffer/IOBuffer.hpp"
namespace oatpp { namespace web { namespace server {
/**

View File

@ -28,6 +28,7 @@ namespace oatpp { namespace web { namespace server {
std::shared_ptr<protocol::http::outgoing::Response>
HttpProcessor::processRequest(HttpRouter* router,
RequestHeadersReader& headersReader,
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
@ -35,16 +36,9 @@ HttpProcessor::processRequest(HttpRouter* router,
v_int32& connectionState) {
RequestHeadersReader::Result headersReadResult;
oatpp::web::protocol::http::HttpError::Info error;
{
const v_int32 bufferSize = 2048;
v_char8 buffer [bufferSize];
oatpp::data::share::MemoryLabel bufferLabel(nullptr, buffer, bufferSize);
RequestHeadersReader headersReader(bufferLabel, 4096);
headersReadResult = headersReader.readHeaders(inStream.get(), error);
}
oatpp::web::protocol::http::HttpError::Info error;
auto headersReadResult = headersReader.readHeaders(inStream.get(), error);
if(error.status.code != 0) {
connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_CLOSE;
@ -130,8 +124,7 @@ oatpp::async::Action HttpProcessor::Coroutine::onHeadersParsed(const RequestHead
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::act() {
RequestHeadersReader headersReader(m_headerReaderBuffer, 4096);
return headersReader.readHeadersAsync(m_inStream).callbackTo(&HttpProcessor::Coroutine::onHeadersParsed);
return m_headersReader.readHeadersAsync(m_inStream).callbackTo(&HttpProcessor::Coroutine::onHeadersParsed);
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onRequestFormed() {

View File

@ -67,6 +67,7 @@ public:
class Coroutine : public oatpp::async::Coroutine<HttpProcessor::Coroutine> {
private:
HttpRouter* m_router;
RequestHeadersReader m_headersReader;
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> m_bodyDecoder;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
RequestInterceptors* m_requestInterceptors;
@ -113,6 +114,7 @@ public:
static std::shared_ptr<protocol::http::outgoing::Response>
processRequest(HttpRouter* router,
RequestHeadersReader& headersReader,
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,

View File

@ -38,7 +38,7 @@
#include "oatpp/network/virtual_/server/ConnectionProvider.hpp"
#include "oatpp/network/virtual_/Interface.hpp"
#include "oatpp/core/data/stream/BufferInputStream.hpp"
#include "oatpp/core/data/stream/BufferStream.hpp"
#include "oatpp/core/macro/component.hpp"
#include "oatpp-test/web/ClientServerTestRunner.hpp"

View File

@ -38,7 +38,7 @@
#include "oatpp/network/virtual_/server/ConnectionProvider.hpp"
#include "oatpp/network/virtual_/Interface.hpp"
#include "oatpp/core/data/stream/BufferInputStream.hpp"
#include "oatpp/core/data/stream/BufferStream.hpp"
#include "oatpp/core/macro/component.hpp"
#include "oatpp-test/web/ClientServerTestRunner.hpp"

View File

@ -27,7 +27,7 @@
#include "oatpp/web/mime/multipart/InMemoryPartReader.hpp"
#include "oatpp/web/mime/multipart/Reader.hpp"
#include "oatpp/core/data/stream/BufferInputStream.hpp"
#include "oatpp/core/data/stream/BufferStream.hpp"
#include <unordered_map>