refactor oatpp::async::worker::Worker interface

This commit is contained in:
lganzzzo 2019-05-03 17:26:47 +03:00
parent 5ee7a1f88b
commit acc719957a
9 changed files with 167 additions and 79 deletions

View File

@ -33,10 +33,17 @@ namespace oatpp { namespace async {
// Executor::SubmissionProcessor
Executor::SubmissionProcessor::SubmissionProcessor()
: m_isRunning(true)
{}
: worker::Worker(worker::Worker::Type::PROCESSOR)
, m_isRunning(true)
{
m_thread = std::thread(&Executor::SubmissionProcessor::run, this);
}
void Executor::SubmissionProcessor::run(){
oatpp::async::Processor& Executor::SubmissionProcessor::getProcessor() {
return m_processor;
}
void Executor::SubmissionProcessor::run() {
while(m_isRunning) {
m_processor.waitForTasks();
@ -45,44 +52,52 @@ void Executor::SubmissionProcessor::run(){
}
void Executor::SubmissionProcessor::pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
std::runtime_error("[oatpp::async::Executor::SubmissionProcessor::pushTasks]: Error. This method does nothing.");
}
void Executor::SubmissionProcessor::pushOneTask(AbstractCoroutine* task) {
std::runtime_error("[oatpp::async::Executor::SubmissionProcessor::pushOneTask]: Error. This method does nothing.");
}
void Executor::SubmissionProcessor::stop() {
m_isRunning = false;
m_processor.stop();
}
void Executor::SubmissionProcessor::join() {
m_thread.join();
}
void Executor::SubmissionProcessor::detach() {
m_thread.detach();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Executor
const v_int32 Executor::THREAD_NUM_DEFAULT = OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT;
Executor::Executor(v_int32 processorThreads, v_int32 ioThreads, v_int32 timerThreads)
: m_processorThreads(processorThreads)
, m_ioThreads(ioThreads)
, m_timerThreads(timerThreads)
, m_threadsCount(m_processorThreads + ioThreads + timerThreads)
, m_threads(new std::thread[m_threadsCount])
, m_processors(new SubmissionProcessor[m_processorThreads])
Executor::Executor(v_int32 processorWorkersCount, v_int32 ioWorkersCount, v_int32 timerWorkersCount)
: m_balancer(0)
{
v_int32 threadCnt = 0;
for(v_int32 i = 0; i < m_processorThreads; i ++) {
m_threads[threadCnt ++] = std::thread(&SubmissionProcessor::run, &m_processors[i]);
for(v_int32 i = 0; i < processorWorkersCount; i ++) {
m_processorWorkers.push_back(std::make_shared<SubmissionProcessor>());
}
m_allWorkers.insert(m_allWorkers.end(), m_processorWorkers.begin(), m_processorWorkers.end());
std::vector<std::shared_ptr<worker::Worker>> ioWorkers;
for(v_int32 i = 0; i < m_ioThreads; i++) {
std::shared_ptr<worker::Worker> worker = std::make_shared<worker::IOEventWorker>();
ioWorkers.push_back(worker);
m_threads[threadCnt ++] = std::thread(&worker::Worker::run, worker.get());
for(v_int32 i = 0; i < ioWorkersCount; i++) {
ioWorkers.push_back(std::make_shared<worker::IOEventWorker>());
}
linkWorkers(ioWorkers);
std::vector<std::shared_ptr<worker::Worker>> timerWorkers;
for(v_int32 i = 0; i < m_timerThreads; i++) {
std::shared_ptr<worker::Worker> worker = std::make_shared<worker::TimerWorker>();
timerWorkers.push_back(worker);
m_threads[threadCnt ++] = std::thread(&worker::Worker::run, worker.get());
for(v_int32 i = 0; i < timerWorkersCount; i++) {
timerWorkers.push_back(std::make_shared<worker::TimerWorker>());
}
linkWorkers(timerWorkers);
@ -90,44 +105,42 @@ Executor::Executor(v_int32 processorThreads, v_int32 ioThreads, v_int32 timerThr
}
Executor::~Executor() {
delete [] m_processors;
delete [] m_threads;
}
void Executor::linkWorkers(const std::vector<std::shared_ptr<worker::Worker>>& workers) {
m_workers.insert(m_workers.end(), workers.begin(), workers.end());
m_allWorkers.insert(m_allWorkers.end(), workers.begin(), workers.end());
if(m_processorThreads > workers.size() && (m_processorThreads % workers.size()) == 0) {
if(m_processorWorkers.size() > workers.size() && (m_processorWorkers.size() % workers.size()) == 0) {
v_int32 wi = 0;
for(v_int32 i = 0; i < m_processorThreads; i ++) {
auto& p = m_processors[i];
p.getProcessor().addWorker(workers[wi]);
for(v_int32 i = 0; i < m_processorWorkers.size(); i ++) {
auto& p = m_processorWorkers[i];
p->getProcessor().addWorker(workers[wi]);
wi ++;
if(wi == workers.size()) {
wi = 0;
}
}
} else if ((workers.size() % m_processorThreads) == 0) {
} else if ((workers.size() % m_processorWorkers.size()) == 0) {
v_int32 pi = 0;
for(v_int32 i = 0; i < workers.size(); i ++) {
auto& p = m_processors[pi];
p.getProcessor().addWorker(workers[i]);
auto& p = m_processorWorkers[pi];
p->getProcessor().addWorker(workers[i]);
pi ++;
if(pi == m_processorThreads) {
if(pi == m_processorWorkers.size()) {
pi = 0;
}
}
} else {
for(v_int32 i = 0; i < m_processorThreads; i ++) {
auto& p = m_processors[i];
for(v_int32 i = 0; i < m_processorWorkers.size(); i ++) {
auto& p = m_processorWorkers[i];
for(auto& w : workers) {
p.getProcessor().addWorker(w);
p->getProcessor().addWorker(w);
}
}
@ -136,33 +149,33 @@ void Executor::linkWorkers(const std::vector<std::shared_ptr<worker::Worker>>& w
}
void Executor::join() {
for(v_int32 i = 0; i < m_threadsCount; i ++) {
m_threads[i].join();
for(auto& worker : m_allWorkers) {
worker->join();
}
}
void Executor::detach() {
for(v_int32 i = 0; i < m_threadsCount; i ++) {
m_threads[i].detach();
for(auto& worker : m_allWorkers) {
worker->detach();
}
}
void Executor::stop() {
for(v_int32 i = 0; i < m_processorThreads; i ++) {
m_processors[i].stop();
}
for(auto& worker : m_workers) {
for(auto& worker : m_allWorkers) {
worker->stop();
}
}
v_int32 Executor::getTasksCount() {
v_int32 result = 0;
for(v_int32 i = 0; i < m_processorThreads; i ++) {
result += m_processors[i].getProcessor().getTasksCount();
for(auto procWorker : m_processorWorkers) {
result += procWorker->getProcessor().getTasksCount();
}
return result;
}
void Executor::waitTasksFinished(const std::chrono::duration<v_int64, std::micro>& timeout) {

View File

@ -47,26 +47,35 @@ namespace oatpp { namespace async {
class Executor {
private:
class SubmissionProcessor/* : public Worker */{
class SubmissionProcessor : public worker::Worker {
private:
oatpp::async::Processor m_processor;
private:
bool m_isRunning;
private:
std::thread m_thread;
public:
SubmissionProcessor();
public:
void run();
void stop() ;
template<typename CoroutineType, typename ... Args>
void execute(Args... params) {
m_processor.execute<CoroutineType, Args...>(params...);
}
oatpp::async::Processor& getProcessor() {
return m_processor;
}
oatpp::async::Processor& getProcessor();
void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) override;
void pushOneTask(AbstractCoroutine* task) override;
void run();
void stop() override;
void join() override;
void detach() override;
};
@ -76,28 +85,23 @@ public:
*/
static const v_int32 THREAD_NUM_DEFAULT;
private:
v_int32 m_processorThreads;
v_int32 m_ioThreads;
v_int32 m_timerThreads;
v_int32 m_threadsCount;
std::thread* m_threads;
SubmissionProcessor* m_processors;
std::atomic<v_word32> m_balancer;
private:
std::vector<std::shared_ptr<worker::Worker>> m_workers;
std::vector<std::shared_ptr<SubmissionProcessor>> m_processorWorkers;
std::vector<std::shared_ptr<worker::Worker>> m_allWorkers;
private:
void linkWorkers(const std::vector<std::shared_ptr<worker::Worker>>& workers);
public:
/**
* Constructor.
* @param processorThreads - number of data processing threads.
* @param ioThreads - number of I/O threads.
* @param timerThreads - number of timer threads.
* @param processorWorkersCount - number of data processing workers.
* @param ioWorkersCount - number of I/O processing workers.
* @param timerWorkersCount - number of timer processing workers.
*/
Executor(v_int32 processorThreads = THREAD_NUM_DEFAULT,
v_int32 ioThreads = 1,
v_int32 timerThreads = 1);
Executor(v_int32 processorWorkersCount = THREAD_NUM_DEFAULT,
v_int32 ioWorkersCount = 1,
v_int32 timerWorkersCount = 1);
/**
* Non-virtual Destructor.
@ -128,8 +132,8 @@ public:
*/
template<typename CoroutineType, typename ... Args>
void execute(Args... params) {
auto& processor = m_processors[(++ m_balancer) % m_processorThreads];
processor.execute<CoroutineType, Args...>(params...);
auto& processor = m_processorWorkers[(++ m_balancer) % m_processorWorkers.size()];
processor->execute<CoroutineType, Args...>(params...);
}
/**

View File

@ -74,6 +74,8 @@ private:
p_char8 m_inEvents;
v_int32 m_inEventsCount;
p_char8 m_outEvents;
private:
std::thread m_thread;
private:
void consumeBacklog();
void waitEvents();
@ -109,13 +111,23 @@ public:
/**
* Run worker.
*/
void run() override;
void run();
/**
* Break run loop.
*/
void stop() override;
/**
* Join all worker-threads.
*/
void join() override;
/**
* Detach all worker-threads.
*/
void detach() override;
};
}}}

View File

@ -36,7 +36,9 @@ IOEventWorker::IOEventWorker()
, m_inEvents(nullptr)
, m_inEventsCount(0)
, m_outEvents(nullptr)
{}
{
m_thread = std::thread(&IOEventWorker::run, this);
}
IOEventWorker::~IOEventWorker() {
@ -95,4 +97,12 @@ void IOEventWorker::stop() {
triggerWakeup();
}
void IOEventWorker::join() {
m_thread.join();
}
void IOEventWorker::detach() {
m_thread.detach();
}
}}}

View File

@ -33,7 +33,9 @@ namespace oatpp { namespace async { namespace worker {
IOWorker::IOWorker()
: Worker(Type::IO)
, m_running(true)
{}
{
m_thread = std::thread(&IOWorker::run, this);
}
void IOWorker::pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
{
@ -165,4 +167,12 @@ void IOWorker::stop() {
m_backlogCondition.notify_one();
}
void IOWorker::join() {
m_thread.join();
}
void IOWorker::detach() {
m_thread.detach();
}
}}}

View File

@ -46,6 +46,8 @@ private:
oatpp::collection::FastQueue<AbstractCoroutine> m_queue;
oatpp::concurrency::SpinLock m_backlogLock;
std::condition_variable_any m_backlogCondition;
private:
std::thread m_thread;
private:
void consumeBacklog(bool blockToConsume);
public:
@ -70,13 +72,23 @@ public:
/**
* Run worker.
*/
void run() override;
void run();
/**
* Break run loop.
*/
void stop() override;
/**
* Join all worker-threads.
*/
void join() override;
/**
* Detach all worker-threads.
*/
void detach() override;
};
}}}

View File

@ -34,7 +34,9 @@ TimerWorker::TimerWorker(const std::chrono::duration<v_int64, std::micro>& granu
: Worker(Type::TIMER)
, m_running(true)
, m_granularity(granularity)
{}
{
m_thread = std::thread(&TimerWorker::run, this);
}
void TimerWorker::pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
{
@ -128,4 +130,12 @@ void TimerWorker::stop() {
m_backlogCondition.notify_one();
}
void TimerWorker::join() {
m_thread.join();
}
void TimerWorker::detach() {
m_thread.detach();
}
}}}

View File

@ -48,6 +48,8 @@ private:
std::condition_variable_any m_backlogCondition;
private:
std::chrono::duration<v_int64, std::micro> m_granularity;
private:
std::thread m_thread;
private:
void consumeBacklog();
public:
@ -73,13 +75,23 @@ public:
/**
* Run worker.
*/
void run() override;
void run();
/**
* Break run loop.
*/
void stop() override;
/**
* Join all worker-threads.
*/
void join() override;
/**
* Detach all worker-threads.
*/
void detach() override;
};
}}}

View File

@ -97,16 +97,21 @@ public:
*/
virtual void pushOneTask(AbstractCoroutine* task) = 0;
/**
* Run worker.
*/
virtual void run() = 0;
/**
* Break run loop.
*/
virtual void stop() = 0;
/**
* Join all worker-threads.
*/
virtual void join() = 0;
/**
* Detach all worker-threads.
*/
virtual void detach() = 0;
/**
* Get worker type.
* @return - one of &l:Worker::Type; values.