diff --git a/web/server/HttpConnectionHandler.cpp b/web/server/HttpConnectionHandler.cpp index 6b3be170..52e52767 100644 --- a/web/server/HttpConnectionHandler.cpp +++ b/web/server/HttpConnectionHandler.cpp @@ -45,10 +45,18 @@ void HttpConnectionHandler::Task::run(){ auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection, buffer, bufferSize); bool keepAlive = true; - do { + + std::shared_ptr response; + + { + PriorityController::PriorityLine priorityLine = m_priorityController->getLine(); + priorityLine.wait(); + + response = HttpProcessor::processRequest(m_router, m_connection, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive); + } + //auto response = HttpProcessor::processRequest(m_router, m_connection, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive); - auto response = HttpProcessor::processRequest(m_router, m_connection, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive); if(response) { outStream->setBufferPosition(0, 0); response->send(outStream); @@ -62,7 +70,7 @@ void HttpConnectionHandler::Task::run(){ } void HttpConnectionHandler::handleConnection(const std::shared_ptr& connection){ - concurrency::Thread thread(Task::createShared(m_router.get(), connection, m_errorHandler, &m_requestInterceptors)); + concurrency::Thread thread(Task::createShared(m_router.get(), connection, m_errorHandler, &m_requestInterceptors, &m_priorityController)); thread.detach(); } diff --git a/web/server/HttpConnectionHandler.hpp b/web/server/HttpConnectionHandler.hpp index 5ea4a6fa..94c669d5 100644 --- a/web/server/HttpConnectionHandler.hpp +++ b/web/server/HttpConnectionHandler.hpp @@ -44,6 +44,71 @@ namespace oatpp { namespace web { namespace server { class HttpConnectionHandler : public base::Controllable, public network::server::ConnectionHandler { +private: + + class PriorityController { + public: + + class PriorityLine { + private: + PriorityController* m_controller; + bool m_priority; + public: + + PriorityLine(PriorityController* controller) + : m_controller(controller) + , m_priority(controller->obtainPriority()) + { + //OATPP_LOGD("line", "created"); + } + + ~PriorityLine() { + if(m_priority) { + m_controller->giveUpPriority(); + } + } + + void wait() { + if(!m_priority) { + //OATPP_LOGD("line", "waiting thread %d", m_controller->m_counter.load() ); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } else { + //OATPP_LOGD("line", "green!!! thread %d", m_controller->m_counter.load()); + } + } + + }; + + private: + oatpp::concurrency::SpinLock::Atom m_atom; + std::atomic m_counter; + v_int32 m_maxPriorityLines; + private: + bool obtainPriority() { + if(m_counter.fetch_add(1) < m_maxPriorityLines) { + return true; + } else { + --m_counter; + return false; + } + } + + void giveUpPriority() { + -- m_counter; + } + public: + + PriorityController(v_int32 maxPriorityLines) + : m_counter(0) + , m_maxPriorityLines(maxPriorityLines) + {} + + PriorityLine getLine() { + return PriorityLine(this); + } + + }; + private: class Task : public base::Controllable, public concurrency::Runnable{ private: @@ -51,23 +116,27 @@ private: std::shared_ptr m_connection; std::shared_ptr m_errorHandler; HttpProcessor::RequestInterceptors* m_requestInterceptors; + PriorityController* m_priorityController; public: Task(HttpRouter* router, const std::shared_ptr& connection, const std::shared_ptr& errorHandler, - HttpProcessor::RequestInterceptors* requestInterceptors) + HttpProcessor::RequestInterceptors* requestInterceptors, + PriorityController* priorityController) : m_router(router) , m_connection(connection) , m_errorHandler(errorHandler) , m_requestInterceptors(requestInterceptors) + , m_priorityController(priorityController) {} public: static std::shared_ptr createShared(HttpRouter* router, const std::shared_ptr& connection, const std::shared_ptr& errorHandler, - HttpProcessor::RequestInterceptors* requestInterceptors){ - return std::make_shared(router, connection, errorHandler, requestInterceptors); + HttpProcessor::RequestInterceptors* requestInterceptors, + PriorityController* priorityController){ + return std::make_shared(router, connection, errorHandler, requestInterceptors, priorityController); } void run() override; @@ -78,11 +147,12 @@ private: std::shared_ptr m_router; std::shared_ptr m_errorHandler; HttpProcessor::RequestInterceptors m_requestInterceptors; - + PriorityController m_priorityController; public: HttpConnectionHandler(const std::shared_ptr& router) : m_router(router) , m_errorHandler(handler::DefaultErrorHandler::createShared()) + , m_priorityController(8) {} public: