priorities experiment

This commit is contained in:
lganzzzo 2018-09-21 02:55:42 +03:00
parent 369a994381
commit 4d5c02a2a1
2 changed files with 85 additions and 7 deletions

View File

@ -45,10 +45,18 @@ void HttpConnectionHandler::Task::run(){
auto inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection, buffer, bufferSize);
bool keepAlive = true;
do {
std::shared_ptr<protocol::http::outgoing::Response> response;
{
PriorityController::PriorityLine priorityLine = m_priorityController->getLine();
priorityLine.wait();
response = HttpProcessor::processRequest(m_router, m_connection, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive);
}
//auto response = HttpProcessor::processRequest(m_router, m_connection, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive);
auto response = HttpProcessor::processRequest(m_router, m_connection, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive);
if(response) {
outStream->setBufferPosition(0, 0);
response->send(outStream);
@ -62,7 +70,7 @@ void HttpConnectionHandler::Task::run(){
}
void HttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection){
concurrency::Thread thread(Task::createShared(m_router.get(), connection, m_errorHandler, &m_requestInterceptors));
concurrency::Thread thread(Task::createShared(m_router.get(), connection, m_errorHandler, &m_requestInterceptors, &m_priorityController));
thread.detach();
}

View File

@ -44,6 +44,71 @@
namespace oatpp { namespace web { namespace server {
class HttpConnectionHandler : public base::Controllable, public network::server::ConnectionHandler {
private:
class PriorityController {
public:
class PriorityLine {
private:
PriorityController* m_controller;
bool m_priority;
public:
PriorityLine(PriorityController* controller)
: m_controller(controller)
, m_priority(controller->obtainPriority())
{
//OATPP_LOGD("line", "created");
}
~PriorityLine() {
if(m_priority) {
m_controller->giveUpPriority();
}
}
void wait() {
if(!m_priority) {
//OATPP_LOGD("line", "waiting thread %d", m_controller->m_counter.load() );
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} else {
//OATPP_LOGD("line", "green!!! thread %d", m_controller->m_counter.load());
}
}
};
private:
oatpp::concurrency::SpinLock::Atom m_atom;
std::atomic<v_int32> m_counter;
v_int32 m_maxPriorityLines;
private:
bool obtainPriority() {
if(m_counter.fetch_add(1) < m_maxPriorityLines) {
return true;
} else {
--m_counter;
return false;
}
}
void giveUpPriority() {
-- m_counter;
}
public:
PriorityController(v_int32 maxPriorityLines)
: m_counter(0)
, m_maxPriorityLines(maxPriorityLines)
{}
PriorityLine getLine() {
return PriorityLine(this);
}
};
private:
class Task : public base::Controllable, public concurrency::Runnable{
private:
@ -51,23 +116,27 @@ private:
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
HttpProcessor::RequestInterceptors* m_requestInterceptors;
PriorityController* m_priorityController;
public:
Task(HttpRouter* router,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
HttpProcessor::RequestInterceptors* requestInterceptors)
HttpProcessor::RequestInterceptors* requestInterceptors,
PriorityController* priorityController)
: m_router(router)
, m_connection(connection)
, m_errorHandler(errorHandler)
, m_requestInterceptors(requestInterceptors)
, m_priorityController(priorityController)
{}
public:
static std::shared_ptr<Task> createShared(HttpRouter* router,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
HttpProcessor::RequestInterceptors* requestInterceptors){
return std::make_shared<Task>(router, connection, errorHandler, requestInterceptors);
HttpProcessor::RequestInterceptors* requestInterceptors,
PriorityController* priorityController){
return std::make_shared<Task>(router, connection, errorHandler, requestInterceptors, priorityController);
}
void run() override;
@ -78,11 +147,12 @@ private:
std::shared_ptr<HttpRouter> m_router;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
HttpProcessor::RequestInterceptors m_requestInterceptors;
PriorityController m_priorityController;
public:
HttpConnectionHandler(const std::shared_ptr<HttpRouter>& router)
: m_router(router)
, m_errorHandler(handler::DefaultErrorHandler::createShared())
, m_priorityController(8)
{}
public: