minor optimization

This commit is contained in:
lganzzzo 2019-09-24 02:45:53 +03:00
parent bbbf7217bf
commit 2479af48c4
17 changed files with 143 additions and 205 deletions

View File

@ -401,7 +401,6 @@ private:
public:
CoroutineHandle(Processor* processor, AbstractCoroutine* rootCoroutine);
~CoroutineHandle();
Action takeAction(Action&& action);

View File

@ -151,8 +151,7 @@ void Processor::popTasks() {
void Processor::consumeAllTasks() {
for(auto& submission : m_taskList) {
auto coroutine = submission->createCoroutine();
m_queue.pushBack(new CoroutineHandle(this, coroutine));
m_queue.pushBack(submission->createCoroutine(this));
}
m_taskList.clear();
}
@ -197,55 +196,43 @@ bool Processor::iterate(v_int32 numIterations) {
for(v_int32 i = 0; i < numIterations; i++) {
for(v_int32 j = 0; j < 10; j ++) {
auto CP = m_queue.first;
if (CP == nullptr) {
goto end_loop;
}
if (CP->finished()) {
m_queue.popFrontNoData();
-- m_tasksCounter;
} else {
auto CP = m_queue.first;
if (CP == nullptr) {
goto end_loop;
}
if (CP->finished()) {
m_queue.popFrontNoData();
-- m_tasksCounter;
} else {
const Action &action = CP->iterateAndTakeAction();
const Action &action = CP->iterateAndTakeAction();
switch (action.m_type) {
switch (action.m_type) {
case Action::TYPE_IO_WAIT:
CP->_SCH_A = Action::clone(action);
m_queue.popFront();
popIOTask(CP);
break;
case Action::TYPE_IO_WAIT:
CP->_SCH_A = Action::clone(action);
m_queue.popFront();
popIOTask(CP);
break;
case Action::TYPE_WAIT_REPEAT:
CP->_SCH_A = Action::clone(action);
m_queue.popFront();
popTimerTask(CP);
break;
// case Action::TYPE_IO_REPEAT: // DO NOT RESCHEDULE COROUTINE WITH ACTIVE I/O
// CP->_SCH_A = Action::clone(action);
// m_queue.popFront();
// popIOTask(CP);
// break;
case Action::TYPE_WAIT_REPEAT:
CP->_SCH_A = Action::clone(action);
m_queue.popFront();
popTimerTask(CP);
break;
case Action::TYPE_WAIT_LIST:
CP->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
m_queue.popFront();
action.m_data.waitList->pushBack(CP);
break;
// default:
// m_queue.round();
}
case Action::TYPE_WAIT_LIST:
CP->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
m_queue.popFront();
action.m_data.waitList->pushBack(CP);
break;
default:
m_queue.round();
}
}
m_queue.round();
}
end_loop:

View File

@ -46,7 +46,7 @@ private:
class TaskSubmission {
public:
virtual ~TaskSubmission() {};
virtual AbstractCoroutine* createCoroutine() = 0;
virtual CoroutineHandle* createCoroutine(Processor* processor) = 0;
};
/*
@ -73,13 +73,13 @@ private:
: m_params(std::make_tuple(params...))
{}
virtual AbstractCoroutine* createCoroutine() {
return creator(typename SequenceGenerator<sizeof...(Args)>::type());
virtual CoroutineHandle* createCoroutine(Processor* processor) {
return creator(processor, typename SequenceGenerator<sizeof...(Args)>::type());
}
template<int ...S>
AbstractCoroutine* creator(IndexSequence<S...>) {
return new CoroutineType(std::get<S>(m_params) ...);
CoroutineHandle* creator(Processor* processor, IndexSequence<S...>) {
return new CoroutineHandle(processor, new CoroutineType(std::get<S>(m_params) ...));
}
};

View File

@ -377,14 +377,14 @@ async::CoroutineStarter FIFOBuffer::flushToStreamAsync(const std::shared_ptr<dat
class FlushCoroutine : public oatpp::async::Coroutine<FlushCoroutine> {
private:
std::shared_ptr<FIFOBuffer> m_fifo;
FIFOBuffer* m_fifo;
std::shared_ptr<data::stream::OutputStream> m_stream;
private:
data::stream::AsyncInlineWriteData m_data1;
data::stream::AsyncInlineWriteData m_data2;
public:
FlushCoroutine(const std::shared_ptr<FIFOBuffer>& fifo, const std::shared_ptr<data::stream::OutputStream>& stream)
FlushCoroutine(FIFOBuffer* fifo, const std::shared_ptr<data::stream::OutputStream>& stream)
: m_fifo(fifo)
, m_stream(stream)
{}
@ -428,7 +428,7 @@ async::CoroutineStarter FIFOBuffer::flushToStreamAsync(const std::shared_ptr<dat
};
return FlushCoroutine::start(shared_from_this(), stream);
return FlushCoroutine::start(this, stream);
}

View File

@ -36,7 +36,7 @@ namespace oatpp { namespace data { namespace buffer {
* FIFO operations over the buffer
* !FIFOBuffer is NOT an IOStream despite having similar APIs!
*/
class FIFOBuffer : public std::enable_shared_from_this<FIFOBuffer> {
class FIFOBuffer {
private:
p_char8 m_buffer;
v_io_size m_bufferSize;

View File

@ -51,6 +51,12 @@ public:
, m_size(0)
{}
/**
* Constructor.
* @param str
*/
MemoryLabel(const std::shared_ptr<base::StrBuffer>& str) : MemoryLabel(str, str->getData(), str->getSize()) {}
/**
* Constructor.
* @param memHandle - memory handle. `std::shared_ptr` to buffer pointed by a memory label.

View File

@ -27,12 +27,12 @@
namespace oatpp { namespace data{ namespace stream {
data::v_io_size OutputStreamBufferedProxy::write(const void *data, data::v_io_size count) {
if(m_buffer->availableToWrite() > 0) {
return m_buffer->write(data, count);
if(m_buffer.availableToWrite() > 0) {
return m_buffer.write(data, count);
} else {
auto bytesFlushed = m_buffer->readAndWriteToStream(m_outputStream.get(), m_buffer->getBufferSize());
auto bytesFlushed = m_buffer.readAndWriteToStream(m_outputStream.get(), m_buffer.getBufferSize());
if(bytesFlushed > 0) {
return m_buffer->write(data, count);
return m_buffer.write(data, count);
}
return bytesFlushed;
}
@ -54,11 +54,11 @@ oatpp::data::stream::IOMode OutputStreamBufferedProxy::getOutputStreamIOMode() {
}
data::v_io_size OutputStreamBufferedProxy::flush() {
return m_buffer->flushToStream(m_outputStream.get());
return m_buffer.flushToStream(m_outputStream.get());
}
oatpp::async::CoroutineStarter OutputStreamBufferedProxy::flushAsync() {
return m_buffer->flushToStreamAsync(m_outputStream);
return m_buffer.flushToStreamAsync(m_outputStream);
}
data::v_io_size InputStreamBufferedProxy::read(void *data, data::v_io_size count) {

View File

@ -27,7 +27,7 @@
#include "Stream.hpp"
#include "oatpp/core/data/buffer/FIFOBuffer.hpp"
#include "oatpp/core/data/buffer/IOBuffer.hpp"
#include "oatpp/core/data/share/MemoryLabel.hpp"
#include "oatpp/core/async/Coroutine.hpp"
namespace oatpp { namespace data{ namespace stream {
@ -40,36 +40,19 @@ public:
typedef v_int32 v_bufferSize;
private:
std::shared_ptr<OutputStream> m_outputStream;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_bufferPtr;
std::shared_ptr<buffer::FIFOBuffer> m_buffer;
oatpp::data::share::MemoryLabel m_memoryLabel;
buffer::FIFOBuffer m_buffer;
public:
OutputStreamBufferedProxy(const std::shared_ptr<OutputStream>& outputStream,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& bufferPtr,
p_char8 buffer,
v_bufferSize bufferSize)
const oatpp::data::share::MemoryLabel& memoryLabel)
: m_outputStream(outputStream)
, m_bufferPtr(bufferPtr)
, m_buffer(std::make_shared<buffer::FIFOBuffer>(buffer, bufferSize))
, m_memoryLabel(memoryLabel)
, m_buffer(memoryLabel.getData(), memoryLabel.getSize())
{}
public:
static std::shared_ptr<OutputStreamBufferedProxy> createShared(const std::shared_ptr<OutputStream>& outputStream,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer)
{
return Shared_OutputStreamBufferedProxy_Pool::allocateShared(outputStream,
buffer,
(p_char8) buffer->getData(),
buffer->getSize());
}
static std::shared_ptr<OutputStreamBufferedProxy> createShared(const std::shared_ptr<OutputStream>& outputStream,
p_char8 buffer,
v_bufferSize bufferSize)
{
return Shared_OutputStreamBufferedProxy_Pool::allocateShared(outputStream,
nullptr,
buffer,
bufferSize);
static std::shared_ptr<OutputStreamBufferedProxy> createShared(const std::shared_ptr<OutputStream>& outputStream, const oatpp::data::share::MemoryLabel& memoryLabel) {
return Shared_OutputStreamBufferedProxy_Pool::allocateShared(outputStream, memoryLabel);
}
data::v_io_size write(const void *data, data::v_io_size count) override;
@ -92,7 +75,7 @@ public:
oatpp::async::CoroutineStarter flushAsync();
void setBufferPosition(data::v_io_size readPosition, data::v_io_size writePosition, bool canRead) {
m_buffer->setBufferPosition(readPosition, writePosition, canRead);
m_buffer.setBufferPosition(readPosition, writePosition, canRead);
}
};
@ -105,72 +88,33 @@ public:
typedef v_int32 v_bufferSize;
protected:
std::shared_ptr<InputStream> m_inputStream;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_bufferPtr;
oatpp::data::share::MemoryLabel m_memoryLabel;
buffer::FIFOBuffer m_buffer;
public:
InputStreamBufferedProxy(const std::shared_ptr<InputStream>& inputStream,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& bufferPtr,
p_char8 buffer,
v_bufferSize bufferSize,
const oatpp::data::share::MemoryLabel& memoryLabel,
data::v_io_size bufferReadPosition,
data::v_io_size bufferWritePosition,
bool bufferCanRead)
: m_inputStream(inputStream)
, m_bufferPtr(bufferPtr)
, m_buffer(buffer, bufferSize, bufferReadPosition, bufferWritePosition, bufferCanRead)
, m_memoryLabel(memoryLabel)
, m_buffer(memoryLabel.getData(), memoryLabel.getSize(), bufferReadPosition, bufferWritePosition, bufferCanRead)
{}
public:
static std::shared_ptr<InputStreamBufferedProxy> createShared(const std::shared_ptr<InputStream>& inputStream,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer)
const oatpp::data::share::MemoryLabel& memoryLabel)
{
return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream,
buffer,
(p_char8) buffer->getData(),
buffer->getSize(),
0, 0, false);
return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream, memoryLabel, 0, 0, false);
}
static std::shared_ptr<InputStreamBufferedProxy> createShared(const std::shared_ptr<InputStream>& inputStream,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer,
const oatpp::data::share::MemoryLabel& memoryLabel,
data::v_io_size bufferReadPosition,
data::v_io_size bufferWritePosition,
bool bufferCanRead)
{
return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream,
buffer,
(p_char8) buffer->getData(),
buffer->getSize(),
bufferReadPosition,
bufferWritePosition,
bufferCanRead);
}
static std::shared_ptr<InputStreamBufferedProxy> createShared(const std::shared_ptr<InputStream>& inputStream,
p_char8 buffer,
v_bufferSize bufferSize)
{
return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream,
nullptr,
buffer,
bufferSize,
0, 0, false);
}
static std::shared_ptr<InputStreamBufferedProxy> createShared(const std::shared_ptr<InputStream>& inputStream,
p_char8 buffer,
v_bufferSize bufferSize,
data::v_io_size bufferReadPosition,
data::v_io_size bufferWritePosition,
bool bufferCanRead)
{
return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream,
nullptr,
buffer,
bufferSize,
bufferReadPosition,
bufferWritePosition,
bufferCanRead);
return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream, memoryLabel, bufferReadPosition, bufferWritePosition, bufferCanRead);
}
data::v_io_size read(void *data, data::v_io_size count) override;

View File

@ -115,14 +115,14 @@ HttpRequestExecutor::execute(const String& method,
auto request = oatpp::web::protocol::http::outgoing::Request::createShared(method, path, headers, body);
request->putHeaderIfNotExists(oatpp::web::protocol::http::Header::HOST, m_connectionProvider->getProperty("host"));
request->putHeaderIfNotExists(oatpp::web::protocol::http::Header::CONNECTION, oatpp::web::protocol::http::Header::Value::CONNECTION_KEEP_ALIVE);
auto ioBuffer = oatpp::data::buffer::IOBuffer::createShared();
oatpp::data::stream::OutputStreamBufferedProxy upStream(connection, ioBuffer, (p_char8)ioBuffer->getData(), ioBuffer->getSize());
oatpp::data::share::MemoryLabel buffer(oatpp::base::StrBuffer::createShared(oatpp::data::buffer::IOBuffer::BUFFER_SIZE));
oatpp::data::stream::OutputStreamBufferedProxy upStream(connection, buffer);
request->send(&upStream);
upStream.flush();
oatpp::web::protocol::http::incoming::ResponseHeadersReader headerReader(ioBuffer->getData(), ioBuffer->getSize(), 4096);
oatpp::web::protocol::http::incoming::ResponseHeadersReader headerReader(buffer, 4096);
oatpp::web::protocol::http::HttpError::Info error;
const auto& result = headerReader.readHeaders(connection, error);
@ -137,7 +137,7 @@ HttpRequestExecutor::execute(const String& method,
}
auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection,
ioBuffer,
buffer,
result.bufferPosStart,
result.bufferPosEnd,
result.bufferPosStart != result.bufferPosEnd);
@ -169,7 +169,7 @@ HttpRequestExecutor::executeAsync(const String& method,
std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy> m_upstream;
private:
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_ioBuffer;
oatpp::data::share::MemoryLabel m_buffer;
public:
ExecutorCoroutine(const std::shared_ptr<oatpp::network::ClientConnectionProvider>& connectionProvider,
@ -205,20 +205,20 @@ HttpRequestExecutor::executeAsync(const String& method,
auto request = oatpp::web::protocol::http::outgoing::Request::createShared(m_method, m_path, m_headers, m_body);
request->putHeaderIfNotExists(Header::HOST, m_connectionProvider->getProperty("host"));
request->putHeaderIfNotExists(Header::CONNECTION, Header::Value::CONNECTION_KEEP_ALIVE);
m_ioBuffer = oatpp::data::buffer::IOBuffer::createShared();
m_upstream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, m_ioBuffer);
m_buffer = oatpp::data::share::MemoryLabel(oatpp::base::StrBuffer::createShared(oatpp::data::buffer::IOBuffer::BUFFER_SIZE));
m_upstream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, m_buffer);
return request->sendAsync(m_upstream).next(m_upstream->flushAsync()).next(yieldTo(&ExecutorCoroutine::readResponse));
}
Action readResponse() {
ResponseHeadersReader headersReader(m_ioBuffer->getData(), m_ioBuffer->getSize(), 4096);
ResponseHeadersReader headersReader(m_buffer, 4096);
return headersReader.readHeadersAsync(m_connection).callbackTo(&ExecutorCoroutine::onHeadersParsed);
}
Action onHeadersParsed(const ResponseHeadersReader::Result& result) {
auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection,
m_ioBuffer,
m_buffer,
result.bufferPosStart,
result.bufferPosEnd,
result.bufferPosStart != result.bufferPosEnd);

View File

@ -37,23 +37,24 @@ data::v_io_size RequestHeadersReader::readHeadersSection(data::stream::InputStre
data::v_io_size res;
while (true) {
v_int32 desiredToRead = m_bufferSize;
v_int32 desiredToRead = m_buffer.getSize();
if(progress + desiredToRead > m_maxHeadersSize) {
desiredToRead = m_maxHeadersSize - progress;
if(desiredToRead <= 0) {
return -1;
}
}
res = stream->peek(m_buffer, desiredToRead);
auto bufferData = m_buffer.getData();
res = stream->peek(bufferData, desiredToRead);
if(res > 0) {
bufferStream->write(m_buffer, res);
bufferStream->write(bufferData, res);
progress += res;
for(v_int32 i = 0; i < res; i ++) {
accumulator <<= 8;
accumulator |= m_buffer[i];
accumulator |= bufferData[i];
if(accumulator == SECTION_END) {
stream->commitReadOffset(i + 1);
return res;
@ -104,8 +105,7 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptr<data::stream::Input
class ReaderCoroutine : public oatpp::async::CoroutineWithResult<ReaderCoroutine, const Result&> {
private:
std::shared_ptr<data::stream::InputStreamBufferedProxy> m_stream;
p_char8 m_buffer;
v_int32 m_bufferSize;
oatpp::data::share::MemoryLabel m_buffer;
v_int32 m_maxHeadersSize;
v_word32 m_accumulator;
v_int32 m_progress;
@ -114,10 +114,9 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptr<data::stream::Input
public:
ReaderCoroutine(const std::shared_ptr<data::stream::InputStreamBufferedProxy>& stream,
p_char8 buffer, v_int32 bufferSize, v_int32 maxHeadersSize)
const oatpp::data::share::MemoryLabel& buffer, v_int32 maxHeadersSize)
: m_stream(stream)
, m_buffer(buffer)
, m_bufferSize(bufferSize)
, m_maxHeadersSize(maxHeadersSize)
, m_accumulator(0)
, m_progress(0)
@ -125,23 +124,24 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptr<data::stream::Input
Action act() override {
v_int32 desiredToRead = m_bufferSize;
v_int32 desiredToRead = m_buffer.getSize();
if(m_progress + desiredToRead > m_maxHeadersSize) {
desiredToRead = m_maxHeadersSize - m_progress;
if(desiredToRead <= 0) {
return error<Error>("[oatpp::web::protocol::http::incoming::RequestHeadersReader::readHeadersAsync()]: Error. Headers section is too large.");
}
}
auto res = m_stream->peek(m_buffer, desiredToRead);
auto bufferData = m_buffer.getData();
auto res = m_stream->peek(bufferData, desiredToRead);
if(res > 0) {
m_bufferStream.write(m_buffer, res);
m_bufferStream.write(bufferData, res);
m_progress += res;
for(v_int32 i = 0; i < res; i ++) {
m_accumulator <<= 8;
m_accumulator |= m_buffer[i];
m_accumulator |= bufferData[i];
if(m_accumulator == SECTION_END) {
m_stream->commitReadOffset(i + 1);
return yieldTo(&ReaderCoroutine::parseHeaders);
@ -185,7 +185,7 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptr<data::stream::Input
};
return ReaderCoroutine::startForResult(connection, m_buffer, m_bufferSize, m_maxHeadersSize);
return ReaderCoroutine::startForResult(connection, m_buffer, m_maxHeadersSize);
}

View File

@ -65,20 +65,17 @@ private:
oatpp::data::stream::ConsistentOutputStream* bufferStream,
Result& result);
private:
p_char8 m_buffer;
v_int32 m_bufferSize;
oatpp::data::share::MemoryLabel m_buffer;
v_int32 m_maxHeadersSize;
public:
/**
* Constructor.
* @param buffer - buffer to use to read data from stream.
* @param bufferSize - buffer size.
* @param maxHeadersSize - maximum allowed size in bytes of http headers section.
* @param buffer - buffer to use to read data from stream. &id:oatpp::data::share::MemoryLabel;.
* @param maxHeadersSize
*/
RequestHeadersReader(void* buffer, v_int32 bufferSize, v_int32 maxHeadersSize)
: m_buffer((p_char8) buffer)
, m_bufferSize(bufferSize)
RequestHeadersReader(const oatpp::data::share::MemoryLabel& buffer, v_int32 maxHeadersSize)
: m_buffer(buffer)
, m_maxHeadersSize(maxHeadersSize)
{}
@ -88,8 +85,7 @@ public:
* @param error - out parameter &id:oatpp::web::protocol::ProtocolError::Info;.
* @return - &l:RequestHeadersReader::Result;.
*/
Result readHeaders(data::stream::InputStreamBufferedProxy* stream,
http::HttpError::Info& error);
Result readHeaders(data::stream::InputStreamBufferedProxy* stream, http::HttpError::Info& error);
/**
* Read and parse http headers from stream in asynchronous manner.

View File

@ -38,21 +38,22 @@ data::v_io_size ResponseHeadersReader::readHeadersSection(const std::shared_ptr<
data::v_io_size res;
while (true) {
v_int32 desiredToRead = m_bufferSize;
v_int32 desiredToRead = m_buffer.getSize();
if(progress + desiredToRead > m_maxHeadersSize) {
desiredToRead = m_maxHeadersSize - progress;
if(desiredToRead <= 0) {
return -1;
}
}
res = connection->read(m_buffer, desiredToRead);
auto bufferData = m_buffer.getData();
res = connection->read(bufferData, desiredToRead);
if(res > 0) {
bufferStream->write(m_buffer, res);
bufferStream->write(bufferData, res);
for(v_int32 i = 0; i < res; i ++) {
accumulator <<= 8;
accumulator |= m_buffer[i];
accumulator |= bufferData[i];
if(accumulator == sectionEnd) {
result.bufferPosStart = i + 1;
result.bufferPosEnd = (v_int32) res;
@ -101,8 +102,7 @@ ResponseHeadersReader::readHeadersAsync(const std::shared_ptr<oatpp::data::strea
class ReaderCoroutine : public oatpp::async::CoroutineWithResult<ReaderCoroutine, const Result&> {
private:
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
p_char8 m_buffer;
v_int32 m_bufferSize;
oatpp::data::share::MemoryLabel m_buffer;
v_int32 m_maxHeadersSize;
v_word32 m_accumulator;
v_int32 m_progress;
@ -111,10 +111,9 @@ ResponseHeadersReader::readHeadersAsync(const std::shared_ptr<oatpp::data::strea
public:
ReaderCoroutine(const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
p_char8 buffer, v_int32 bufferSize, v_int32 maxHeadersSize)
const oatpp::data::share::MemoryLabel buffer, v_int32 maxHeadersSize)
: m_connection(connection)
, m_buffer(buffer)
, m_bufferSize(bufferSize)
, m_maxHeadersSize(maxHeadersSize)
, m_accumulator(0)
, m_progress(0)
@ -122,22 +121,22 @@ ResponseHeadersReader::readHeadersAsync(const std::shared_ptr<oatpp::data::strea
Action act() override {
v_int32 desiredToRead = m_bufferSize;
v_int32 desiredToRead = m_buffer.getSize();
if(m_progress + desiredToRead > m_maxHeadersSize) {
desiredToRead = m_maxHeadersSize - m_progress;
if(desiredToRead <= 0) {
return error<Error>("[oatpp::web::protocol::http::incoming::RequestHeadersReader::readHeadersAsync()]: Error. Headers section is too large.");
}
}
auto res = m_connection->read(m_buffer, desiredToRead);
auto bufferData = m_buffer.getData();
auto res = m_connection->read(bufferData, desiredToRead);
if(res > 0) {
m_bufferStream.write(m_buffer, res);
m_bufferStream.write(bufferData, res);
m_progress += res;
for(v_int32 i = 0; i < res; i ++) {
m_accumulator <<= 8;
m_accumulator |= m_buffer[i];
m_accumulator |= bufferData[i];
if(m_accumulator == SECTION_END) {
m_result.bufferPosStart = i + 1;
m_result.bufferPosEnd = (v_int32) res;
@ -180,7 +179,7 @@ ResponseHeadersReader::readHeadersAsync(const std::shared_ptr<oatpp::data::strea
};
return ReaderCoroutine::startForResult(connection, m_buffer, m_bufferSize, m_maxHeadersSize);
return ReaderCoroutine::startForResult(connection, m_buffer, m_maxHeadersSize);
}

View File

@ -73,20 +73,17 @@ private:
oatpp::data::stream::OutputStream* bufferStream,
Result& result);
private:
p_char8 m_buffer;
v_int32 m_bufferSize;
oatpp::data::share::MemoryLabel m_buffer;
v_int32 m_maxHeadersSize;
public:
/**
* Constructor.
* @param buffer - buffer to use to read data from stream.
* @param bufferSize - buffer size.
* @param maxHeadersSize - maximum allowed size in bytes of http headers section.
* @param buffer - buffer to use to read data from stream. &id:oatpp::data::share::MemoryLabel;.
* @param maxHeadersSize
*/
ResponseHeadersReader(void* buffer, v_int32 bufferSize, v_int32 maxHeadersSize)
: m_buffer((p_char8) buffer)
, m_bufferSize(bufferSize)
ResponseHeadersReader(const oatpp::data::share::MemoryLabel& buffer, v_int32 maxHeadersSize)
: m_buffer(buffer)
, m_maxHeadersSize(maxHeadersSize)
{}

View File

@ -71,9 +71,16 @@ void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr<IOStream
connection->setOutputStreamIOMode(oatpp::data::stream::IOMode::NON_BLOCKING);
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::NON_BLOCKING);
auto bufferMemory = oatpp::base::StrBuffer::createShared(oatpp::data::buffer::IOBuffer::BUFFER_SIZE * 2);
oatpp::data::share::MemoryLabel inBuffer(bufferMemory,
&bufferMemory->getData()[0],
oatpp::data::buffer::IOBuffer::BUFFER_SIZE);
auto inBuffer = oatpp::data::buffer::IOBuffer::createShared();
auto outBuffer = oatpp::data::buffer::IOBuffer::createShared();
oatpp::data::share::MemoryLabel outBuffer(bufferMemory,
&bufferMemory->getData()[oatpp::data::buffer::IOBuffer::BUFFER_SIZE],
oatpp::data::buffer::IOBuffer::BUFFER_SIZE);
auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection, inBuffer);
auto outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, outBuffer);

View File

@ -55,13 +55,15 @@ HttpConnectionHandler::Task::createShared(HttpRouter* router,
}
void HttpConnectionHandler::Task::run(){
const v_int32 bufferSize = oatpp::data::buffer::IOBuffer::BUFFER_SIZE;
v_char8 inBuffer [bufferSize];
v_char8 outBuffer [bufferSize];
auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection, inBuffer, bufferSize);
auto outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(m_connection, outBuffer, bufferSize);
const v_int32 bufferSize = oatpp::data::buffer::IOBuffer::BUFFER_SIZE;
v_char8 bufferMemory[bufferSize * 2];
oatpp::data::share::MemoryLabel inBuffer(nullptr, bufferMemory, bufferSize);
oatpp::data::share::MemoryLabel outBuffer(nullptr, bufferMemory + bufferSize, bufferSize);
auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection, inBuffer);
oatpp::data::stream::OutputStreamBufferedProxy outStream(m_connection, outBuffer);
v_int32 connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_CLOSE;
std::shared_ptr<oatpp::web::protocol::http::outgoing::Response> response;
@ -70,9 +72,9 @@ void HttpConnectionHandler::Task::run(){
response = HttpProcessor::processRequest(m_router, inStream, m_bodyDecoder, m_errorHandler, m_requestInterceptors, connectionState);
if(response) {
outStream->setBufferPosition(0, 0, false);
response->send(outStream.get());
outStream->flush();
outStream.setBufferPosition(0, 0, false);
response->send(&outStream);
outStream.flush();
} else {
return;
}

View File

@ -41,7 +41,8 @@ HttpProcessor::processRequest(HttpRouter* router,
{
const v_int32 bufferSize = 2048;
v_char8 buffer [bufferSize];
RequestHeadersReader headersReader(buffer, bufferSize, 4096);
oatpp::data::share::MemoryLabel bufferLabel(nullptr, buffer, bufferSize);
RequestHeadersReader headersReader(bufferLabel, 4096);
headersReadResult = headersReader.readHeaders(inStream.get(), error);
}
@ -129,7 +130,7 @@ oatpp::async::Action HttpProcessor::Coroutine::onHeadersParsed(const RequestHead
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::act() {
RequestHeadersReader headersReader(m_headerReaderBuffer->getData(), m_headerReaderBuffer->getSize(), 4096);
RequestHeadersReader headersReader(m_headerReaderBuffer, 4096);
return headersReader.readHeadersAsync(m_inStream).callbackTo(&HttpProcessor::Coroutine::onHeadersParsed);
}
@ -143,12 +144,12 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponse(const std:
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() {
m_currentResponse->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER);
m_connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionState(m_currentRequest, m_currentResponse);
m_outStream->setBufferPosition(0, 0, false);
return m_currentResponse->sendAsync(m_outStream).next(m_outStream->flushAsync()).next(yieldTo(&HttpProcessor::Coroutine::onRequestDone));
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onRequestDone() {

View File

@ -76,7 +76,7 @@ public:
std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy> m_outStream;
v_int32 m_connectionState;
private:
oatpp::String m_headerReaderBuffer;
oatpp::data::share::MemoryLabel m_headerReaderBuffer;
oatpp::web::server::HttpRouter::BranchRouter::Route m_currentRoute;
std::shared_ptr<protocol::http::incoming::Request> m_currentRequest;
std::shared_ptr<protocol::http::outgoing::Response> m_currentResponse;
@ -97,7 +97,7 @@ public:
, m_inStream(inStream)
, m_outStream(outStream)
, m_connectionState(oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE)
, m_headerReaderBuffer(2048)
, m_headerReaderBuffer(oatpp::base::StrBuffer::createShared(2048))
{}
Action act() override;