diff --git a/web/server/AsyncHttpConnectionHandler.cpp b/web/server/AsyncHttpConnectionHandler.cpp index ad736bac..a3363857 100644 --- a/web/server/AsyncHttpConnectionHandler.cpp +++ b/web/server/AsyncHttpConnectionHandler.cpp @@ -35,74 +35,21 @@ #include namespace oatpp { namespace web { namespace server { - -void AsyncHttpConnectionHandler::Task::consumeConnections(oatpp::async::Processor& processor) { - - oatpp::concurrency::SpinLock lock(m_atom); - - auto curr = m_connections.getFirstNode(); - while (curr != nullptr) { - - auto coroutine = HttpProcessor::Coroutine::getBench().obtain(m_router, - m_bodyDecoder, - m_errorHandler, - m_requestInterceptors, - curr->getData()->connection, - curr->getData()->ioBuffer, - curr->getData()->outStream, - curr->getData()->inStream); - - processor.addWaitingCoroutine(coroutine); - curr = curr->getNext(); - } - - m_connections.clear(); - -} - -void AsyncHttpConnectionHandler::Task::run(){ - - oatpp::async::Processor processor; - - while(m_isRunning) { - - /* Load all waiting connections into processor */ - consumeConnections(processor); - - /* Process all, and check for incoming connections once in 1000 iterations */ - while (processor.iterate(1000)) { - consumeConnections(processor); - } - - std::unique_lock lock(m_taskMutex); - if(processor.isEmpty()) { - /* No tasks in the processor. Wait for incoming connections */ - m_taskCondition.wait_for(lock, std::chrono::milliseconds(500)); - } else { - /* There is still something in slow queue. Wait and get back to processing */ - /* Waiting for IO is not Applicable here as slow queue may contain NON-IO tasks */ - //OATPP_LOGD("proc", "waiting slow queue"); - m_taskCondition.wait_for(lock, std::chrono::milliseconds(10)); - } - - } - -} void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr& connection){ - auto task = m_tasks[m_taskBalancer % m_threadCount]; - auto ioBuffer = oatpp::data::buffer::IOBuffer::createShared(); - auto state = HttpProcessor::ConnectionState::createShared(); - state->connection = connection; - state->ioBuffer = ioBuffer; - state->outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, ioBuffer); - state->inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection, ioBuffer); + auto outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, ioBuffer); + auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection, ioBuffer); - task->addConnection(state); - - m_taskBalancer ++; + m_executor.execute(m_router.get(), + m_bodyDecoder, + m_errorHandler, + &m_requestInterceptors, + connection, + ioBuffer, + outStream, + inStream); } diff --git a/web/server/AsyncHttpConnectionHandler.hpp b/web/server/AsyncHttpConnectionHandler.hpp index 5fc536d2..d712f8a6 100644 --- a/web/server/AsyncHttpConnectionHandler.hpp +++ b/web/server/AsyncHttpConnectionHandler.hpp @@ -26,112 +26,39 @@ #define oatpp_web_server_AsyncHttpConnectionHandler_hpp #include "./HttpProcessor.hpp" - #include "./handler/ErrorHandler.hpp" - #include "./HttpRouter.hpp" #include "oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp" -#include "oatpp/web/protocol/http/incoming/Request.hpp" -#include "oatpp/web/protocol/http/outgoing/Response.hpp" - -#include "oatpp/network/server/Server.hpp" +#include "oatpp/network/server/ConnectionHandler.hpp" #include "oatpp/network/Connection.hpp" -#include "oatpp/core/concurrency/Thread.hpp" -#include "oatpp/core/concurrency/Runnable.hpp" - #include "oatpp/core/data/stream/StreamBufferedProxy.hpp" #include "oatpp/core/data/buffer/IOBuffer.hpp" -#include "oatpp/core/async/Processor.hpp" - -#include -#include +#include "oatpp/core/async/Executor.hpp" namespace oatpp { namespace web { namespace server { class AsyncHttpConnectionHandler : public base::Controllable, public network::server::ConnectionHandler { private: - - class Task : public base::Controllable, public concurrency::Runnable { - public: - typedef oatpp::collection::LinkedList> Connections; - private: - void consumeConnections(oatpp::async::Processor& processor); - private: - oatpp::concurrency::SpinLock::Atom m_atom; - Connections m_connections; - private: - bool m_isRunning; - HttpRouter* m_router; - std::shared_ptr m_bodyDecoder; - std::shared_ptr m_errorHandler; - HttpProcessor::RequestInterceptors* m_requestInterceptors; - std::mutex m_taskMutex; - std::condition_variable m_taskCondition; - public: - Task(HttpRouter* router, - const std::shared_ptr& bodyDecoder, - const std::shared_ptr& errorHandler, - HttpProcessor::RequestInterceptors* requestInterceptors) - : m_atom(false) - , m_isRunning(true) - , m_router(router) - , m_bodyDecoder(bodyDecoder) - , m_errorHandler(errorHandler) - , m_requestInterceptors(requestInterceptors) - {} - public: - - static std::shared_ptr createShared(HttpRouter* router, - const std::shared_ptr& bodyDecoder, - const std::shared_ptr& errorHandler, - HttpProcessor::RequestInterceptors* requestInterceptors){ - return std::make_shared(router, bodyDecoder, errorHandler, requestInterceptors); - } - - void run() override; - - void addConnection(const std::shared_ptr& connectionState){ - oatpp::concurrency::SpinLock lock(m_atom); - m_connections.pushBack(connectionState); - m_taskCondition.notify_one(); - } - - void stop() { - m_isRunning = false; - } - - }; - + typedef oatpp::web::protocol::http::incoming::BodyDecoder BodyDecoder; +private: + oatpp::async::Executor m_executor; private: std::shared_ptr m_router; std::shared_ptr m_errorHandler; HttpProcessor::RequestInterceptors m_requestInterceptors; - std::atomic m_taskBalancer; - v_int32 m_threadCount; - std::shared_ptr* m_tasks; + std::shared_ptr m_bodyDecoder; // TODO make bodyDecoder configurable here public: AsyncHttpConnectionHandler(const std::shared_ptr& router, v_int32 threadCount = OATPP_ASYNC_HTTP_CONNECTION_HANDLER_THREAD_NUM_DEFAULT) - : m_router(router) + : m_executor(threadCount) + , m_router(router) , m_errorHandler(handler::DefaultErrorHandler::createShared()) - , m_taskBalancer(0) - , m_threadCount(threadCount) + , m_bodyDecoder(std::make_shared()) { - - // TODO make bodyDecoder configurable here - std::shared_ptr bodyDecoder = - std::make_shared(); - - m_tasks = new std::shared_ptr[m_threadCount]; - for(v_int32 i = 0; i < m_threadCount; i++) { - auto task = Task::createShared(m_router.get(), bodyDecoder, m_errorHandler, &m_requestInterceptors); - m_tasks[i] = task; - concurrency::Thread thread(task); - thread.detach(); - } + m_executor.detach(); // TODO consider making it configurable } public: @@ -139,10 +66,6 @@ public: return std::make_shared(router); } - ~AsyncHttpConnectionHandler(){ - delete [] m_tasks; - } - void setErrorHandler(const std::shared_ptr& errorHandler){ m_errorHandler = errorHandler; if(!m_errorHandler) { @@ -157,9 +80,7 @@ public: void handleConnection(const std::shared_ptr& connection) override; void stop() { - for(v_int32 i = 0; i < m_threadCount; i ++) { - m_tasks[i]->stop(); - } + m_executor.stop(); } };