refactor AsyncHttpConnectionHandler using async::Executor

This commit is contained in:
lganzzzo 2019-01-02 15:41:24 +02:00
parent 07c5e5e7b6
commit 81e134c458
2 changed files with 21 additions and 153 deletions

View File

@ -35,74 +35,21 @@
#include <errno.h>
namespace oatpp { namespace web { namespace server {
void AsyncHttpConnectionHandler::Task::consumeConnections(oatpp::async::Processor& processor) {
oatpp::concurrency::SpinLock lock(m_atom);
auto curr = m_connections.getFirstNode();
while (curr != nullptr) {
auto coroutine = HttpProcessor::Coroutine::getBench().obtain(m_router,
m_bodyDecoder,
m_errorHandler,
m_requestInterceptors,
curr->getData()->connection,
curr->getData()->ioBuffer,
curr->getData()->outStream,
curr->getData()->inStream);
processor.addWaitingCoroutine(coroutine);
curr = curr->getNext();
}
m_connections.clear();
}
void AsyncHttpConnectionHandler::Task::run(){
oatpp::async::Processor processor;
while(m_isRunning) {
/* Load all waiting connections into processor */
consumeConnections(processor);
/* Process all, and check for incoming connections once in 1000 iterations */
while (processor.iterate(1000)) {
consumeConnections(processor);
}
std::unique_lock<std::mutex> lock(m_taskMutex);
if(processor.isEmpty()) {
/* No tasks in the processor. Wait for incoming connections */
m_taskCondition.wait_for(lock, std::chrono::milliseconds(500));
} else {
/* There is still something in slow queue. Wait and get back to processing */
/* Waiting for IO is not Applicable here as slow queue may contain NON-IO tasks */
//OATPP_LOGD("proc", "waiting slow queue");
m_taskCondition.wait_for(lock, std::chrono::milliseconds(10));
}
}
}
void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection){
auto task = m_tasks[m_taskBalancer % m_threadCount];
auto ioBuffer = oatpp::data::buffer::IOBuffer::createShared();
auto state = HttpProcessor::ConnectionState::createShared();
state->connection = connection;
state->ioBuffer = ioBuffer;
state->outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, ioBuffer);
state->inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection, ioBuffer);
auto outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, ioBuffer);
auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection, ioBuffer);
task->addConnection(state);
m_taskBalancer ++;
m_executor.execute<HttpProcessor::Coroutine>(m_router.get(),
m_bodyDecoder,
m_errorHandler,
&m_requestInterceptors,
connection,
ioBuffer,
outStream,
inStream);
}

View File

@ -26,112 +26,39 @@
#define oatpp_web_server_AsyncHttpConnectionHandler_hpp
#include "./HttpProcessor.hpp"
#include "./handler/ErrorHandler.hpp"
#include "./HttpRouter.hpp"
#include "oatpp/web/protocol/http/incoming/SimpleBodyDecoder.hpp"
#include "oatpp/web/protocol/http/incoming/Request.hpp"
#include "oatpp/web/protocol/http/outgoing/Response.hpp"
#include "oatpp/network/server/Server.hpp"
#include "oatpp/network/server/ConnectionHandler.hpp"
#include "oatpp/network/Connection.hpp"
#include "oatpp/core/concurrency/Thread.hpp"
#include "oatpp/core/concurrency/Runnable.hpp"
#include "oatpp/core/data/stream/StreamBufferedProxy.hpp"
#include "oatpp/core/data/buffer/IOBuffer.hpp"
#include "oatpp/core/async/Processor.hpp"
#include <mutex>
#include <condition_variable>
#include "oatpp/core/async/Executor.hpp"
namespace oatpp { namespace web { namespace server {
class AsyncHttpConnectionHandler : public base::Controllable, public network::server::ConnectionHandler {
private:
class Task : public base::Controllable, public concurrency::Runnable {
public:
typedef oatpp::collection::LinkedList<std::shared_ptr<HttpProcessor::ConnectionState>> Connections;
private:
void consumeConnections(oatpp::async::Processor& processor);
private:
oatpp::concurrency::SpinLock::Atom m_atom;
Connections m_connections;
private:
bool m_isRunning;
HttpRouter* m_router;
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> m_bodyDecoder;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
HttpProcessor::RequestInterceptors* m_requestInterceptors;
std::mutex m_taskMutex;
std::condition_variable m_taskCondition;
public:
Task(HttpRouter* router,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
HttpProcessor::RequestInterceptors* requestInterceptors)
: m_atom(false)
, m_isRunning(true)
, m_router(router)
, m_bodyDecoder(bodyDecoder)
, m_errorHandler(errorHandler)
, m_requestInterceptors(requestInterceptors)
{}
public:
static std::shared_ptr<Task> createShared(HttpRouter* router,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
HttpProcessor::RequestInterceptors* requestInterceptors){
return std::make_shared<Task>(router, bodyDecoder, errorHandler, requestInterceptors);
}
void run() override;
void addConnection(const std::shared_ptr<HttpProcessor::ConnectionState>& connectionState){
oatpp::concurrency::SpinLock lock(m_atom);
m_connections.pushBack(connectionState);
m_taskCondition.notify_one();
}
void stop() {
m_isRunning = false;
}
};
typedef oatpp::web::protocol::http::incoming::BodyDecoder BodyDecoder;
private:
oatpp::async::Executor m_executor;
private:
std::shared_ptr<HttpRouter> m_router;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
HttpProcessor::RequestInterceptors m_requestInterceptors;
std::atomic<v_int32> m_taskBalancer;
v_int32 m_threadCount;
std::shared_ptr<Task>* m_tasks;
std::shared_ptr<const BodyDecoder> m_bodyDecoder; // TODO make bodyDecoder configurable here
public:
AsyncHttpConnectionHandler(const std::shared_ptr<HttpRouter>& router,
v_int32 threadCount = OATPP_ASYNC_HTTP_CONNECTION_HANDLER_THREAD_NUM_DEFAULT)
: m_router(router)
: m_executor(threadCount)
, m_router(router)
, m_errorHandler(handler::DefaultErrorHandler::createShared())
, m_taskBalancer(0)
, m_threadCount(threadCount)
, m_bodyDecoder(std::make_shared<oatpp::web::protocol::http::incoming::SimpleBodyDecoder>())
{
// TODO make bodyDecoder configurable here
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> bodyDecoder =
std::make_shared<oatpp::web::protocol::http::incoming::SimpleBodyDecoder>();
m_tasks = new std::shared_ptr<Task>[m_threadCount];
for(v_int32 i = 0; i < m_threadCount; i++) {
auto task = Task::createShared(m_router.get(), bodyDecoder, m_errorHandler, &m_requestInterceptors);
m_tasks[i] = task;
concurrency::Thread thread(task);
thread.detach();
}
m_executor.detach(); // TODO consider making it configurable
}
public:
@ -139,10 +66,6 @@ public:
return std::make_shared<AsyncHttpConnectionHandler>(router);
}
~AsyncHttpConnectionHandler(){
delete [] m_tasks;
}
void setErrorHandler(const std::shared_ptr<handler::ErrorHandler>& errorHandler){
m_errorHandler = errorHandler;
if(!m_errorHandler) {
@ -157,9 +80,7 @@ public:
void handleConnection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection) override;
void stop() {
for(v_int32 i = 0; i < m_threadCount; i ++) {
m_tasks[i]->stop();
}
m_executor.stop();
}
};