Utility methods for oatpp::concurrency::Thread

This commit is contained in:
lganzzzo 2018-09-23 23:05:37 +03:00
parent db4531bd3a
commit 8e5f64c916
7 changed files with 82 additions and 114 deletions

View File

@ -43,6 +43,11 @@
*/
//#define OATPP_DISABLE_POOL_ALLOCATIONS
/**
* Predefined value for function oatpp::concurrency::Thread::getHardwareConcurrency();
*/
//#define OATPP_THREAD_HARDWARE_CONCURRENCY 2
/**
* Number of shards of ThreadDistributedMemoryPool (Default pool for many oatpp objects)
* Higher number reduces threads racing for resources on each shard.

View File

@ -32,8 +32,8 @@ oatpp::concurrency::SpinLock::Atom MemoryPool::POOLS_ATOM(false);
std::unordered_map<v_int64, MemoryPool*> MemoryPool::POOLS;
std::atomic<v_int64> MemoryPool::poolIdCounter(0);
ThreadDistributedMemoryPool::ThreadDistributedMemoryPool(const std::string& name, v_int32 entrySize, v_int32 chunkSize)
: m_shardsCount(OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT)
ThreadDistributedMemoryPool::ThreadDistributedMemoryPool(const std::string& name, v_int32 entrySize, v_int32 chunkSize, v_word32 shardsCount)
: m_shardsCount(shardsCount)
, m_shards(new MemoryPool*[m_shardsCount])
{
for(v_int32 i = 0; i < m_shardsCount; i++){
@ -49,8 +49,9 @@ ThreadDistributedMemoryPool::~ThreadDistributedMemoryPool(){
}
void* ThreadDistributedMemoryPool::obtain() {
static thread_local v_int16 index = oatpp::concurrency::Thread::getThisThreadId() % m_shardsCount;
return m_shards[index]->obtain();
static std::hash<std::thread::id> hashFunction;
static thread_local v_word32 hash = hashFunction(std::this_thread::get_id()) % m_shardsCount;
return m_shards[hash]->obtain();
}
}}}

View File

@ -171,10 +171,11 @@ public:
class ThreadDistributedMemoryPool {
private:
v_int32 m_shardsCount;
v_word32 m_shardsCount;
MemoryPool** m_shards;
public:
ThreadDistributedMemoryPool(const std::string& name, v_int32 entrySize, v_int32 chunkSize);
ThreadDistributedMemoryPool(const std::string& name, v_int32 entrySize, v_int32 chunkSize,
v_word32 shardsCount = OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT);
virtual ~ThreadDistributedMemoryPool();
void* obtain();
};

View File

@ -23,4 +23,55 @@
***************************************************************************/
#include "Thread.hpp"
#include <atomic>
#if defined(_GNU_SOURCE)
#include <pthread.h>
#endif
namespace oatpp { namespace concurrency {
v_int32 Thread::getThreadSuggestedCpuIndex(std::thread::id threadId, v_int32 cpuCount) {
static std::hash<std::thread::id> hashFunction;
static thread_local v_int32 lock = hashFunction(std::this_thread::get_id()) % OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT;
return lock % cpuCount;
}
v_int32 Thread::assignThreadToCpu(std::thread::native_handle_type nativeHandle, v_int32 cpuIndex) {
#if defined(_GNU_SOURCE)
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpuIndex, &cpuset);
v_int32 result = pthread_setaffinity_np(nativeHandle, sizeof(cpu_set_t), &cpuset);
if (result != 0) {
OATPP_LOGD("[oatpp::concurrency::Thread::assignThreadToCpu(...)]", "error code - %d", result);
}
return result;
#else
return -1;
#endif
}
v_int32 Thread::calcHardwareConcurrency() {
#if !defined(OATPP_THREAD_HARDWARE_CONCURRENCY)
v_int32 concurrency = std::thread::hardware_concurrency();
if(concurrency == 0) {
concurrency = 1;
}
return concurrency;
#else
return OATPP_THREAD_HARDWARE_CONCURRENCY;
#endif
}
v_int32 Thread::getHardwareConcurrency() {
static v_int32 concurrency = calcHardwareConcurrency();
return concurrency;
}
}}

View File

