From acc719957acb8481b6642523ec386212b7ed3bc4 Mon Sep 17 00:00:00 2001 From: lganzzzo Date: Fri, 3 May 2019 17:26:47 +0300 Subject: [PATCH] refactor oatpp::async::worker::Worker interface --- src/oatpp/core/async/Executor.cpp | 105 ++++++++++-------- src/oatpp/core/async/Executor.hpp | 48 ++++---- src/oatpp/core/async/worker/IOEventWorker.hpp | 14 ++- .../async/worker/IOEventWorker_common.cpp | 12 +- src/oatpp/core/async/worker/IOWorker.cpp | 12 +- src/oatpp/core/async/worker/IOWorker.hpp | 14 ++- src/oatpp/core/async/worker/TimerWorker.cpp | 12 +- src/oatpp/core/async/worker/TimerWorker.hpp | 14 ++- src/oatpp/core/async/worker/Worker.hpp | 15 ++- 9 files changed, 167 insertions(+), 79 deletions(-) diff --git a/src/oatpp/core/async/Executor.cpp b/src/oatpp/core/async/Executor.cpp index eb074c2f..941a8015 100644 --- a/src/oatpp/core/async/Executor.cpp +++ b/src/oatpp/core/async/Executor.cpp @@ -33,10 +33,17 @@ namespace oatpp { namespace async { // Executor::SubmissionProcessor Executor::SubmissionProcessor::SubmissionProcessor() - : m_isRunning(true) -{} + : worker::Worker(worker::Worker::Type::PROCESSOR) + , m_isRunning(true) +{ + m_thread = std::thread(&Executor::SubmissionProcessor::run, this); +} -void Executor::SubmissionProcessor::run(){ +oatpp::async::Processor& Executor::SubmissionProcessor::getProcessor() { + return m_processor; +} + +void Executor::SubmissionProcessor::run() { while(m_isRunning) { m_processor.waitForTasks(); @@ -45,44 +52,52 @@ void Executor::SubmissionProcessor::run(){ } +void Executor::SubmissionProcessor::pushTasks(oatpp::collection::FastQueue& tasks) { + std::runtime_error("[oatpp::async::Executor::SubmissionProcessor::pushTasks]: Error. This method does nothing."); +} + +void Executor::SubmissionProcessor::pushOneTask(AbstractCoroutine* task) { + std::runtime_error("[oatpp::async::Executor::SubmissionProcessor::pushOneTask]: Error. This method does nothing."); +} + void Executor::SubmissionProcessor::stop() { m_isRunning = false; m_processor.stop(); } +void Executor::SubmissionProcessor::join() { + m_thread.join(); +} + +void Executor::SubmissionProcessor::detach() { + m_thread.detach(); +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Executor const v_int32 Executor::THREAD_NUM_DEFAULT = OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT; -Executor::Executor(v_int32 processorThreads, v_int32 ioThreads, v_int32 timerThreads) - : m_processorThreads(processorThreads) - , m_ioThreads(ioThreads) - , m_timerThreads(timerThreads) - , m_threadsCount(m_processorThreads + ioThreads + timerThreads) - , m_threads(new std::thread[m_threadsCount]) - , m_processors(new SubmissionProcessor[m_processorThreads]) +Executor::Executor(v_int32 processorWorkersCount, v_int32 ioWorkersCount, v_int32 timerWorkersCount) + : m_balancer(0) { - v_int32 threadCnt = 0; - for(v_int32 i = 0; i < m_processorThreads; i ++) { - m_threads[threadCnt ++] = std::thread(&SubmissionProcessor::run, &m_processors[i]); + for(v_int32 i = 0; i < processorWorkersCount; i ++) { + m_processorWorkers.push_back(std::make_shared()); } + m_allWorkers.insert(m_allWorkers.end(), m_processorWorkers.begin(), m_processorWorkers.end()); + std::vector> ioWorkers; - for(v_int32 i = 0; i < m_ioThreads; i++) { - std::shared_ptr worker = std::make_shared(); - ioWorkers.push_back(worker); - m_threads[threadCnt ++] = std::thread(&worker::Worker::run, worker.get()); + for(v_int32 i = 0; i < ioWorkersCount; i++) { + ioWorkers.push_back(std::make_shared()); } linkWorkers(ioWorkers); std::vector> timerWorkers; - for(v_int32 i = 0; i < m_timerThreads; i++) { - std::shared_ptr worker = std::make_shared(); - timerWorkers.push_back(worker); - m_threads[threadCnt ++] = std::thread(&worker::Worker::run, worker.get()); + for(v_int32 i = 0; i < timerWorkersCount; i++) { + timerWorkers.push_back(std::make_shared()); } linkWorkers(timerWorkers); @@ -90,44 +105,42 @@ Executor::Executor(v_int32 processorThreads, v_int32 ioThreads, v_int32 timerThr } Executor::~Executor() { - delete [] m_processors; - delete [] m_threads; } void Executor::linkWorkers(const std::vector>& workers) { - m_workers.insert(m_workers.end(), workers.begin(), workers.end()); + m_allWorkers.insert(m_allWorkers.end(), workers.begin(), workers.end()); - if(m_processorThreads > workers.size() && (m_processorThreads % workers.size()) == 0) { + if(m_processorWorkers.size() > workers.size() && (m_processorWorkers.size() % workers.size()) == 0) { v_int32 wi = 0; - for(v_int32 i = 0; i < m_processorThreads; i ++) { - auto& p = m_processors[i]; - p.getProcessor().addWorker(workers[wi]); + for(v_int32 i = 0; i < m_processorWorkers.size(); i ++) { + auto& p = m_processorWorkers[i]; + p->getProcessor().addWorker(workers[wi]); wi ++; if(wi == workers.size()) { wi = 0; } } - } else if ((workers.size() % m_processorThreads) == 0) { + } else if ((workers.size() % m_processorWorkers.size()) == 0) { v_int32 pi = 0; for(v_int32 i = 0; i < workers.size(); i ++) { - auto& p = m_processors[pi]; - p.getProcessor().addWorker(workers[i]); + auto& p = m_processorWorkers[pi]; + p->getProcessor().addWorker(workers[i]); pi ++; - if(pi == m_processorThreads) { + if(pi == m_processorWorkers.size()) { pi = 0; } } } else { - for(v_int32 i = 0; i < m_processorThreads; i ++) { - auto& p = m_processors[i]; + for(v_int32 i = 0; i < m_processorWorkers.size(); i ++) { + auto& p = m_processorWorkers[i]; for(auto& w : workers) { - p.getProcessor().addWorker(w); + p->getProcessor().addWorker(w); } } @@ -136,33 +149,33 @@ void Executor::linkWorkers(const std::vector>& w } void Executor::join() { - for(v_int32 i = 0; i < m_threadsCount; i ++) { - m_threads[i].join(); + for(auto& worker : m_allWorkers) { + worker->join(); } } void Executor::detach() { - for(v_int32 i = 0; i < m_threadsCount; i ++) { - m_threads[i].detach(); + for(auto& worker : m_allWorkers) { + worker->detach(); } } void Executor::stop() { - for(v_int32 i = 0; i < m_processorThreads; i ++) { - m_processors[i].stop(); - } - - for(auto& worker : m_workers) { + for(auto& worker : m_allWorkers) { worker->stop(); } } v_int32 Executor::getTasksCount() { + v_int32 result = 0; - for(v_int32 i = 0; i < m_processorThreads; i ++) { - result += m_processors[i].getProcessor().getTasksCount(); + + for(auto procWorker : m_processorWorkers) { + result += procWorker->getProcessor().getTasksCount(); } + return result; + } void Executor::waitTasksFinished(const std::chrono::duration& timeout) { diff --git a/src/oatpp/core/async/Executor.hpp b/src/oatpp/core/async/Executor.hpp index 3863f77a..ce8f926a 100644 --- a/src/oatpp/core/async/Executor.hpp +++ b/src/oatpp/core/async/Executor.hpp @@ -47,26 +47,35 @@ namespace oatpp { namespace async { class Executor { private: - class SubmissionProcessor/* : public Worker */{ + class SubmissionProcessor : public worker::Worker { private: oatpp::async::Processor m_processor; private: bool m_isRunning; + private: + std::thread m_thread; public: SubmissionProcessor(); public: - - void run(); - void stop() ; template void execute(Args... params) { m_processor.execute(params...); } - oatpp::async::Processor& getProcessor() { - return m_processor; - } + oatpp::async::Processor& getProcessor(); + + void pushTasks(oatpp::collection::FastQueue& tasks) override; + + void pushOneTask(AbstractCoroutine* task) override; + + void run(); + + void stop() override; + + void join() override; + + void detach() override; }; @@ -76,28 +85,23 @@ public: */ static const v_int32 THREAD_NUM_DEFAULT; private: - v_int32 m_processorThreads; - v_int32 m_ioThreads; - v_int32 m_timerThreads; - v_int32 m_threadsCount; - std::thread* m_threads; - SubmissionProcessor* m_processors; std::atomic m_balancer; private: - std::vector> m_workers; + std::vector> m_processorWorkers; + std::vector> m_allWorkers; private: void linkWorkers(const std::vector>& workers); public: /** * Constructor. - * @param processorThreads - number of data processing threads. - * @param ioThreads - number of I/O threads. - * @param timerThreads - number of timer threads. + * @param processorWorkersCount - number of data processing workers. + * @param ioWorkersCount - number of I/O processing workers. + * @param timerWorkersCount - number of timer processing workers. */ - Executor(v_int32 processorThreads = THREAD_NUM_DEFAULT, - v_int32 ioThreads = 1, - v_int32 timerThreads = 1); + Executor(v_int32 processorWorkersCount = THREAD_NUM_DEFAULT, + v_int32 ioWorkersCount = 1, + v_int32 timerWorkersCount = 1); /** * Non-virtual Destructor. @@ -128,8 +132,8 @@ public: */ template void execute(Args... params) { - auto& processor = m_processors[(++ m_balancer) % m_processorThreads]; - processor.execute(params...); + auto& processor = m_processorWorkers[(++ m_balancer) % m_processorWorkers.size()]; + processor->execute(params...); } /** diff --git a/src/oatpp/core/async/worker/IOEventWorker.hpp b/src/oatpp/core/async/worker/IOEventWorker.hpp index 522bc416..e7d4ca35 100644 --- a/src/oatpp/core/async/worker/IOEventWorker.hpp +++ b/src/oatpp/core/async/worker/IOEventWorker.hpp @@ -74,6 +74,8 @@ private: p_char8 m_inEvents; v_int32 m_inEventsCount; p_char8 m_outEvents; +private: + std::thread m_thread; private: void consumeBacklog(); void waitEvents(); @@ -109,13 +111,23 @@ public: /** * Run worker. */ - void run() override; + void run(); /** * Break run loop. */ void stop() override; + /** + * Join all worker-threads. + */ + void join() override; + + /** + * Detach all worker-threads. + */ + void detach() override; + }; }}} diff --git a/src/oatpp/core/async/worker/IOEventWorker_common.cpp b/src/oatpp/core/async/worker/IOEventWorker_common.cpp index f0172c75..0d673bf8 100644 --- a/src/oatpp/core/async/worker/IOEventWorker_common.cpp +++ b/src/oatpp/core/async/worker/IOEventWorker_common.cpp @@ -36,7 +36,9 @@ IOEventWorker::IOEventWorker() , m_inEvents(nullptr) , m_inEventsCount(0) , m_outEvents(nullptr) -{} +{ + m_thread = std::thread(&IOEventWorker::run, this); +} IOEventWorker::~IOEventWorker() { @@ -95,4 +97,12 @@ void IOEventWorker::stop() { triggerWakeup(); } +void IOEventWorker::join() { + m_thread.join(); +} + +void IOEventWorker::detach() { + m_thread.detach(); +} + }}} diff --git a/src/oatpp/core/async/worker/IOWorker.cpp b/src/oatpp/core/async/worker/IOWorker.cpp index b604628d..fb248773 100644 --- a/src/oatpp/core/async/worker/IOWorker.cpp +++ b/src/oatpp/core/async/worker/IOWorker.cpp @@ -33,7 +33,9 @@ namespace oatpp { namespace async { namespace worker { IOWorker::IOWorker() : Worker(Type::IO) , m_running(true) -{} +{ + m_thread = std::thread(&IOWorker::run, this); +} void IOWorker::pushTasks(oatpp::collection::FastQueue& tasks) { { @@ -165,4 +167,12 @@ void IOWorker::stop() { m_backlogCondition.notify_one(); } +void IOWorker::join() { + m_thread.join(); +} + +void IOWorker::detach() { + m_thread.detach(); +} + }}} \ No newline at end of file diff --git a/src/oatpp/core/async/worker/IOWorker.hpp b/src/oatpp/core/async/worker/IOWorker.hpp index b83ccb99..42566a62 100644 --- a/src/oatpp/core/async/worker/IOWorker.hpp +++ b/src/oatpp/core/async/worker/IOWorker.hpp @@ -46,6 +46,8 @@ private: oatpp::collection::FastQueue m_queue; oatpp::concurrency::SpinLock m_backlogLock; std::condition_variable_any m_backlogCondition; +private: + std::thread m_thread; private: void consumeBacklog(bool blockToConsume); public: @@ -70,13 +72,23 @@ public: /** * Run worker. */ - void run() override; + void run(); /** * Break run loop. */ void stop() override; + /** + * Join all worker-threads. + */ + void join() override; + + /** + * Detach all worker-threads. + */ + void detach() override; + }; }}} diff --git a/src/oatpp/core/async/worker/TimerWorker.cpp b/src/oatpp/core/async/worker/TimerWorker.cpp index 30e9a9a7..6eabca72 100644 --- a/src/oatpp/core/async/worker/TimerWorker.cpp +++ b/src/oatpp/core/async/worker/TimerWorker.cpp @@ -34,7 +34,9 @@ TimerWorker::TimerWorker(const std::chrono::duration& granu : Worker(Type::TIMER) , m_running(true) , m_granularity(granularity) -{} +{ + m_thread = std::thread(&TimerWorker::run, this); +} void TimerWorker::pushTasks(oatpp::collection::FastQueue& tasks) { { @@ -128,4 +130,12 @@ void TimerWorker::stop() { m_backlogCondition.notify_one(); } +void TimerWorker::join() { + m_thread.join(); +} + +void TimerWorker::detach() { + m_thread.detach(); +} + }}} \ No newline at end of file diff --git a/src/oatpp/core/async/worker/TimerWorker.hpp b/src/oatpp/core/async/worker/TimerWorker.hpp index 4d3e563c..013747da 100644 --- a/src/oatpp/core/async/worker/TimerWorker.hpp +++ b/src/oatpp/core/async/worker/TimerWorker.hpp @@ -48,6 +48,8 @@ private: std::condition_variable_any m_backlogCondition; private: std::chrono::duration m_granularity; +private: + std::thread m_thread; private: void consumeBacklog(); public: @@ -73,13 +75,23 @@ public: /** * Run worker. */ - void run() override; + void run(); /** * Break run loop. */ void stop() override; + /** + * Join all worker-threads. + */ + void join() override; + + /** + * Detach all worker-threads. + */ + void detach() override; + }; }}} diff --git a/src/oatpp/core/async/worker/Worker.hpp b/src/oatpp/core/async/worker/Worker.hpp index 63b00ac2..20ff018c 100644 --- a/src/oatpp/core/async/worker/Worker.hpp +++ b/src/oatpp/core/async/worker/Worker.hpp @@ -97,16 +97,21 @@ public: */ virtual void pushOneTask(AbstractCoroutine* task) = 0; - /** - * Run worker. - */ - virtual void run() = 0; - /** * Break run loop. */ virtual void stop() = 0; + /** + * Join all worker-threads. + */ + virtual void join() = 0; + + /** + * Detach all worker-threads. + */ + virtual void detach() = 0; + /** * Get worker type. * @return - one of &l:Worker::Type; values.