Merge pull request #360 from oatpp/better_connection_handling

server::HttpProcessor: Refactor connection handling.
This commit is contained in:
Leonid Stryzhevskyi 2020-12-21 01:52:32 +02:00 committed by GitHub
commit 3b0e006614
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 187 additions and 151 deletions

View File

@ -31,29 +31,29 @@ bool CommunicationUtils::headerEqualsCI_FAST(const oatpp::data::share::MemoryLab
return size == headerValue.getSize() && oatpp::base::StrBuffer::equalsCI_FAST(headerValue.getData(), value, size);
}
v_int32 CommunicationUtils::considerConnectionState(const std::shared_ptr<protocol::http::incoming::Request>& request,
const std::shared_ptr<protocol::http::outgoing::Response>& response){
void CommunicationUtils::considerConnectionState(const std::shared_ptr<protocol::http::incoming::Request>& request,
const std::shared_ptr<protocol::http::outgoing::Response>& response,
ConnectionState& connectionState)
{
if(connectionState != ConnectionState::ALIVE) {
return;
}
auto outState = response->getHeaders().getAsMemoryLabel<oatpp::data::share::StringKeyLabelCI_FAST>(Header::CONNECTION);
if(outState && outState == Header::Value::CONNECTION_UPGRADE) {
return CONNECTION_STATE_UPGRADE;
connectionState = ConnectionState::DELEGATED;
return;
}
if(request) {
/* Set keep-alive to value specified in the client's request, if no Connection header present in response. */
/* Set keep-alive to value specified in response otherwise */
/* If the connection header is present in the request and its value isn't keep-alive, then close */
auto connection = request->getHeaders().getAsMemoryLabel<oatpp::data::share::StringKeyLabelCI_FAST>(Header::CONNECTION);
if(connection && connection == Header::Value::CONNECTION_KEEP_ALIVE) {
if(outState) {
if(outState == Header::Value::CONNECTION_KEEP_ALIVE) {
return CONNECTION_STATE_KEEP_ALIVE;
} else {
return CONNECTION_STATE_CLOSE;
}
} else {
response->putHeader(Header::CONNECTION, Header::Value::CONNECTION_KEEP_ALIVE);
return CONNECTION_STATE_KEEP_ALIVE;
if(connection) {
if(connection != Header::Value::CONNECTION_KEEP_ALIVE) {
connectionState = ConnectionState::CLOSING;
}
return;
}
/* If protocol == HTTP/1.1 */
@ -61,35 +61,21 @@ v_int32 CommunicationUtils::considerConnectionState(const std::shared_ptr<protoc
/* Set keep-alive to value specified in response otherwise */
auto& protocol = request->getStartingLine().protocol;
if(protocol && headerEqualsCI_FAST(protocol, "HTTP/1.1")) {
if(outState) {
if(outState == Header::Value::CONNECTION_KEEP_ALIVE) {
return CONNECTION_STATE_KEEP_ALIVE;
} else {
return CONNECTION_STATE_CLOSE;
}
} else {
response->putHeader(Header::CONNECTION, Header::Value::CONNECTION_KEEP_ALIVE);
return CONNECTION_STATE_KEEP_ALIVE;
if(outState && outState != Header::Value::CONNECTION_KEEP_ALIVE) {
connectionState = ConnectionState::CLOSING;
}
return;
}
}
/* If protocol != HTTP/1.1 */
/* Set default Connection header value (Close), if no Connection header present in response. */
/* Set keep-alive to value specified in response otherwise */
if(outState) {
if(outState == Header::Value::CONNECTION_KEEP_ALIVE) {
return CONNECTION_STATE_KEEP_ALIVE;
} else {
return CONNECTION_STATE_CLOSE;
}
} else {
response->putHeader(Header::CONNECTION, Header::Value::CONNECTION_CLOSE);
return CONNECTION_STATE_CLOSE;
if(!outState || outState != Header::Value::CONNECTION_KEEP_ALIVE) {
connectionState = ConnectionState::CLOSING;
}
return CONNECTION_STATE_CLOSE;
return;
}

View File

@ -36,20 +36,14 @@ namespace oatpp { namespace web { namespace protocol { namespace http { namespac
*/
class CommunicationUtils {
public:
/**
* Connection state - close.
*/
static constexpr v_int32 CONNECTION_STATE_CLOSE = 0;
/**
* Connection state - keep alive.
*/
static constexpr v_int32 CONNECTION_STATE_KEEP_ALIVE = 1;
enum class ConnectionState : int {
ALIVE = 0, // Continue processing connection.
DELEGATED = 1, // Stop current connection processing as connection was delegated to other processor.
CLOSING = 2, // Move connection to "closing" pool.
DEAD = 3 // Drop immediately
};
/**
* Connection state - upgrade.
*/
static constexpr v_int32 CONNECTION_STATE_UPGRADE = 2;
private:
static bool headerEqualsCI_FAST(const oatpp::data::share::MemoryLabel& headerValue, const char* value);
public:
@ -57,18 +51,13 @@ public:
/**
* Consider keep connection alive taking into account request headers, response headers and protocol version.<br>
* Corresponding header will be set to response if not existed before. <br>
* return one of (CONNECTION_STATE_CLOSE, CONNECTION_STATE_KEEP_ALIVE, CONNECTION_STATE_UPGRADE).
* @param request - `std::shared_ptr` to &id:oatpp::web::protocol::http::incoming::Request;
* @param response - `std::shared_ptr` to &id:oatpp::web::protocol::http::outgoing::Response;
* @return - one of values:
* <ul>
* <li>&l:CommunicationUtils::CONNECTION_STATE_CLOSE;</li>
* <li>&l:CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE;</li>
* <li>&l:CommunicationUtils::CONNECTION_STATE_UPGRADE;</li>
* </ul>
* @param connectionState
*/
static v_int32 considerConnectionState(const std::shared_ptr<protocol::http::incoming::Request>& request,
const std::shared_ptr<protocol::http::outgoing::Response>& response);
static void considerConnectionState(const std::shared_ptr<protocol::http::incoming::Request>& request,
const std::shared_ptr<protocol::http::outgoing::Response>& response,
ConnectionState& connectionState);
static std::shared_ptr<encoding::EncoderProvider> selectEncoder(const std::shared_ptr<http::incoming::Request>& request,
const std::shared_ptr<http::encoding::ProviderCollection>& providers);

View File

@ -25,6 +25,7 @@
#include "HttpProcessor.hpp"
#include "oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp"
#include "oatpp/core/data/stream/BufferStream.hpp"
namespace oatpp { namespace web { namespace server {
@ -80,108 +81,137 @@ HttpProcessor::ProcessingResources::ProcessingResources(const std::shared_ptr<Co
, inStream(data::stream::InputStreamBufferedProxy::createShared(connection, base::StrBuffer::createShared(data::buffer::IOBuffer::BUFFER_SIZE)))
{}
bool HttpProcessor::processNextRequest(ProcessingResources& resources) {
oatpp::web::protocol::http::HttpError::Info error;
auto headersReadResult = resources.headersReader.readHeaders(resources.inStream.get(), error);
if(error.status.code != 0) {
auto response = resources.components->errorHandler->handleError(error.status, "Invalid request headers");
response->send(resources.connection.get(), &resources.headersOutBuffer, nullptr);
return false;
}
if(error.ioStatus <= 0) {
return false; // connection is in invalid state. should be dropped
}
auto request = protocol::http::incoming::Request::createShared(resources.connection,
headersReadResult.startingLine,
headersReadResult.headers,
resources.inStream,
resources.components->bodyDecoder);
std::shared_ptr<protocol::http::outgoing::Response>
HttpProcessor::processNextRequest(ProcessingResources& resources,
const std::shared_ptr<protocol::http::incoming::Request>& request,
ConnectionState& connectionState)
{
std::shared_ptr<protocol::http::outgoing::Response> response;
try{
for(auto interceptor : resources.components->requestInterceptors) {
for(auto& interceptor : resources.components->requestInterceptors) {
response = interceptor->intercept(request);
if(response) {
break;
return response;
}
}
if(!response) {
auto route = resources.components->router->getRoute(request->getStartingLine().method, request->getStartingLine().path);
auto route = resources.components->router->getRoute(headersReadResult.startingLine.method, headersReadResult.startingLine.path);
if(!route) {
if(!route) {
response = resources.components->errorHandler->handleError(protocol::http::Status::CODE_404, "Current url has no mapping");
response->send(resources.connection.get(), &resources.headersOutBuffer, nullptr);
return false;
}
data::stream::BufferOutputStream ss;
ss << "No mapping for HTTP-method: '" << request->getStartingLine().method.toString()
<< "', URL: '" << request->getStartingLine().path.toString() << "'";
request->setPathVariables(route.getMatchMap());
response = route.getEndpoint()->handle(request);
connectionState = ConnectionState::CLOSING;
return resources.components->errorHandler->handleError(protocol::http::Status::CODE_404, ss.toString());
}
for(auto interceptor : resources.components->responseInterceptors) {
response = interceptor->intercept(request, response);
if(!response) {
response = resources.components->errorHandler->handleError(protocol::http::Status::CODE_500, "Invalid Response - 'null'.");
response->send(resources.connection.get(), &resources.headersOutBuffer, nullptr);
return false;
}
}
request->setPathVariables(route.getMatchMap());
return route.getEndpoint()->handle(request);
} catch (oatpp::web::protocol::http::HttpError& error) {
response = resources.components->errorHandler->handleError(error.getInfo().status, error.getMessage(), error.getHeaders());
response->send(resources.connection.get(), &resources.headersOutBuffer, nullptr);
return false;
connectionState = ConnectionState::CLOSING;
} catch (std::exception& error) {
response = resources.components->errorHandler->handleError(protocol::http::Status::CODE_500, error.what());
response->send(resources.connection.get(), &resources.headersOutBuffer, nullptr);
return false;
connectionState = ConnectionState::CLOSING;
} catch (...) {
response = resources.components->errorHandler->handleError(protocol::http::Status::CODE_500, "Unknown error");
response->send(resources.connection.get(), &resources.headersOutBuffer, nullptr);
return false;
response = resources.components->errorHandler->handleError(protocol::http::Status::CODE_500, "Unhandled Error");
connectionState = ConnectionState::CLOSING;
}
response->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER);
auto connectionState = protocol::http::utils::CommunicationUtils::considerConnectionState(request, response);
return response;
}
HttpProcessor::ConnectionState HttpProcessor::processNextRequest(ProcessingResources& resources) {
oatpp::web::protocol::http::HttpError::Info error;
auto headersReadResult = resources.headersReader.readHeaders(resources.inStream.get(), error);
if(error.ioStatus <= 0) {
return ConnectionState::DEAD;
}
ConnectionState connectionState = ConnectionState::ALIVE;
std::shared_ptr<protocol::http::incoming::Request> request;
std::shared_ptr<protocol::http::outgoing::Response> response;
if(error.status.code != 0) {
response = resources.components->errorHandler->handleError(error.status, "Invalid Request Headers");
connectionState = ConnectionState::CLOSING;
} else {
request = protocol::http::incoming::Request::createShared(resources.connection,
headersReadResult.startingLine,
headersReadResult.headers,
resources.inStream,
resources.components->bodyDecoder);
response = processNextRequest(resources, request, connectionState);
try {
for (auto& interceptor : resources.components->responseInterceptors) {
response = interceptor->intercept(request, response);
if (!response) {
response = resources.components->errorHandler->handleError(
protocol::http::Status::CODE_500,
"Response Interceptor returned an Invalid Response - 'null'"
);
connectionState = ConnectionState::CLOSING;
}
}
} catch (...) {
response = resources.components->errorHandler->handleError(
protocol::http::Status::CODE_500,
"Unhandled Error in Response Interceptor"
);
connectionState = ConnectionState::CLOSING;
}
response->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER);
protocol::http::utils::CommunicationUtils::considerConnectionState(request, response, connectionState);
switch(connectionState) {
case ConnectionState::ALIVE :
response->putHeaderIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_KEEP_ALIVE);
break;
case ConnectionState::CLOSING:
case ConnectionState::DEAD:
response->putHeaderIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_CLOSE);
break;
case ConnectionState::DELEGATED: {
auto handler = response->getConnectionUpgradeHandler();
if(handler) {
handler->handleConnection(resources.connection, response->getConnectionUpgradeParameters());
connectionState = ConnectionState::DELEGATED;
} else {
OATPP_LOGW("[oatpp::web::server::HttpProcessor::processNextRequest()]", "Warning. ConnectionUpgradeHandler not set!");
connectionState = ConnectionState::CLOSING;
}
break;
}
}
}
auto contentEncoderProvider =
protocol::http::utils::CommunicationUtils::selectEncoder(request, resources.components->contentEncodingProviders);
response->send(resources.connection.get(), &resources.headersOutBuffer, contentEncoderProvider.get());
switch(connectionState) {
case protocol::http::utils::CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE: return true;
case protocol::http::utils::CommunicationUtils::CONNECTION_STATE_UPGRADE: {
auto handler = response->getConnectionUpgradeHandler();
if(handler) {
handler->handleConnection(resources.connection, response->getConnectionUpgradeParameters());
} else {
OATPP_LOGW("[oatpp::web::server::HttpProcessor::processNextRequest()]", "Warning. ConnectionUpgradeHandler not set!");
}
return false;
}
}
return false;
return connectionState;
}
@ -200,15 +230,15 @@ void HttpProcessor::Task::run(){
ProcessingResources resources(m_components, m_connection);
bool wantContinue;
ConnectionState connectionState;
try {
do {
wantContinue = HttpProcessor::processNextRequest(resources);
connectionState = HttpProcessor::processNextRequest(resources);
} while (wantContinue);
} while (connectionState == ConnectionState::ALIVE);
} catch (...) {
// DO NOTHING
@ -227,7 +257,7 @@ HttpProcessor::Coroutine::Coroutine(const std::shared_ptr<Components>& component
, m_headersReader(&m_headersInBuffer, components->config->headersReaderChunkSize, components->config->headersReaderMaxSize)
, m_headersOutBuffer(std::make_shared<oatpp::data::stream::BufferOutputStream>(components->config->headersOutBufferInitial))
, m_inStream(data::stream::InputStreamBufferedProxy::createShared(m_connection, base::StrBuffer::createShared(data::buffer::IOBuffer::BUFFER_SIZE)))
, m_connectionState(oatpp::web::protocol::http::utils::CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE)
, m_connectionState(ConnectionState::ALIVE)
{}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::act() {
@ -246,7 +276,7 @@ oatpp::async::Action HttpProcessor::Coroutine::onHeadersParsed(const RequestHead
m_inStream,
m_components->bodyDecoder);
for(auto interceptor : m_components->requestInterceptors) {
for(auto& interceptor : m_components->requestInterceptors) {
m_currentResponse = interceptor->intercept(m_currentRequest);
if(m_currentResponse) {
return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
@ -256,7 +286,12 @@ oatpp::async::Action HttpProcessor::Coroutine::onHeadersParsed(const RequestHead
m_currentRoute = m_components->router->getRoute(headersReadResult.startingLine.method.toString(), headersReadResult.startingLine.path.toString());
if(!m_currentRoute) {
m_currentResponse = m_components->errorHandler->handleError(protocol::http::Status::CODE_404, "Current url has no mapping");
data::stream::BufferOutputStream ss;
ss << "No mapping for HTTP-method: '" << headersReadResult.startingLine.method.toString()
<< "', URL: '" << headersReadResult.startingLine.path.toString() << "'";
m_currentResponse = m_components->errorHandler->handleError(protocol::http::Status::CODE_404, ss.toString());
m_connectionState = ConnectionState::CLOSING;
return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
}
@ -277,15 +312,43 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponse(const std:
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() {
for(auto interceptor : m_components->responseInterceptors) {
for(auto& interceptor : m_components->responseInterceptors) {
m_currentResponse = interceptor->intercept(m_currentRequest, m_currentResponse);
if(!m_currentResponse) {
m_currentResponse = m_components->errorHandler->handleError(protocol::http::Status::CODE_500, "Invalid Response - 'null'.");
m_currentResponse = m_components->errorHandler->handleError(
protocol::http::Status::CODE_500,
"Response Interceptor returned an Invalid Response - 'null'"
);
}
}
m_currentResponse->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER);
m_connectionState = oatpp::web::protocol::http::utils::CommunicationUtils::considerConnectionState(m_currentRequest, m_currentResponse);
oatpp::web::protocol::http::utils::CommunicationUtils::considerConnectionState(m_currentRequest, m_currentResponse, m_connectionState);
switch(m_connectionState) {
case ConnectionState::ALIVE :
m_currentResponse->putHeaderIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_KEEP_ALIVE);
break;
case ConnectionState::CLOSING:
case ConnectionState::DEAD:
m_currentResponse->putHeaderIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_CLOSE);
break;
case ConnectionState::DELEGATED: {
auto handler = m_currentResponse->getConnectionUpgradeHandler();
if(handler) {
handler->handleConnection(m_connection, m_currentResponse->getConnectionUpgradeParameters());
m_connectionState = ConnectionState::DELEGATED;
} else {
OATPP_LOGW("[oatpp::web::server::HttpProcessor::Coroutine::onResponseFormed()]", "Warning. ConnectionUpgradeHandler not set!");
m_connectionState = ConnectionState::CLOSING;
}
break;
}
}
auto contentEncoderProvider =
protocol::http::utils::CommunicationUtils::selectEncoder(m_currentRequest, m_components->contentEncodingProviders);
@ -297,20 +360,12 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() {
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onRequestDone() {
if(m_connectionState == oatpp::web::protocol::http::utils::CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE) {
if(m_connectionState == ConnectionState::ALIVE) {
return yieldTo(&HttpProcessor::Coroutine::parseHeaders);
}
if(m_connectionState == oatpp::web::protocol::http::utils::CommunicationUtils::CONNECTION_STATE_UPGRADE) {
auto handler = m_currentResponse->getConnectionUpgradeHandler();
if(handler) {
handler->handleConnection(m_connection, m_currentResponse->getConnectionUpgradeParameters());
} else {
OATPP_LOGW("[oatpp::web::server::HttpProcessor::Coroutine::onRequestDone()]", "Warning. ConnectionUpgradeHandler not set!");
}
}
return finish();
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::handleError(Error* error) {

View File

@ -52,6 +52,7 @@ public:
typedef std::list<std::shared_ptr<web::server::interceptor::RequestInterceptor>> RequestInterceptors;
typedef std::list<std::shared_ptr<web::server::interceptor::ResponseInterceptor>> ResponseInterceptors;
typedef web::protocol::http::incoming::RequestHeadersReader RequestHeadersReader;
typedef protocol::http::utils::CommunicationUtils::ConnectionState ConnectionState;
public:
/**
@ -171,7 +172,12 @@ private:
};
static bool processNextRequest(ProcessingResources& resources);
static
std::shared_ptr<protocol::http::outgoing::Response>
processNextRequest(ProcessingResources& resources,
const std::shared_ptr<protocol::http::incoming::Request>& request,
ConnectionState& connectionState);
static ConnectionState processNextRequest(ProcessingResources& resources);
public:
@ -215,7 +221,7 @@ public:
RequestHeadersReader m_headersReader;
std::shared_ptr<oatpp::data::stream::BufferOutputStream> m_headersOutBuffer;
std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy> m_inStream;
v_int32 m_connectionState;
ConnectionState m_connectionState;
private:
oatpp::web::server::HttpRouter::BranchRouter::Route m_currentRoute;
std::shared_ptr<protocol::http::incoming::Request> m_currentRequest;