@ -31,9 +31,7 @@
#include "oatpp/core/base/Controllable.hpp"
#include <thread>
#include <atomic>
namespace oatpp { namespace concurrency {
@ -41,19 +39,12 @@ class Thread : public base::Controllable {
public:
OBJECT_POOL(Thread_Pool, Thread, 32)
SHARED_OBJECT_POOL(Shared_Thread_Pool, Thread, 32)
private:
static v_int32 calcHardwareConcurrency();
public:
static v_word32 getThisThreadId() {
static std::atomic<v_word32> base(0);
static thread_local v_int32 index = (base ++);
return index;
}
static v_word32 getSuggestedCpuIndex(v_word32 cpuCount) {
v_word32 lockIndex = getThisThreadId() % OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT;
return lockIndex % cpuCount;
}
static v_int32 getThreadSuggestedCpuIndex(std::thread::id threadId, v_int32 cpuCount);
static v_int32 assignThreadToCpu(std::thread::native_handle_type nativeHandle, v_int32 cpuIndex);
static v_int32 getHardwareConcurrency();
private:
std::thread m_thread;
public:
@ -78,7 +69,7 @@ public:
m_thread.detach();
}
std::thread* getThread() {
std::thread* getStdThread() {
return &m_thread;
}

View File

@ -52,15 +52,7 @@ void HttpConnectionHandler::Task::run(){
bool keepAlive = true;
do {
std::shared_ptr<protocol::http::outgoing::Response> response;
{
PriorityController::PriorityLine priorityLine = m_priorityController->getLine();
priorityLine.wait();
response = HttpProcessor::processRequest(m_router, m_connection, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive);
}
//auto response = HttpProcessor::processRequest(m_router, m_connection, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive);
auto response = HttpProcessor::processRequest(m_router, m_connection, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, keepAlive);
if(response) {
outStream->setBufferPosition(0, 0);
@ -75,20 +67,17 @@ void HttpConnectionHandler::Task::run(){
}
void HttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection){
concurrency::Thread thread(Task::createShared(m_router.get(), connection, m_errorHandler, &m_requestInterceptors, &m_priorityController));
/*
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(oatpp::concurrency::Thread::getSuggestedCpuIndex(3), &cpuset);
concurrency::Thread thread(Task::createShared(m_router.get(), connection, m_errorHandler, &m_requestInterceptors));
int rc = pthread_setaffinity_np(thread.getThread()->native_handle(), sizeof(cpu_set_t), &cpuset);
if (rc != 0) {
OATPP_LOGD("task", "setting cpu error %d", rc);
v_int32 concurrency = oatpp::concurrency::Thread::getHardwareConcurrency();
if(concurrency > 1) {
concurrency -= 1;
}
*/
v_int32 cpu = oatpp::concurrency::Thread::getThreadSuggestedCpuIndex(thread.getStdThread()->get_id(), concurrency);
oatpp::concurrency::Thread::assignThreadToCpu(thread.getStdThread()->native_handle(), cpu);
thread.detach();
}

View File

@ -46,97 +46,29 @@ namespace oatpp { namespace web { namespace server {
class HttpConnectionHandler : public base::Controllable, public network::server::ConnectionHandler {
private:
class PriorityController {
public:
class PriorityLine {
private:
PriorityController* m_controller;
bool m_priority;
public:
PriorityLine(PriorityController* controller)
: m_controller(controller)
, m_priority(controller->obtainPriority())
{
//OATPP_LOGD("line", "created");
}
~PriorityLine() {
if(m_priority) {
m_controller->giveUpPriority();
}
}
void wait() {
if(!m_priority) {
//OATPP_LOGD("line", "waiting thread %d", m_controller->m_counter.load() );
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} else {
//OATPP_LOGD("line", "green!!! thread %d", m_controller->m_counter.load());
}
}
};
private:
oatpp::concurrency::SpinLock::Atom m_atom;
std::atomic<v_int32> m_counter;
v_int32 m_maxPriorityLines;
private:
bool obtainPriority() {
if(m_counter.fetch_add(1) < m_maxPriorityLines) {
return true;
} else {
--m_counter;
return false;
}
}
void giveUpPriority() {
-- m_counter;
}
public:
PriorityController(v_int32 maxPriorityLines)
: m_counter(0)
, m_maxPriorityLines(maxPriorityLines)
{}
PriorityLine getLine() {
return PriorityLine(this);
}
};
private:
class Task : public base::Controllable, public concurrency::Runnable{
private:
HttpRouter* m_router;
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
HttpProcessor::RequestInterceptors* m_requestInterceptors;
PriorityController* m_priorityController;
public:
Task(HttpRouter* router,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
HttpProcessor::RequestInterceptors* requestInterceptors,
PriorityController* priorityController)
HttpProcessor::RequestInterceptors* requestInterceptors)
: m_router(router)
, m_connection(connection)
, m_errorHandler(errorHandler)
, m_requestInterceptors(requestInterceptors)
, m_priorityController(priorityController)
{}
public:
static std::shared_ptr<Task> createShared(HttpRouter* router,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
HttpProcessor::RequestInterceptors* requestInterceptors,
PriorityController* priorityController){
return std::make_shared<Task>(router, connection, errorHandler, requestInterceptors, priorityController);
HttpProcessor::RequestInterceptors* requestInterceptors) {
return std::make_shared<Task>(router, connection, errorHandler, requestInterceptors);
}
void run() override;
@ -147,12 +79,10 @@ private:
std::shared_ptr<HttpRouter> m_router;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
HttpProcessor::RequestInterceptors m_requestInterceptors;
PriorityController m_priorityController;
public:
HttpConnectionHandler(const std::shared_ptr<HttpRouter>& router)
: m_router(router)
, m_errorHandler(handler::DefaultErrorHandler::createShared())
, m_priorityController(8)
{}
public: