Feature http-pipelining. Simple API support.

This commit is contained in:
lganzzzo 2019-09-19 03:00:26 +03:00
parent 73d9523a5a
commit 125e18609e
13 changed files with 435 additions and 32 deletions

View File

@ -116,6 +116,93 @@ data::v_io_size FIFOBuffer::read(void *data, data::v_io_size count) {
}
data::v_io_size FIFOBuffer::peek(void *data, data::v_io_size count) {
if(!m_canRead) {
return data::IOError::WAIT_RETRY;
}
if(count == 0) {
return 0;
} else if(count < 0) {
throw std::runtime_error("[oatpp::data::buffer::FIFOBuffer::peek(...)]: count < 0");
}
if(m_readPosition < m_writePosition) {
auto size = m_writePosition - m_readPosition;
if(size > count) {
size = count;
}
std::memcpy(data, &m_buffer[m_readPosition], (size_t)size);
return size;
}
auto tmpReadPos = m_readPosition;
auto size = m_bufferSize - tmpReadPos;
if(size > count){
std::memcpy(data, &m_buffer[tmpReadPos], (size_t)count);
tmpReadPos += count;
return count;
}
std::memcpy(data, &m_buffer[tmpReadPos], (size_t)size);
auto size2 = m_writePosition;
if(size2 > count - size) {
size2 = count - size;
}
std::memcpy(&((p_char8) data)[size], m_buffer, (size_t)size2);
return (size + size2);
}
data::v_io_size FIFOBuffer::commitReadOffset(data::v_io_size count) {
if(!m_canRead) {
return data::IOError::WAIT_RETRY;
}
if(count == 0) {
return 0;
} else if(count < 0) {
throw std::runtime_error("[oatpp::data::buffer::FIFOBuffer::commitReadOffset(...)]: count < 0");
}
if(m_readPosition < m_writePosition) {
auto size = m_writePosition - m_readPosition;
if(size > count) {
size = count;
}
m_readPosition += size;
if(m_readPosition == m_writePosition) {
m_canRead = false;
}
return size;
}
auto size = m_bufferSize - m_readPosition;
if(size > count){
m_readPosition += count;
return count;
}
auto size2 = m_writePosition;
if(size2 > count - size) {
size2 = count - size;
}
m_readPosition = size2;
if(m_readPosition == m_writePosition) {
m_canRead = false;
}
return (size + size2);
}
data::v_io_size FIFOBuffer::write(const void *data, data::v_io_size count) {
if(m_canRead && m_writePosition == m_readPosition) {

View File

@ -93,6 +93,21 @@ public:
*/
data::v_io_size read(void *data, data::v_io_size count);
/**
* Peek up to count of bytes int he buffer
* @param data
* @param count
* @return [1..count], IOErrors.
*/
data::v_io_size peek(void *data, data::v_io_size count);
/**
* Commit read offset
* @param count
* @return [1..count], IOErrors.
*/
data::v_io_size commitReadOffset(data::v_io_size count);
/**
* write up to count bytes from data to buffer
* @param data

View File

@ -72,6 +72,24 @@ data::v_io_size InputStreamBufferedProxy::read(void *data, data::v_io_size count
}
data::v_io_size InputStreamBufferedProxy::peek(void *data, data::v_io_size count) {
if(m_buffer.availableToRead() > 0) {
return m_buffer.peek(data, count);
} else {
auto bytesBuffered = m_buffer.readFromStreamAndWrite(m_inputStream.get(), m_buffer.getBufferSize());
if(bytesBuffered > 0) {
return m_buffer.peek(data, count);
}
return bytesBuffered;
}
}
data::v_io_size InputStreamBufferedProxy::commitReadOffset(data::v_io_size count) {
return m_buffer.commitReadOffset(count);
}
oatpp::async::Action InputStreamBufferedProxy::suggestInputStreamAction(data::v_io_size ioResult) {
return m_inputStream->suggestInputStreamAction(ioResult);
}

View File

@ -175,6 +175,10 @@ public:
data::v_io_size read(void *data, data::v_io_size count) override;
data::v_io_size peek(void *data, data::v_io_size count);
data::v_io_size commitReadOffset(data::v_io_size count);
oatpp::async::Action suggestInputStreamAction(data::v_io_size ioResult) override;
/**

View File

@ -28,8 +28,8 @@
namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming {
data::v_io_size RequestHeadersReader::readHeadersSection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
oatpp::data::stream::OutputStream* bufferStream,
data::v_io_size RequestHeadersReader::readHeadersSection(data::stream::InputStreamBufferedProxy* stream,
oatpp::data::stream::ConsistentOutputStream* bufferStream,
Result& result) {
v_word32 accumulator = 0;
@ -45,7 +45,7 @@ data::v_io_size RequestHeadersReader::readHeadersSection(const std::shared_ptr<o
}
}
res = connection->read(m_buffer, desiredToRead);
res = stream->peek(m_buffer, desiredToRead);
if(res > 0) {
bufferStream->write(m_buffer, res);
@ -55,9 +55,12 @@ data::v_io_size RequestHeadersReader::readHeadersSection(const std::shared_ptr<o
if(accumulator == SECTION_END) {
result.bufferPosStart = i + 1;
result.bufferPosEnd = (v_int32) res;
stream->commitReadOffset(i + 1);
return res;
}
}
stream->commitReadOffset(res);
} else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
continue;
@ -71,13 +74,13 @@ data::v_io_size RequestHeadersReader::readHeadersSection(const std::shared_ptr<o
}
RequestHeadersReader::Result RequestHeadersReader::readHeaders(const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
RequestHeadersReader::Result RequestHeadersReader::readHeaders(data::stream::InputStreamBufferedProxy* stream,
http::HttpError::Info& error) {
RequestHeadersReader::Result result;
oatpp::data::stream::ChunkedBuffer buffer;
error.ioStatus = readHeadersSection(connection, &buffer, result);
error.ioStatus = readHeadersSection(stream, &buffer, result);
if(error.ioStatus > 0) {
auto headersText = buffer.toString();

View File

@ -27,6 +27,7 @@
#include "oatpp/web/protocol/http/Http.hpp"
#include "oatpp/core/async/Coroutine.hpp"
#include "oatpp/core/data/stream/StreamBufferedProxy.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming {
@ -69,8 +70,8 @@ public:
};
private:
data::v_io_size readHeadersSection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
oatpp::data::stream::OutputStream* bufferStream,
data::v_io_size readHeadersSection(data::stream::InputStreamBufferedProxy* stream,
oatpp::data::stream::ConsistentOutputStream* bufferStream,
Result& result);
private:
p_char8 m_buffer;
@ -92,15 +93,16 @@ public:
/**
* Read and parse http headers from stream.
* @param connection - `std::shared_ptr` to &id:oatpp::data::stream::IOStream;.
* @param stream - &id:oatpp::data::stream::InputStreamBufferedProxy;.
* @param error - out parameter &id:oatpp::web::protocol::ProtocolError::Info;.
* @return - &l:RequestHeadersReader::Result;.
*/
Result readHeaders(const std::shared_ptr<oatpp::data::stream::IOStream>& connection, http::HttpError::Info& error);
Result readHeaders(data::stream::InputStreamBufferedProxy* stream,
http::HttpError::Info& error);
/**
* Read and parse http headers from stream in asynchronous manner.
* @param connection - `std::shared_ptr` to &id:oatpp::data::stream::IOStream;.
* @param stream - `std::shared_ptr` to &id:oatpp::data::stream::InputStreamBufferedProxy;.
* @return - &id:oatpp::async::CoroutineStarterForResult;.
*/
oatpp::async::CoroutineStarterForResult<const RequestHeadersReader::Result&> readHeadersAsync(const std::shared_ptr<oatpp::data::stream::IOStream>& connection);

View File

@ -57,16 +57,17 @@ HttpConnectionHandler::Task::createShared(HttpRouter* router,
void HttpConnectionHandler::Task::run(){
const v_int32 bufferSize = oatpp::data::buffer::IOBuffer::BUFFER_SIZE;
v_char8 buffer [bufferSize];
auto outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(m_connection, buffer, bufferSize);
auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection, buffer, bufferSize);
v_char8 inBuffer [bufferSize];
v_char8 outBuffer [bufferSize];
auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection, inBuffer, bufferSize);
auto outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(m_connection, outBuffer, bufferSize);
v_int32 connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_CLOSE;
std::shared_ptr<oatpp::web::protocol::http::outgoing::Response> response;
do {
response = HttpProcessor::processRequest(m_router, m_connection, m_bodyDecoder, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, connectionState);
response = HttpProcessor::processRequest(m_router, inStream, m_bodyDecoder, m_errorHandler, m_requestInterceptors, connectionState);
if(response) {
outStream->setBufferPosition(0, 0, false);

View File

@ -28,18 +28,22 @@ namespace oatpp { namespace web { namespace server {
std::shared_ptr<protocol::http::outgoing::Response>
HttpProcessor::processRequest(HttpRouter* router,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
RequestInterceptors* requestInterceptors,
void* buffer,
v_int32 bufferSize,
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream,
v_int32& connectionState) {
RequestHeadersReader headersReader(buffer, bufferSize, 4096);
RequestHeadersReader::Result headersReadResult;
oatpp::web::protocol::http::HttpError::Info error;
auto headersReadResult = headersReader.readHeaders(connection, error);
{
const v_int32 bufferSize = 2048;
v_char8 buffer [bufferSize];
RequestHeadersReader headersReader(buffer, bufferSize, 4096);
headersReadResult = headersReader.readHeaders(inStream.get(), error);
}
if(error.status.code != 0) {
connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_CLOSE;
@ -59,14 +63,11 @@ HttpProcessor::processRequest(HttpRouter* router,
}
auto& bodyStream = inStream;
bodyStream->setBufferPosition(headersReadResult.bufferPosStart,
headersReadResult.bufferPosEnd,
headersReadResult.bufferPosStart != headersReadResult.bufferPosEnd);
auto request = protocol::http::incoming::Request::createShared(headersReadResult.startingLine,
route.matchMap,
headersReadResult.headers,
bodyStream,
inStream,
bodyDecoder);
std::shared_ptr<protocol::http::outgoing::Response> response;

View File

@ -118,13 +118,10 @@ public:
static std::shared_ptr<protocol::http::outgoing::Response>
processRequest(HttpRouter* router,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
RequestInterceptors* requestInterceptors,
void* buffer,
v_int32 bufferSize,
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream,
v_int32& connectionState);
};

View File

@ -43,6 +43,8 @@ add_executable(oatppAllTests
oatpp/web/server/api/ApiControllerTest.hpp
oatpp/web/server/handler/AuthorizationHandlerTest.cpp
oatpp/web/server/handler/AuthorizationHandlerTest.hpp
oatpp/web/PipelineTest.cpp
oatpp/web/PipelineTest.hpp
oatpp/web/FullAsyncTest.cpp
oatpp/web/FullAsyncTest.hpp
oatpp/web/FullTest.cpp

View File

@ -2,6 +2,7 @@
#include "oatpp/web/FullTest.hpp"
#include "oatpp/web/FullAsyncTest.hpp"
#include "oatpp/web/FullAsyncClientTest.hpp"
#include "oatpp/web/PipelineTest.hpp"
#include "oatpp/web/server/api/ApiControllerTest.hpp"
#include "oatpp/web/server/handler/AuthorizationHandlerTest.hpp"
@ -84,7 +85,18 @@ void runTests() {
OATPP_RUN_TEST(oatpp::test::web::server::api::ApiControllerTest);
OATPP_RUN_TEST(oatpp::test::web::server::handler::AuthorizationHandlerTest);
*/
{
oatpp::test::web::PipelineTest test_virtual(0, 3000);
test_virtual.run();
oatpp::test::web::PipelineTest test_port(8000, 3000);
test_port.run();
}
/*
{
oatpp::test::web::FullTest test_virtual(0, 1000);
@ -94,7 +106,7 @@ void runTests() {
test_port.run();
}
*/
{
oatpp::test::web::FullAsyncTest test_virtual(0, 1000);
@ -114,7 +126,7 @@ void runTests() {
test_port.run(1);
}
*/
}
}

View File

@ -0,0 +1,211 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "PipelineTest.hpp"
#include "oatpp/web/app/Controller.hpp"
#include "oatpp/web/server/HttpConnectionHandler.hpp"
#include "oatpp/web/server/HttpRouter.hpp"
#include "oatpp/parser/json/mapping/ObjectMapper.hpp"
#include "oatpp/network/server/SimpleTCPConnectionProvider.hpp"
#include "oatpp/network/client/SimpleTCPConnectionProvider.hpp"
#include "oatpp/network/virtual_/client/ConnectionProvider.hpp"
#include "oatpp/network/virtual_/server/ConnectionProvider.hpp"
#include "oatpp/network/virtual_/Interface.hpp"
#include "oatpp/core/data/stream/BufferInputStream.hpp"
#include "oatpp/core/macro/component.hpp"
#include "oatpp-test/web/ClientServerTestRunner.hpp"
namespace oatpp { namespace test { namespace web {
namespace {
class TestComponent {
private:
v_int32 m_port;
public:
TestComponent(v_int32 port)
: m_port(port)
{}
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::network::virtual_::Interface>, virtualInterface)([] {
return oatpp::network::virtual_::Interface::createShared("virtualhost");
}());
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::network::ServerConnectionProvider>, serverConnectionProvider)([this] {
if(m_port == 0) { // Use oatpp virtual interface
OATPP_COMPONENT(std::shared_ptr<oatpp::network::virtual_::Interface>, interface);
return std::static_pointer_cast<oatpp::network::ServerConnectionProvider>(
oatpp::network::virtual_::server::ConnectionProvider::createShared(interface)
);
}
return std::static_pointer_cast<oatpp::network::ServerConnectionProvider>(
oatpp::network::server::SimpleTCPConnectionProvider::createShared(m_port)
);
}());
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::web::server::HttpRouter>, httpRouter)([] {
return oatpp::web::server::HttpRouter::createShared();
}());
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::network::server::ConnectionHandler>, serverConnectionHandler)([] {
OATPP_COMPONENT(std::shared_ptr<oatpp::web::server::HttpRouter>, router);
return oatpp::web::server::HttpConnectionHandler::createShared(router);
}());
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::data::mapping::ObjectMapper>, objectMapper)([] {
return oatpp::parser::json::mapping::ObjectMapper::createShared();
}());
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::network::ClientConnectionProvider>, clientConnectionProvider)([this] {
if(m_port == 0) {
OATPP_COMPONENT(std::shared_ptr<oatpp::network::virtual_::Interface>, interface);
return std::static_pointer_cast<oatpp::network::ClientConnectionProvider>(
oatpp::network::virtual_::client::ConnectionProvider::createShared(interface)
);
}
return std::static_pointer_cast<oatpp::network::ClientConnectionProvider>(
oatpp::network::client::SimpleTCPConnectionProvider::createShared("127.0.0.1", m_port)
);
}());
};
const char* const SAMPLE_IN =
"GET / HTTP/1.1\r\n"
"Connection: keep-alive\r\n"
"Content-Length: 0\r\n"
"\r\n";
const char* const SAMPLE_OUT =
"HTTP/1.1 200 OK\r\n"
"Content-Length: 14\r\n"
"Connection: keep-alive\r\n"
"Server: oatpp/0.19.8\r\n"
"\r\n"
"Hello World!!!";
}
void PipelineTest::onRun() {
TestComponent component(m_port);
oatpp::test::web::ClientServerTestRunner runner;
runner.addController(app::Controller::createShared());
runner.run([this, &runner] {
OATPP_COMPONENT(std::shared_ptr<oatpp::network::ClientConnectionProvider>, clientConnectionProvider);
auto connection = clientConnectionProvider->getConnection();
std::thread pipeInThread([this, connection] {
oatpp::data::stream::ChunkedBuffer pipelineStream;
for (v_int32 i = 0; i < m_pipelineSize; i++) {
pipelineStream << SAMPLE_IN;
}
oatpp::data::stream::BufferInputStream inputStream(pipelineStream.toString());
oatpp::data::stream::DefaultWriteCallback writeCallback(connection.get());
oatpp::data::buffer::IOBuffer ioBuffer;
oatpp::data::stream::transfer(&inputStream, &writeCallback, 0, ioBuffer.getData(), ioBuffer.getSize());
});
std::thread pipeOutThread([this, connection] {
oatpp::data::stream::ChunkedBuffer pipelineStream;
for (v_int32 i = 0; i < m_pipelineSize; i++) {
pipelineStream << SAMPLE_OUT;
}
oatpp::data::stream::ChunkedBuffer receiveStream;
oatpp::data::buffer::IOBuffer ioBuffer;
v_int32 retries = 0;
while(true) {
auto res = connection->read(ioBuffer.getData(), ioBuffer.getSize());
if(res > 0) {
retries = 0;
receiveStream.write(ioBuffer.getData(), res);
if(receiveStream.getSize() >= pipelineStream.getSize()) {
break;
}
} else {
retries ++;
if(retries == 10) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
auto result = receiveStream.toString();
OATPP_ASSERT(result == pipelineStream.toString());
});
pipeOutThread.join();
pipeInThread.join();
///////////////////////////////////////////////////////////////////////////////////////////////////////
// Stop server and unblock accepting thread
runner.getServer()->stop();
OATPP_COMPONENT(std::shared_ptr<oatpp::network::ClientConnectionProvider>, connectionProvider);
connectionProvider->getConnection();
///////////////////////////////////////////////////////////////////////////////////////////////////////
}, std::chrono::minutes(10));
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}}}

View File

@ -0,0 +1,50 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_test_web_PipelineTest_hpp
#define oatpp_test_web_PipelineTest_hpp
#include "oatpp-test/UnitTest.hpp"
namespace oatpp { namespace test { namespace web {
class PipelineTest : public UnitTest {
private:
v_int32 m_port;
v_int32 m_pipelineSize;
public:
PipelineTest(v_int32 port, v_int32 pipelineSize)
: UnitTest("TEST[web::PipelineTest]")
, m_port(port)
, m_pipelineSize(pipelineSize)
{}
void onRun() override;
};
}}}
#endif // oatpp_test_web_PipelineTest_hpp