From ea0ec5f0593c848509a5b533bb6201108b3504cc Mon Sep 17 00:00:00 2001 From: lganzzzo Date: Sun, 20 Dec 2020 21:57:18 +0200 Subject: [PATCH] server::HttpProcessor: Refactor connection handling. --- .../http/utils/CommunicationUtils.cpp | 60 ++--- .../http/utils/CommunicationUtils.hpp | 31 +-- src/oatpp/web/server/HttpProcessor.cpp | 237 +++++++++++------- src/oatpp/web/server/HttpProcessor.hpp | 10 +- 4 files changed, 187 insertions(+), 151 deletions(-) diff --git a/src/oatpp/web/protocol/http/utils/CommunicationUtils.cpp b/src/oatpp/web/protocol/http/utils/CommunicationUtils.cpp index f93790e3..c680d4ce 100644 --- a/src/oatpp/web/protocol/http/utils/CommunicationUtils.cpp +++ b/src/oatpp/web/protocol/http/utils/CommunicationUtils.cpp @@ -31,29 +31,29 @@ bool CommunicationUtils::headerEqualsCI_FAST(const oatpp::data::share::MemoryLab return size == headerValue.getSize() && oatpp::base::StrBuffer::equalsCI_FAST(headerValue.getData(), value, size); } -v_int32 CommunicationUtils::considerConnectionState(const std::shared_ptr& request, - const std::shared_ptr& response){ - +void CommunicationUtils::considerConnectionState(const std::shared_ptr& request, + const std::shared_ptr& response, + ConnectionState& connectionState) +{ + + if(connectionState != ConnectionState::ALIVE) { + return; + } + auto outState = response->getHeaders().getAsMemoryLabel(Header::CONNECTION); if(outState && outState == Header::Value::CONNECTION_UPGRADE) { - return CONNECTION_STATE_UPGRADE; + connectionState = ConnectionState::DELEGATED; + return; } if(request) { - /* Set keep-alive to value specified in the client's request, if no Connection header present in response. */ - /* Set keep-alive to value specified in response otherwise */ + /* If the connection header is present in the request and its value isn't keep-alive, then close */ auto connection = request->getHeaders().getAsMemoryLabel(Header::CONNECTION); - if(connection && connection == Header::Value::CONNECTION_KEEP_ALIVE) { - if(outState) { - if(outState == Header::Value::CONNECTION_KEEP_ALIVE) { - return CONNECTION_STATE_KEEP_ALIVE; - } else { - return CONNECTION_STATE_CLOSE; - } - } else { - response->putHeader(Header::CONNECTION, Header::Value::CONNECTION_KEEP_ALIVE); - return CONNECTION_STATE_KEEP_ALIVE; + if(connection) { + if(connection != Header::Value::CONNECTION_KEEP_ALIVE) { + connectionState = ConnectionState::CLOSING; } + return; } /* If protocol == HTTP/1.1 */ @@ -61,35 +61,21 @@ v_int32 CommunicationUtils::considerConnectionState(const std::shared_ptrgetStartingLine().protocol; if(protocol && headerEqualsCI_FAST(protocol, "HTTP/1.1")) { - if(outState) { - if(outState == Header::Value::CONNECTION_KEEP_ALIVE) { - return CONNECTION_STATE_KEEP_ALIVE; - } else { - return CONNECTION_STATE_CLOSE; - } - } else { - response->putHeader(Header::CONNECTION, Header::Value::CONNECTION_KEEP_ALIVE); - return CONNECTION_STATE_KEEP_ALIVE; + if(outState && outState != Header::Value::CONNECTION_KEEP_ALIVE) { + connectionState = ConnectionState::CLOSING; } + return; } - } /* If protocol != HTTP/1.1 */ /* Set default Connection header value (Close), if no Connection header present in response. */ /* Set keep-alive to value specified in response otherwise */ - if(outState) { - if(outState == Header::Value::CONNECTION_KEEP_ALIVE) { - return CONNECTION_STATE_KEEP_ALIVE; - } else { - return CONNECTION_STATE_CLOSE; - } - } else { - response->putHeader(Header::CONNECTION, Header::Value::CONNECTION_CLOSE); - return CONNECTION_STATE_CLOSE; + if(!outState || outState != Header::Value::CONNECTION_KEEP_ALIVE) { + connectionState = ConnectionState::CLOSING; } - - return CONNECTION_STATE_CLOSE; + + return; } diff --git a/src/oatpp/web/protocol/http/utils/CommunicationUtils.hpp b/src/oatpp/web/protocol/http/utils/CommunicationUtils.hpp index 8e251eab..cbee3d54 100644 --- a/src/oatpp/web/protocol/http/utils/CommunicationUtils.hpp +++ b/src/oatpp/web/protocol/http/utils/CommunicationUtils.hpp @@ -36,20 +36,14 @@ namespace oatpp { namespace web { namespace protocol { namespace http { namespac */ class CommunicationUtils { public: - /** - * Connection state - close. - */ - static constexpr v_int32 CONNECTION_STATE_CLOSE = 0; - /** - * Connection state - keep alive. - */ - static constexpr v_int32 CONNECTION_STATE_KEEP_ALIVE = 1; + enum class ConnectionState : int { + ALIVE = 0, // Continue processing connection. + DELEGATED = 1, // Stop current connection processing as connection was delegated to other processor. + CLOSING = 2, // Move connection to "closing" pool. + DEAD = 3 // Drop immediately + }; - /** - * Connection state - upgrade. - */ - static constexpr v_int32 CONNECTION_STATE_UPGRADE = 2; private: static bool headerEqualsCI_FAST(const oatpp::data::share::MemoryLabel& headerValue, const char* value); public: @@ -57,18 +51,13 @@ public: /** * Consider keep connection alive taking into account request headers, response headers and protocol version.
* Corresponding header will be set to response if not existed before.
- * return one of (CONNECTION_STATE_CLOSE, CONNECTION_STATE_KEEP_ALIVE, CONNECTION_STATE_UPGRADE). * @param request - `std::shared_ptr` to &id:oatpp::web::protocol::http::incoming::Request; * @param response - `std::shared_ptr` to &id:oatpp::web::protocol::http::outgoing::Response; - * @return - one of values: - *
    - *
  • &l:CommunicationUtils::CONNECTION_STATE_CLOSE;
  • - *
  • &l:CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE;
  • - *
  • &l:CommunicationUtils::CONNECTION_STATE_UPGRADE;
  • - *
+ * @param connectionState */ - static v_int32 considerConnectionState(const std::shared_ptr& request, - const std::shared_ptr& response); + static void considerConnectionState(const std::shared_ptr& request, + const std::shared_ptr& response, + ConnectionState& connectionState); static std::shared_ptr selectEncoder(const std::shared_ptr& request, const std::shared_ptr& providers); diff --git a/src/oatpp/web/server/HttpProcessor.cpp b/src/oatpp/web/server/HttpProcessor.cpp index 52d44ae4..8c24a78f 100644 --- a/src/oatpp/web/server/HttpProcessor.cpp +++ b/src/oatpp/web/server/HttpProcessor.cpp @@ -25,6 +25,7 @@ #include "HttpProcessor.hpp" #include "oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp" +#include "oatpp/core/data/stream/BufferStream.hpp" namespace oatpp { namespace web { namespace server { @@ -80,108 +81,137 @@ HttpProcessor::ProcessingResources::ProcessingResources(const std::shared_ptrerrorHandler->handleError(error.status, "Invalid request headers"); - response->send(resources.connection.get(), &resources.headersOutBuffer, nullptr); - return false; - } - - if(error.ioStatus <= 0) { - return false; // connection is in invalid state. should be dropped - } - - auto request = protocol::http::incoming::Request::createShared(resources.connection, - headersReadResult.startingLine, - headersReadResult.headers, - resources.inStream, - resources.components->bodyDecoder); +std::shared_ptr +HttpProcessor::processNextRequest(ProcessingResources& resources, + const std::shared_ptr& request, + ConnectionState& connectionState) +{ std::shared_ptr response; try{ - for(auto interceptor : resources.components->requestInterceptors) { + for(auto& interceptor : resources.components->requestInterceptors) { response = interceptor->intercept(request); if(response) { - break; + return response; } } - if(!response) { + auto route = resources.components->router->getRoute(request->getStartingLine().method, request->getStartingLine().path); - auto route = resources.components->router->getRoute(headersReadResult.startingLine.method, headersReadResult.startingLine.path); + if(!route) { - if(!route) { - response = resources.components->errorHandler->handleError(protocol::http::Status::CODE_404, "Current url has no mapping"); - response->send(resources.connection.get(), &resources.headersOutBuffer, nullptr); - return false; - } + data::stream::BufferOutputStream ss; + ss << "No mapping for HTTP-method: '" << request->getStartingLine().method.toString() + << "', URL: '" << request->getStartingLine().path.toString() << "'"; - request->setPathVariables(route.getMatchMap()); - response = route.getEndpoint()->handle(request); + connectionState = ConnectionState::CLOSING; + return resources.components->errorHandler->handleError(protocol::http::Status::CODE_404, ss.toString()); } - for(auto interceptor : resources.components->responseInterceptors) { - response = interceptor->intercept(request, response); - if(!response) { - response = resources.components->errorHandler->handleError(protocol::http::Status::CODE_500, "Invalid Response - 'null'."); - response->send(resources.connection.get(), &resources.headersOutBuffer, nullptr); - return false; - } - } + request->setPathVariables(route.getMatchMap()); + return route.getEndpoint()->handle(request); } catch (oatpp::web::protocol::http::HttpError& error) { - response = resources.components->errorHandler->handleError(error.getInfo().status, error.getMessage(), error.getHeaders()); - response->send(resources.connection.get(), &resources.headersOutBuffer, nullptr); - return false; - + connectionState = ConnectionState::CLOSING; } catch (std::exception& error) { - response = resources.components->errorHandler->handleError(protocol::http::Status::CODE_500, error.what()); - response->send(resources.connection.get(), &resources.headersOutBuffer, nullptr); - return false; - + connectionState = ConnectionState::CLOSING; } catch (...) { - response = resources.components->errorHandler->handleError(protocol::http::Status::CODE_500, "Unknown error"); - response->send(resources.connection.get(), &resources.headersOutBuffer, nullptr); - return false; + response = resources.components->errorHandler->handleError(protocol::http::Status::CODE_500, "Unhandled Error"); + connectionState = ConnectionState::CLOSING; } - response->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER); - auto connectionState = protocol::http::utils::CommunicationUtils::considerConnectionState(request, response); + return response; + +} + +HttpProcessor::ConnectionState HttpProcessor::processNextRequest(ProcessingResources& resources) { + + oatpp::web::protocol::http::HttpError::Info error; + auto headersReadResult = resources.headersReader.readHeaders(resources.inStream.get(), error); + + if(error.ioStatus <= 0) { + return ConnectionState::DEAD; + } + + ConnectionState connectionState = ConnectionState::ALIVE; + std::shared_ptr request; + std::shared_ptr response; + + if(error.status.code != 0) { + response = resources.components->errorHandler->handleError(error.status, "Invalid Request Headers"); + connectionState = ConnectionState::CLOSING; + } else { + + request = protocol::http::incoming::Request::createShared(resources.connection, + headersReadResult.startingLine, + headersReadResult.headers, + resources.inStream, + resources.components->bodyDecoder); + + response = processNextRequest(resources, request, connectionState); + + try { + + for (auto& interceptor : resources.components->responseInterceptors) { + response = interceptor->intercept(request, response); + if (!response) { + response = resources.components->errorHandler->handleError( + protocol::http::Status::CODE_500, + "Response Interceptor returned an Invalid Response - 'null'" + ); + connectionState = ConnectionState::CLOSING; + } + } + + } catch (...) { + response = resources.components->errorHandler->handleError( + protocol::http::Status::CODE_500, + "Unhandled Error in Response Interceptor" + ); + connectionState = ConnectionState::CLOSING; + } + + response->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER); + protocol::http::utils::CommunicationUtils::considerConnectionState(request, response, connectionState); + + switch(connectionState) { + + case ConnectionState::ALIVE : + response->putHeaderIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_KEEP_ALIVE); + break; + + case ConnectionState::CLOSING: + case ConnectionState::DEAD: + response->putHeaderIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_CLOSE); + break; + + case ConnectionState::DELEGATED: { + auto handler = response->getConnectionUpgradeHandler(); + if(handler) { + handler->handleConnection(resources.connection, response->getConnectionUpgradeParameters()); + connectionState = ConnectionState::DELEGATED; + } else { + OATPP_LOGW("[oatpp::web::server::HttpProcessor::processNextRequest()]", "Warning. ConnectionUpgradeHandler not set!"); + connectionState = ConnectionState::CLOSING; + } + break; + } + + } + + } auto contentEncoderProvider = protocol::http::utils::CommunicationUtils::selectEncoder(request, resources.components->contentEncodingProviders); response->send(resources.connection.get(), &resources.headersOutBuffer, contentEncoderProvider.get()); - switch(connectionState) { - - case protocol::http::utils::CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE: return true; - - case protocol::http::utils::CommunicationUtils::CONNECTION_STATE_UPGRADE: { - - auto handler = response->getConnectionUpgradeHandler(); - if(handler) { - handler->handleConnection(resources.connection, response->getConnectionUpgradeParameters()); - } else { - OATPP_LOGW("[oatpp::web::server::HttpProcessor::processNextRequest()]", "Warning. ConnectionUpgradeHandler not set!"); - } - - return false; - - } - - } - - return false; + return connectionState; } @@ -200,15 +230,15 @@ void HttpProcessor::Task::run(){ ProcessingResources resources(m_components, m_connection); - bool wantContinue; + ConnectionState connectionState; try { do { - wantContinue = HttpProcessor::processNextRequest(resources); + connectionState = HttpProcessor::processNextRequest(resources); - } while (wantContinue); + } while (connectionState == ConnectionState::ALIVE); } catch (...) { // DO NOTHING @@ -227,7 +257,7 @@ HttpProcessor::Coroutine::Coroutine(const std::shared_ptr& component , m_headersReader(&m_headersInBuffer, components->config->headersReaderChunkSize, components->config->headersReaderMaxSize) , m_headersOutBuffer(std::make_shared(components->config->headersOutBufferInitial)) , m_inStream(data::stream::InputStreamBufferedProxy::createShared(m_connection, base::StrBuffer::createShared(data::buffer::IOBuffer::BUFFER_SIZE))) - , m_connectionState(oatpp::web::protocol::http::utils::CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE) + , m_connectionState(ConnectionState::ALIVE) {} HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::act() { @@ -246,7 +276,7 @@ oatpp::async::Action HttpProcessor::Coroutine::onHeadersParsed(const RequestHead m_inStream, m_components->bodyDecoder); - for(auto interceptor : m_components->requestInterceptors) { + for(auto& interceptor : m_components->requestInterceptors) { m_currentResponse = interceptor->intercept(m_currentRequest); if(m_currentResponse) { return yieldTo(&HttpProcessor::Coroutine::onResponseFormed); @@ -256,7 +286,12 @@ oatpp::async::Action HttpProcessor::Coroutine::onHeadersParsed(const RequestHead m_currentRoute = m_components->router->getRoute(headersReadResult.startingLine.method.toString(), headersReadResult.startingLine.path.toString()); if(!m_currentRoute) { - m_currentResponse = m_components->errorHandler->handleError(protocol::http::Status::CODE_404, "Current url has no mapping"); + + data::stream::BufferOutputStream ss; + ss << "No mapping for HTTP-method: '" << headersReadResult.startingLine.method.toString() + << "', URL: '" << headersReadResult.startingLine.path.toString() << "'"; + m_currentResponse = m_components->errorHandler->handleError(protocol::http::Status::CODE_404, ss.toString()); + m_connectionState = ConnectionState::CLOSING; return yieldTo(&HttpProcessor::Coroutine::onResponseFormed); } @@ -277,15 +312,43 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponse(const std: HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() { - for(auto interceptor : m_components->responseInterceptors) { + for(auto& interceptor : m_components->responseInterceptors) { m_currentResponse = interceptor->intercept(m_currentRequest, m_currentResponse); if(!m_currentResponse) { - m_currentResponse = m_components->errorHandler->handleError(protocol::http::Status::CODE_500, "Invalid Response - 'null'."); + m_currentResponse = m_components->errorHandler->handleError( + protocol::http::Status::CODE_500, + "Response Interceptor returned an Invalid Response - 'null'" + ); } } m_currentResponse->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER); - m_connectionState = oatpp::web::protocol::http::utils::CommunicationUtils::considerConnectionState(m_currentRequest, m_currentResponse); + oatpp::web::protocol::http::utils::CommunicationUtils::considerConnectionState(m_currentRequest, m_currentResponse, m_connectionState); + + switch(m_connectionState) { + + case ConnectionState::ALIVE : + m_currentResponse->putHeaderIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_KEEP_ALIVE); + break; + + case ConnectionState::CLOSING: + case ConnectionState::DEAD: + m_currentResponse->putHeaderIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_CLOSE); + break; + + case ConnectionState::DELEGATED: { + auto handler = m_currentResponse->getConnectionUpgradeHandler(); + if(handler) { + handler->handleConnection(m_connection, m_currentResponse->getConnectionUpgradeParameters()); + m_connectionState = ConnectionState::DELEGATED; + } else { + OATPP_LOGW("[oatpp::web::server::HttpProcessor::Coroutine::onResponseFormed()]", "Warning. ConnectionUpgradeHandler not set!"); + m_connectionState = ConnectionState::CLOSING; + } + break; + } + + } auto contentEncoderProvider = protocol::http::utils::CommunicationUtils::selectEncoder(m_currentRequest, m_components->contentEncodingProviders); @@ -297,20 +360,12 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() { HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onRequestDone() { - if(m_connectionState == oatpp::web::protocol::http::utils::CommunicationUtils::CONNECTION_STATE_KEEP_ALIVE) { + if(m_connectionState == ConnectionState::ALIVE) { return yieldTo(&HttpProcessor::Coroutine::parseHeaders); } - if(m_connectionState == oatpp::web::protocol::http::utils::CommunicationUtils::CONNECTION_STATE_UPGRADE) { - auto handler = m_currentResponse->getConnectionUpgradeHandler(); - if(handler) { - handler->handleConnection(m_connection, m_currentResponse->getConnectionUpgradeParameters()); - } else { - OATPP_LOGW("[oatpp::web::server::HttpProcessor::Coroutine::onRequestDone()]", "Warning. ConnectionUpgradeHandler not set!"); - } - } - return finish(); + } HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::handleError(Error* error) { diff --git a/src/oatpp/web/server/HttpProcessor.hpp b/src/oatpp/web/server/HttpProcessor.hpp index af315706..faf5f18a 100644 --- a/src/oatpp/web/server/HttpProcessor.hpp +++ b/src/oatpp/web/server/HttpProcessor.hpp @@ -52,6 +52,7 @@ public: typedef std::list> RequestInterceptors; typedef std::list> ResponseInterceptors; typedef web::protocol::http::incoming::RequestHeadersReader RequestHeadersReader; + typedef protocol::http::utils::CommunicationUtils::ConnectionState ConnectionState; public: /** @@ -171,7 +172,12 @@ private: }; - static bool processNextRequest(ProcessingResources& resources); + static + std::shared_ptr + processNextRequest(ProcessingResources& resources, + const std::shared_ptr& request, + ConnectionState& connectionState); + static ConnectionState processNextRequest(ProcessingResources& resources); public: @@ -215,7 +221,7 @@ public: RequestHeadersReader m_headersReader; std::shared_ptr m_headersOutBuffer; std::shared_ptr m_inStream; - v_int32 m_connectionState; + ConnectionState m_connectionState; private: oatpp::web::server::HttpRouter::BranchRouter::Route m_currentRoute; std::shared_ptr m_currentRequest;