Feature http-pipelining. Async API support.

This commit is contained in:
lganzzzo 2019-09-19 03:54:44 +03:00
parent 125e18609e
commit b77039dee0
11 changed files with 328 additions and 46 deletions

View File

@ -53,8 +53,6 @@ data::v_io_size RequestHeadersReader::readHeadersSection(data::stream::InputStre
accumulator <<= 8;
accumulator |= m_buffer[i];
if(accumulator == SECTION_END) {
result.bufferPosStart = i + 1;
result.bufferPosEnd = (v_int32) res;
stream->commitReadOffset(i + 1);
return res;
}
@ -98,12 +96,12 @@ RequestHeadersReader::Result RequestHeadersReader::readHeaders(data::stream::Inp
oatpp::async::CoroutineStarterForResult<const RequestHeadersReader::Result&>
RequestHeadersReader::readHeadersAsync(const std::shared_ptr<oatpp::data::stream::IOStream>& connection)
RequestHeadersReader::readHeadersAsync(const std::shared_ptr<data::stream::InputStreamBufferedProxy>& connection)
{
class ReaderCoroutine : public oatpp::async::CoroutineWithResult<ReaderCoroutine, const Result&> {
private:
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
std::shared_ptr<data::stream::InputStreamBufferedProxy> m_stream;
p_char8 m_buffer;
v_int32 m_bufferSize;
v_int32 m_maxHeadersSize;
@ -113,9 +111,9 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptr<oatpp::data::stream
oatpp::data::stream::ChunkedBuffer m_bufferStream;
public:
ReaderCoroutine(const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
ReaderCoroutine(const std::shared_ptr<data::stream::InputStreamBufferedProxy>& stream,
p_char8 buffer, v_int32 bufferSize, v_int32 maxHeadersSize)
: m_connection(connection)
: m_stream(stream)
, m_buffer(buffer)
, m_bufferSize(bufferSize)
, m_maxHeadersSize(maxHeadersSize)
@ -133,7 +131,7 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptr<oatpp::data::stream
}
}
auto res = m_connection->read(m_buffer, desiredToRead);
auto res = m_stream->peek(m_buffer, desiredToRead);
if(res > 0) {
m_bufferStream.write(m_buffer, res);
m_progress += res;
@ -142,16 +140,17 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptr<oatpp::data::stream
m_accumulator <<= 8;
m_accumulator |= m_buffer[i];
if(m_accumulator == SECTION_END) {
m_result.bufferPosStart = i + 1;
m_result.bufferPosEnd = (v_int32) res;
m_stream->commitReadOffset(i + 1);
return yieldTo(&ReaderCoroutine::parseHeaders);
}
}
m_stream->commitReadOffset(res);
return m_connection->suggestInputStreamAction(res);
return m_stream->suggestInputStreamAction(res);
} else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
return m_connection->suggestInputStreamAction(res);
return m_stream->suggestInputStreamAction(res);
} else if(res == data::IOError::BROKEN_PIPE){
return error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE);
} else if(res == data::IOError::ZERO_VALUE){

View File

@ -58,15 +58,6 @@ public:
*/
http::Headers headers;
/**
* This value represents starting position in buffer used to read data from stream for the last read operation.
*/
v_int32 bufferPosStart;
/**
* This value represents end position in buffer used to read data from stream for the last read operation.
*/
v_int32 bufferPosEnd;
};
private:
@ -105,7 +96,7 @@ public:
* @param stream - `std::shared_ptr` to &id:oatpp::data::stream::InputStreamBufferedProxy;.
* @return - &id:oatpp::async::CoroutineStarterForResult;.
*/
oatpp::async::CoroutineStarterForResult<const RequestHeadersReader::Result&> readHeadersAsync(const std::shared_ptr<oatpp::data::stream::IOStream>& connection);
oatpp::async::CoroutineStarterForResult<const RequestHeadersReader::Result&> readHeadersAsync(const std::shared_ptr<data::stream::InputStreamBufferedProxy>& connection);
};

View File

@ -72,18 +72,19 @@ void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr<IOStream
connection->setOutputStreamIOMode(oatpp::data::stream::IOMode::NON_BLOCKING);
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::NON_BLOCKING);
auto ioBuffer = oatpp::data::buffer::IOBuffer::createShared();
auto outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, ioBuffer);
auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection, ioBuffer);
auto inBuffer = oatpp::data::buffer::IOBuffer::createShared();
auto outBuffer = oatpp::data::buffer::IOBuffer::createShared();
auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection, inBuffer);
auto outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, outBuffer);
m_executor->execute<HttpProcessor::Coroutine>(m_router.get(),
m_bodyDecoder,
m_errorHandler,
&m_requestInterceptors,
connection,
ioBuffer,
outStream,
inStream);
inStream,
outStream);
}

View File

@ -109,15 +109,10 @@ oatpp::async::Action HttpProcessor::Coroutine::onHeadersParsed(const RequestHead
return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
}
auto& bodyStream = m_inStream;
bodyStream->setBufferPosition(headersReadResult.bufferPosStart,
headersReadResult.bufferPosEnd,
headersReadResult.bufferPosStart != headersReadResult.bufferPosEnd);
m_currentRequest = protocol::http::incoming::Request::createShared(headersReadResult.startingLine,
m_currentRoute.matchMap,
headersReadResult.headers,
bodyStream,
m_inStream,
m_bodyDecoder);
auto currInterceptor = m_requestInterceptors->getFirstNode();
@ -134,8 +129,8 @@ oatpp::async::Action HttpProcessor::Coroutine::onHeadersParsed(const RequestHead
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::act() {
RequestHeadersReader headersReader(m_ioBuffer->getData(), m_ioBuffer->getSize(), 4096);
return headersReader.readHeadersAsync(m_connection).callbackTo(&HttpProcessor::Coroutine::onHeadersParsed);
RequestHeadersReader headersReader(m_headerReaderBuffer->getData(), m_headerReaderBuffer->getSize(), 4096);
return headersReader.readHeadersAsync(m_inStream).callbackTo(&HttpProcessor::Coroutine::onHeadersParsed);
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onRequestFormed() {

View File

@ -72,11 +72,11 @@ public:
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
RequestInterceptors* m_requestInterceptors;
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_ioBuffer;
std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy> m_outStream;
std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy> m_inStream;
std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy> m_outStream;
v_int32 m_connectionState;
private:
oatpp::String m_headerReaderBuffer;
oatpp::web::server::HttpRouter::BranchRouter::Route m_currentRoute;
std::shared_ptr<protocol::http::incoming::Request> m_currentRequest;
std::shared_ptr<protocol::http::outgoing::Response> m_currentResponse;
@ -87,18 +87,17 @@ public:
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
RequestInterceptors* requestInterceptors,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& ioBuffer,
const std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy>& outStream,
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream)
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream,
const std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy>& outStream)
: m_router(router)
, m_bodyDecoder(bodyDecoder)
, m_errorHandler(errorHandler)
, m_requestInterceptors(requestInterceptors)
, m_connection(connection)
, m_ioBuffer(ioBuffer)
, m_outStream(outStream)
, m_inStream(inStream)
, m_outStream(outStream)
, m_connectionState(oatpp::web::protocol::http::outgoing::CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE)
, m_headerReaderBuffer(2048)
{}
Action act() override;

View File

@ -45,6 +45,8 @@ add_executable(oatppAllTests
oatpp/web/server/handler/AuthorizationHandlerTest.hpp
oatpp/web/PipelineTest.cpp
oatpp/web/PipelineTest.hpp
oatpp/web/PipelineAsyncTest.cpp
oatpp/web/PipelineAsyncTest.hpp
oatpp/web/FullAsyncTest.cpp
oatpp/web/FullAsyncTest.hpp
oatpp/web/FullTest.cpp

View File

@ -2,7 +2,10 @@
#include "oatpp/web/FullTest.hpp"
#include "oatpp/web/FullAsyncTest.hpp"
#include "oatpp/web/FullAsyncClientTest.hpp"
#include "oatpp/web/PipelineTest.hpp"
#include "oatpp/web/PipelineAsyncTest.hpp"
#include "oatpp/web/server/api/ApiControllerTest.hpp"
#include "oatpp/web/server/handler/AuthorizationHandlerTest.hpp"
@ -87,15 +90,26 @@ void runTests() {
OATPP_RUN_TEST(oatpp::test::web::server::handler::AuthorizationHandlerTest);
*/
// {
//
// oatpp::test::web::PipelineTest test_virtual(0, 3000);
// test_virtual.run();
//
// oatpp::test::web::PipelineTest test_port(8000, 3000);
// test_port.run();
//
// }
{
oatpp::test::web::PipelineTest test_virtual(0, 3000);
oatpp::test::web::PipelineAsyncTest test_virtual(0, 3000);
test_virtual.run();
oatpp::test::web::PipelineTest test_port(8000, 3000);
oatpp::test::web::PipelineAsyncTest test_port(8000, 3000);
test_port.run();
}
/*
{

View File

@ -0,0 +1,227 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "PipelineAsyncTest.hpp"
#include "oatpp/web/app/ControllerAsync.hpp"
#include "oatpp/web/server/AsyncHttpConnectionHandler.hpp"
#include "oatpp/web/server/HttpRouter.hpp"
#include "oatpp/parser/json/mapping/ObjectMapper.hpp"
#include "oatpp/network/server/SimpleTCPConnectionProvider.hpp"
#include "oatpp/network/client/SimpleTCPConnectionProvider.hpp"
#include "oatpp/network/virtual_/client/ConnectionProvider.hpp"
#include "oatpp/network/virtual_/server/ConnectionProvider.hpp"
#include "oatpp/network/virtual_/Interface.hpp"
#include "oatpp/core/data/stream/BufferInputStream.hpp"
#include "oatpp/core/macro/component.hpp"
#include "oatpp-test/web/ClientServerTestRunner.hpp"
namespace oatpp { namespace test { namespace web {
namespace {
class TestComponent {
private:
v_int32 m_port;
public:
TestComponent(v_int32 port)
: m_port(port)
{}
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::async::Executor>, executor)([] {
return std::make_shared<oatpp::async::Executor>(1, 1, 1);
}());
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::network::virtual_::Interface>, virtualInterface)([] {
return oatpp::network::virtual_::Interface::createShared("virtualhost");
}());
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::network::ServerConnectionProvider>, serverConnectionProvider)([this] {
if(m_port == 0) { // Use oatpp virtual interface
OATPP_COMPONENT(std::shared_ptr<oatpp::network::virtual_::Interface>, interface);
return std::static_pointer_cast<oatpp::network::ServerConnectionProvider>(
oatpp::network::virtual_::server::ConnectionProvider::createShared(interface)
);
}
return std::static_pointer_cast<oatpp::network::ServerConnectionProvider>(
oatpp::network::server::SimpleTCPConnectionProvider::createShared(m_port)
);
}());
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::web::server::HttpRouter>, httpRouter)([] {
return oatpp::web::server::HttpRouter::createShared();
}());
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::network::server::ConnectionHandler>, serverConnectionHandler)([] {
OATPP_COMPONENT(std::shared_ptr<oatpp::web::server::HttpRouter>, router);
OATPP_COMPONENT(std::shared_ptr<oatpp::async::Executor>, executor);
return oatpp::web::server::AsyncHttpConnectionHandler::createShared(router, executor);
}());
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::data::mapping::ObjectMapper>, objectMapper)([] {
return oatpp::parser::json::mapping::ObjectMapper::createShared();
}());
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::network::ClientConnectionProvider>, clientConnectionProvider)([this] {
if(m_port == 0) {
OATPP_COMPONENT(std::shared_ptr<oatpp::network::virtual_::Interface>, interface);
return std::static_pointer_cast<oatpp::network::ClientConnectionProvider>(
oatpp::network::virtual_::client::ConnectionProvider::createShared(interface)
);
}
return std::static_pointer_cast<oatpp::network::ClientConnectionProvider>(
oatpp::network::client::SimpleTCPConnectionProvider::createShared("127.0.0.1", m_port)
);
}());
};
const char* const SAMPLE_IN =
"GET / HTTP/1.1\r\n"
"Connection: keep-alive\r\n"
"Content-Length: 0\r\n"
"\r\n";
const char* const SAMPLE_OUT =
"HTTP/1.1 200 OK\r\n"
"Content-Length: 20\r\n"
"Connection: keep-alive\r\n"
"Server: oatpp/" OATPP_VERSION "\r\n"
"\r\n"
"Hello World Async!!!";
}
void PipelineAsyncTest::onRun() {
TestComponent component(m_port);
oatpp::test::web::ClientServerTestRunner runner;
runner.addController(app::ControllerAsync::createShared());
runner.run([this, &runner] {
OATPP_COMPONENT(std::shared_ptr<oatpp::network::ClientConnectionProvider>, clientConnectionProvider);
auto connection = clientConnectionProvider->getConnection();
std::thread pipeInThread([this, connection] {
oatpp::data::stream::ChunkedBuffer pipelineStream;
for (v_int32 i = 0; i < m_pipelineSize; i++) {
pipelineStream << SAMPLE_IN;
}
oatpp::data::stream::BufferInputStream inputStream(pipelineStream.toString());
oatpp::data::stream::DefaultWriteCallback writeCallback(connection.get());
oatpp::data::buffer::IOBuffer ioBuffer;
oatpp::data::stream::transfer(&inputStream, &writeCallback, 0, ioBuffer.getData(), ioBuffer.getSize());
});
std::thread pipeOutThread([this, connection] {
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::NON_BLOCKING);
oatpp::data::stream::ChunkedBuffer pipelineStream;
for (v_int32 i = 0; i < m_pipelineSize; i++) {
pipelineStream << SAMPLE_OUT;
}
oatpp::data::stream::ChunkedBuffer receiveStream;
oatpp::data::buffer::IOBuffer ioBuffer;
v_int32 retries = 0;
while(true) {
auto res = connection->read(ioBuffer.getData(), ioBuffer.getSize());
if(res > 0) {
retries = 0;
receiveStream.write(ioBuffer.getData(), res);
if(receiveStream.getSize() >= pipelineStream.getSize()) {
break;
}
} else {
retries ++;
if(retries == 50) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
auto result = receiveStream.toString();
OATPP_ASSERT(result == pipelineStream.toString());
});
pipeOutThread.join();
pipeInThread.join();
///////////////////////////////////////////////////////////////////////////////////////////////////////
// Stop server and unblock accepting thread
connection.reset();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
///////////////////////////////////////////////////////////////////////////////////////////////////////
// Stop server and unblock accepting thread
runner.getServer()->stop();
OATPP_COMPONENT(std::shared_ptr<oatpp::network::ClientConnectionProvider>, connectionProvider);
connectionProvider->getConnection();
///////////////////////////////////////////////////////////////////////////////////////////////////////
}, std::chrono::minutes(10));
OATPP_COMPONENT(std::shared_ptr<oatpp::async::Executor>, executor);
executor->waitTasksFinished();
executor->join();
}
}}}

View File

@ -0,0 +1,50 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_test_web_PipelineAsyncTest_hpp
#define oatpp_test_web_PipelineAsyncTest_hpp
#include "oatpp-test/UnitTest.hpp"
namespace oatpp { namespace test { namespace web {
class PipelineAsyncTest : public UnitTest {
private:
v_int32 m_port;
v_int32 m_pipelineSize;
public:
PipelineAsyncTest(v_int32 port, v_int32 pipelineSize)
: UnitTest("TEST[web::PipelineAsyncTest]")
, m_port(port)
, m_pipelineSize(pipelineSize)
{}
void onRun() override;
};
}}}
#endif // oatpp_test_web_PipelineAsyncTest_hpp

View File

@ -115,7 +115,7 @@ const char* const SAMPLE_OUT =
"HTTP/1.1 200 OK\r\n"
"Content-Length: 14\r\n"
"Connection: keep-alive\r\n"
"Server: oatpp/0.19.8\r\n"
"Server: oatpp/" OATPP_VERSION "\r\n"
"\r\n"
"Hello World!!!";
@ -154,6 +154,8 @@ void PipelineTest::onRun() {
std::thread pipeOutThread([this, connection] {
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::NON_BLOCKING);
oatpp::data::stream::ChunkedBuffer pipelineStream;
for (v_int32 i = 0; i < m_pipelineSize; i++) {
@ -176,7 +178,7 @@ void PipelineTest::onRun() {
}
} else {
retries ++;
if(retries == 10) {
if(retries == 50) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));

View File

@ -37,6 +37,8 @@
#include "oatpp/web/server/api/ApiController.hpp"
#include "oatpp/parser/json/mapping/ObjectMapper.hpp"
#include "oatpp/core/data/stream/Stream.hpp"
#include "oatpp/core/utils/ConversionUtils.hpp"
#include "oatpp/core/macro/codegen.hpp"
#include "oatpp/core/macro/component.hpp"