diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index dd7c2c32..dc729daa 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -23,8 +23,6 @@ add_library(oatpp oatpp/core/async/Worker.hpp oatpp/core/async/IOWorker.cpp oatpp/core/async/IOWorker.hpp - oatpp/core/async/Scheduler.cpp - oatpp/core/async/Scheduler.hpp oatpp/core/async/Executor.cpp oatpp/core/async/Executor.hpp oatpp/core/base/CommandLineArguments.cpp @@ -187,7 +185,7 @@ add_library(oatpp oatpp/web/url/mapping/Subscriber.hpp oatpp/core/parser/ParsingError.cpp oatpp/core/parser/ParsingError.hpp -) + oatpp/core/async/TimerWorker.cpp oatpp/core/async/TimerWorker.hpp) set_target_properties(oatpp PROPERTIES CXX_STANDARD 11 diff --git a/src/oatpp/core/async/Coroutine.cpp b/src/oatpp/core/async/Coroutine.cpp index 4d05bf34..a62735c3 100644 --- a/src/oatpp/core/async/Coroutine.cpp +++ b/src/oatpp/core/async/Coroutine.cpp @@ -165,13 +165,13 @@ AbstractCoroutine::AbstractCoroutine() Action AbstractCoroutine::iterate() { try { - return takeAction(_CP->call(_FP)); + return _CP->call(_FP); } catch (std::exception& e) { *m_propagatedError = std::make_shared(e.what()); - return takeAction(Action::TYPE_ERROR); + return Action::TYPE_ERROR; } catch (...) { *m_propagatedError = ERROR_UNKNOWN; - return takeAction(Action::TYPE_ERROR); + return Action::TYPE_ERROR; } }; diff --git a/src/oatpp/core/async/Coroutine.hpp b/src/oatpp/core/async/Coroutine.hpp index 3e694f15..3d71f012 100644 --- a/src/oatpp/core/async/Coroutine.hpp +++ b/src/oatpp/core/async/Coroutine.hpp @@ -41,6 +41,7 @@ namespace oatpp { namespace async { class AbstractCoroutine; // FWD class Processor; // FWD +class Worker; // FWD class CoroutineStarter; // FWD /** @@ -233,6 +234,7 @@ class AbstractCoroutine : public oatpp::base::Countable { friend oatpp::collection::FastQueue; friend Processor; friend CoroutineStarter; + friend Worker; public: /** * Convenience typedef for Action diff --git a/src/oatpp/core/async/Executor.cpp b/src/oatpp/core/async/Executor.cpp index baf07c07..5dce1a0c 100644 --- a/src/oatpp/core/async/Executor.cpp +++ b/src/oatpp/core/async/Executor.cpp @@ -24,6 +24,9 @@ #include "Executor.hpp" +#include "./IOWorker.hpp" +#include "./TimerWorker.hpp" + namespace oatpp { namespace async { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -37,7 +40,7 @@ void Executor::SubmissionProcessor::run(){ while(m_isRunning) { m_processor.waitForTasks(); - while (m_processor.iterate(1000)) {} + while (m_processor.iterate(100)) {} } } @@ -57,8 +60,30 @@ Executor::Executor(v_int32 threadsCount) , m_threads(new std::thread[m_threadsCount]) , m_processors(new SubmissionProcessor[m_threadsCount]) { + + //auto ioWorker = std::make_shared(); + + for(v_int32 i = 0; i < 2; i++) { + m_workers.push_back(std::make_shared()); + } + for(v_int32 i = 0; i < m_threadsCount; i ++) { - m_threads[i] = std::thread(&SubmissionProcessor::run, &m_processors[i]); + + auto& processor = m_processors[i]; + + for(auto& worker : m_workers) { + processor.getProcessor().addWorker(worker); + } + +// for(v_int32 i = 0; i < 1; i++) { +// auto worker = std::make_shared(); +// m_workers.push_back(worker); +// processor.getProcessor().addWorker(worker); +// } + + + m_threads[i] = std::thread(&SubmissionProcessor::run, &processor); + } } @@ -83,6 +108,10 @@ void Executor::stop() { for(v_int32 i = 0; i < m_threadsCount; i ++) { m_processors[i].stop(); } + + for(auto& worker : m_workers) { + worker->stop(); + } } }} diff --git a/src/oatpp/core/async/Executor.hpp b/src/oatpp/core/async/Executor.hpp index f5b180f4..599c0d63 100644 --- a/src/oatpp/core/async/Executor.hpp +++ b/src/oatpp/core/async/Executor.hpp @@ -63,6 +63,10 @@ private: void execute(Args... params) { m_processor.execute(params...); } + + oatpp::async::Processor& getProcessor() { + return m_processor; + } }; @@ -77,6 +81,8 @@ private: std::thread* m_threads; SubmissionProcessor* m_processors; std::atomic m_balancer; +private: + std::vector> m_workers; public: /** @@ -114,9 +120,8 @@ public: */ template void execute(Args... params) { - auto& processor = m_processors[m_balancer % m_threadsCount]; + auto& processor = m_processors[(++ m_balancer) % m_threadsCount]; processor.execute(params...); - m_balancer ++; } }; diff --git a/src/oatpp/core/async/IOWorker.cpp b/src/oatpp/core/async/IOWorker.cpp index edb6cc8c..8a7e8a2f 100644 --- a/src/oatpp/core/async/IOWorker.cpp +++ b/src/oatpp/core/async/IOWorker.cpp @@ -24,44 +24,36 @@ #include "IOWorker.hpp" +#include "Processor.hpp" + #include namespace oatpp { namespace async { -void IOWorker::addTask(const Task& task) { +void IOWorker::pushTasks(oatpp::collection::FastQueue& tasks) { { std::lock_guard guard(m_backlogMutex); - m_backlog.pushBack(task); + oatpp::collection::FastQueue::moveAll(tasks, m_backlog); } m_backlogCondition.notify_one(); } -void IOWorker::queueBacklog() { - - auto curr = m_backlog.getFirstNode(); - while(curr != nullptr) { - m_queue.pushBack(new TaskSlot({curr->getData()})); - curr = curr->getNext(); - } - - m_backlog.clear(); - -} - void IOWorker::consumeBacklog(bool blockToConsume) { if(blockToConsume) { std::unique_lock lock(m_backlogMutex); - while (m_backlog.count() == 0) { + while (m_backlog.first == nullptr && m_running) { m_backlogCondition.wait(lock); } + oatpp::collection::FastQueue::moveAll(m_backlog, m_queue); + } else { std::unique_lock lock(m_backlogMutex, std::try_to_lock); if (lock.owns_lock()) { - queueBacklog(); + oatpp::collection::FastQueue::moveAll(m_backlog, m_queue); } } @@ -75,15 +67,16 @@ void IOWorker::work() { while(m_running) { - auto curr = m_queue.first; - if(curr != nullptr) { + auto CP = m_queue.first; + if(CP != nullptr) { - const Action &action = curr->task.coroutine->iterate(); + Action action = CP->iterate(); if (action.getType() == Action::TYPE_WAIT_FOR_IO) { m_queue.round(); } else { - curr->task.sender->addTask(curr->task); - m_queue.popFrontNoData(); + m_queue.popFront(); + setCoroutineScheduledAction(CP, std::move(action)); + getCoroutineProcessor(CP)->pushOneTaskFromIO(CP); } ++ sleepIteration; @@ -93,7 +86,8 @@ void IOWorker::work() { } ++ consumeIteration; - if(consumeIteration == 100) { + if(consumeIteration == 1000) { + consumeIteration = 0; consumeBacklog(false); } diff --git a/src/oatpp/core/async/IOWorker.hpp b/src/oatpp/core/async/IOWorker.hpp index 6a8e4b2c..c3055199 100644 --- a/src/oatpp/core/async/IOWorker.hpp +++ b/src/oatpp/core/async/IOWorker.hpp @@ -22,8 +22,8 @@ * ***************************************************************************/ -#ifndef oatpp_async_IOEventWorker_hpp -#define oatpp_async_IOEventWorker_hpp +#ifndef oatpp_async_IOWorker_hpp +#define oatpp_async_IOWorker_hpp #include "./Worker.hpp" #include "oatpp/core/collection/LinkedList.hpp" @@ -35,54 +35,41 @@ namespace oatpp { namespace async { class IOWorker : public Worker { -private: - - struct TaskSlot { - - static void* operator new(std::size_t sz) { - return ::operator new(sz); - } - - static void operator delete(void* ptr, std::size_t sz) { - ::operator delete(ptr); - } - - Task task; - TaskSlot* _ref; - - }; - -private: - typedef oatpp::collection::FastQueue Queue; - typedef oatpp::collection::LinkedList Backlog; private: bool m_running; - Backlog m_backlog; - Queue m_queue; + oatpp::collection::FastQueue m_backlog; + oatpp::collection::FastQueue m_queue; std::mutex m_backlogMutex; std::condition_variable m_backlogCondition; private: - void queueBacklog(); void consumeBacklog(bool blockToConsume); public: IOWorker() - : m_running(true) + : Worker(Type::IO) + , m_running(true) { std::thread thread(&IOWorker::work, this); thread.detach(); } - void addTask(const Task& task) override; + ~IOWorker() { + } + + void pushTasks(oatpp::collection::FastQueue& tasks) override; void work(); void stop() override { - m_running = false; + { + std::lock_guard lock(m_backlogMutex); + m_running = false; + } + m_backlogCondition.notify_one(); } }; }} -#endif //oatpp_async_IOEventWorker_hpp +#endif //oatpp_async_IOWorker_hpp diff --git a/src/oatpp/core/async/Processor.cpp b/src/oatpp/core/async/Processor.cpp index 97841e75..f113dd9f 100644 --- a/src/oatpp/core/async/Processor.cpp +++ b/src/oatpp/core/async/Processor.cpp @@ -23,65 +23,105 @@ ***************************************************************************/ #include "Processor.hpp" +#include "./Worker.hpp" namespace oatpp { namespace async { +void Processor::addWorker(const std::shared_ptr& worker) { + + switch(worker->getType()) { + + case Worker::Type::IO: + m_ioWorkers.push_back(worker); + m_ioPopQueues.push_back(collection::FastQueue()); + break; + + case Worker::Type::TIMER: + m_timerWorkers.push_back(worker); + m_timerPopQueues.push_back(collection::FastQueue()); + break; + + default: + break; + + } + +} + +void Processor::popIOTask(AbstractCoroutine* coroutine) { + if(m_ioPopQueues.size() > 0) { + auto &queue = m_ioPopQueues[(++m_ioBalancer) % m_ioPopQueues.size()]; + queue.pushBack(coroutine); + } else { + throw std::runtime_error("[oatpp::async::Processor::popIOTasks()]: Error. Processor has no I/O workers."); + } +} + +void Processor::popTimerTask(AbstractCoroutine* coroutine) { + if(m_timerPopQueues.size() > 0) { + auto &queue = m_timerPopQueues[(++m_timerBalancer) % m_timerPopQueues.size()]; + queue.pushBack(coroutine); + } else { + throw std::runtime_error("[oatpp::async::Processor::popTimerTask()]: Error. Processor has no Timer workers."); + } +} + void Processor::addCoroutine(AbstractCoroutine* coroutine) { if(coroutine->_PP == this) { - const Action& action = coroutine->_SCH_A; + const Action& action = coroutine->takeAction(std::move(coroutine->_SCH_A)); switch(action.m_type) { case Action::TYPE_WAIT_FOR_IO: coroutine->_SCH_A = Action::clone(action); - m_queue.popFront(); - m_sch_pop_io_tmp.pushBack(coroutine); + popIOTask(coroutine); break; case Action::TYPE_WAIT_RETRY: coroutine->_SCH_A = Action::clone(action); - m_queue.popFront(); - m_sch_pop_timer_tmp.pushBack(coroutine); + popTimerTask(coroutine); break; + default: + m_queue.pushBack(coroutine); + } action.m_type = Action::TYPE_NONE; - m_queue.pushBack(coroutine); - } else { throw std::runtime_error("[oatpp::async::processor::addCoroutine()]: Error. Attempt to schedule coroutine to wrong processor."); } } -void Processor::pushTaskFromIO(AbstractCoroutine* coroutine) { - std::lock_guard lock(m_sch_push_io_mutex); - m_sch_push_io.pushBack(coroutine); +void Processor::pushOneTaskFromIO(AbstractCoroutine* coroutine) { + { + std::lock_guard waitLock(m_waitMutex); + std::lock_guard lock(m_sch_push_io_mutex); + m_sch_push_io.pushBack(coroutine); + } m_waitCondition.notify_one(); } -void Processor::pushTaskFromTimer(AbstractCoroutine* coroutine) { - std::lock_guard lock(m_sch_push_timer_mutex); - m_sch_push_timer.pushBack(coroutine); +void Processor::pushOneTaskFromTimer(AbstractCoroutine* coroutine) { + { + std::lock_guard waitLock(m_waitMutex); + std::lock_guard lock(m_sch_push_timer_mutex); + m_sch_push_timer.pushBack(coroutine); + } m_waitCondition.notify_one(); } -void Processor::popIOTasks(oatpp::collection::FastQueue& queue) { - if(m_sch_pop_io.first != nullptr) { - std::lock_guard lock(m_sch_pop_io_mutex); - collection::FastQueue::moveAll(m_sch_pop_io, queue); - } -} - -void Processor::popTimerTasks(oatpp::collection::FastQueue& queue) { - if(m_sch_pop_timer.first != nullptr) { - std::lock_guard lock(m_sch_pop_timer_mutex); - collection::FastQueue::moveAll(m_sch_pop_timer, queue); +void Processor::pushTasksFromTimer(oatpp::collection::FastQueue& tasks) { + { + std::lock_guard waitLock(m_waitMutex); + std::lock_guard lock(m_sch_push_timer_mutex); + collection::FastQueue::moveAll(tasks, m_sch_push_timer); } + m_waitCondition.notify_one(); } void Processor::waitForTasks() { @@ -93,34 +133,33 @@ void Processor::waitForTasks() { } -void Processor::popTmpQueues() { +void Processor::popTasks() { - { - std::lock_guard lock(m_sch_pop_io_mutex); - collection::FastQueue::moveAll(m_sch_pop_io_tmp, m_sch_pop_io); + for(v_int32 i = 0; i < m_ioWorkers.size(); i++) { + auto& worker = m_ioWorkers[i]; + auto& popQueue = m_ioPopQueues[i]; + worker->pushTasks(popQueue); } - { - std::lock_guard lock(m_sch_pop_timer_mutex); - collection::FastQueue::moveAll(m_sch_pop_timer_tmp, m_sch_pop_timer); + for(v_int32 i = 0; i < m_timerWorkers.size(); i++) { + auto& worker = m_timerWorkers[i]; + auto& popQueue = m_timerPopQueues[i]; + worker->pushTasks(popQueue); } } void Processor::pushAllFromQueue(oatpp::collection::FastQueue& pushQueue) { - auto curr = pushQueue.first; - while(curr != nullptr) { - addCoroutine(curr); - curr = curr->_ref; + while(pushQueue.first != nullptr) { + addCoroutine(pushQueue.popFront()); } - pushQueue.first = nullptr; - pushQueue.last = nullptr; - pushQueue.count = 0; } void Processor::consumeAllTasks() { for(auto& submission : m_taskList) { - m_queue.pushBack(submission->createCoroutine()); + auto coroutine = submission->createCoroutine(); + coroutine->_PP = this; + m_queue.pushBack(coroutine); } m_taskList.clear(); } @@ -172,43 +211,48 @@ bool Processor::iterate(v_int32 numIterations) { pushQueues(); for(v_int32 i = 0; i < numIterations; i++) { - - auto CP = m_queue.first; - if(CP == nullptr) { - break; - } - if(CP->finished()) { - m_queue.popFrontNoData(); - } else { - const Action& action = CP->iterate(); + for(v_int32 j = 0; j < 10; j ++) { - switch(action.m_type) { + auto CP = m_queue.first; + if (CP == nullptr) { + break; + } + if (CP->finished()) { + m_queue.popFrontNoData(); + } else { - case Action::TYPE_WAIT_FOR_IO: - CP->_SCH_A = Action::clone(action); - m_queue.popFront(); - m_sch_pop_io_tmp.pushBack(CP); - OATPP_LOGD("Processor", "push to IO"); - break; + const Action &action = CP->takeAction(CP->iterate()); - case Action::TYPE_WAIT_RETRY: - CP->_SCH_A = Action::clone(action); - m_queue.popFront(); - m_sch_pop_timer_tmp.pushBack(CP); - OATPP_LOGD("Processor", "push to Timer"); - break; + switch (action.m_type) { + + case Action::TYPE_WAIT_FOR_IO: + CP->_SCH_A = Action::clone(action); + m_queue.popFront(); + popIOTask(CP); + break; + + case Action::TYPE_WAIT_RETRY: + CP->_SCH_A = Action::clone(action); + m_queue.popFront(); + popTimerTask(CP); + break; + +// default: +// m_queue.round(); + } + + action.m_type = Action::TYPE_NONE; - default: - m_queue.round(); } - action.m_type = Action::TYPE_NONE; - } + + m_queue.round(); + } - popTmpQueues(); + popTasks(); return m_queue.first != nullptr || m_sch_push_io.first != nullptr || @@ -218,6 +262,10 @@ bool Processor::iterate(v_int32 numIterations) { } void Processor::stop() { + { + std::lock_guard lock(m_waitMutex); + m_running = false; + } m_waitCondition.notify_one(); } diff --git a/src/oatpp/core/async/Processor.hpp b/src/oatpp/core/async/Processor.hpp index 1bf0f625..4783ad39 100644 --- a/src/oatpp/core/async/Processor.hpp +++ b/src/oatpp/core/async/Processor.hpp @@ -30,6 +30,7 @@ #include #include +#include namespace oatpp { namespace async { @@ -81,14 +82,6 @@ private: }; -private: - - oatpp::collection::FastQueue m_sch_pop_io; - oatpp::collection::FastQueue m_sch_pop_timer; - - std::mutex m_sch_pop_io_mutex; - std::mutex m_sch_pop_timer_mutex; - private: oatpp::collection::FastQueue m_sch_push_io; @@ -99,8 +92,14 @@ private: private: - oatpp::collection::FastQueue m_sch_pop_io_tmp; - oatpp::collection::FastQueue m_sch_pop_timer_tmp; + std::vector> m_ioWorkers; + std::vector> m_timerWorkers; + + std::vector> m_ioPopQueues; + std::vector> m_timerPopQueues; + + v_word32 m_ioBalancer = 0; + v_word32 m_timerBalancer = 0; private: @@ -122,37 +121,32 @@ private: private: + void popIOTask(AbstractCoroutine* coroutine); + void popTimerTask(AbstractCoroutine* coroutine); + void consumeAllTasks(); void addCoroutine(AbstractCoroutine* coroutine); - void popTmpQueues(); + void popTasks(); void pushAllFromQueue(oatpp::collection::FastQueue& pushQueue); void pushQueues(); public: + void addWorker(const std::shared_ptr& worker); + /** * Return coroutine scheduled for I/O back to owner processor. * @param coroutine */ - void pushTaskFromIO(AbstractCoroutine* coroutine); + void pushOneTaskFromIO(AbstractCoroutine* coroutine); /** * Return coroutine scheduled for Timer back to owner processor. * @param coroutine */ - void pushTaskFromTimer(AbstractCoroutine* coroutine); + void pushOneTaskFromTimer(AbstractCoroutine* coroutine); - /** - * Move all waiting for I/O tasks to specified queue. - * @param queue - */ - void popIOTasks(oatpp::collection::FastQueue& queue); - - /** - * Move all waiting for Timer tasks to specified queue. - * @param queue - */ - void popTimerTasks(oatpp::collection::FastQueue& queue); + void pushTasksFromTimer(oatpp::collection::FastQueue& tasks); /** * Execute Coroutine. diff --git a/src/oatpp/core/async/Scheduler.cpp b/src/oatpp/core/async/Scheduler.cpp deleted file mode 100644 index 29fc9174..00000000 --- a/src/oatpp/core/async/Scheduler.cpp +++ /dev/null @@ -1,25 +0,0 @@ -/*************************************************************************** - * - * Project _____ __ ____ _ _ - * ( _ ) /__\ (_ _)_| |_ _| |_ - * )(_)( /(__)\ )( (_ _)(_ _) - * (_____)(__)(__)(__) |_| |_| - * - * - * Copyright 2018-present, Leonid Stryzhevskyi - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - ***************************************************************************/ - -#include "Scheduler.hpp" diff --git a/src/oatpp/core/async/TimerWorker.cpp b/src/oatpp/core/async/TimerWorker.cpp new file mode 100644 index 00000000..1ce8d90e --- /dev/null +++ b/src/oatpp/core/async/TimerWorker.cpp @@ -0,0 +1,99 @@ +/*************************************************************************** + * + * Project _____ __ ____ _ _ + * ( _ ) /__\ (_ _)_| |_ _| |_ + * )(_)( /(__)\ )( (_ _)(_ _) + * (_____)(__)(__)(__) |_| |_| + * + * + * Copyright 2018-present, Leonid Stryzhevskyi + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************************/ + +#include "TimerWorker.hpp" + +#include "Processor.hpp" + +#include + +namespace oatpp { namespace async { + +void TimerWorker::pushTasks(oatpp::collection::FastQueue& tasks) { + { + std::lock_guard guard(m_backlogMutex); + oatpp::collection::FastQueue::moveAll(tasks, m_backlog); + } + m_backlogCondition.notify_one(); +} + +void TimerWorker::consumeBacklog(bool blockToConsume) { + + if(blockToConsume) { + + std::unique_lock lock(m_backlogMutex); + while (m_backlog.first == nullptr && m_running) { + m_backlogCondition.wait(lock); + } + oatpp::collection::FastQueue::moveAll(m_backlog, m_queue); + } else { + + std::unique_lock lock(m_backlogMutex, std::try_to_lock); + if (lock.owns_lock()) { + oatpp::collection::FastQueue::moveAll(m_backlog, m_queue); + } + + } + +} + +void TimerWorker::work() { + + v_int32 consumeIteration = 0; + v_int32 roundIteration = 0; + + while(m_running) { + + auto CP = m_queue.first; + if(CP != nullptr) { + + Action action = CP->iterate(); + if (action.getType() == Action::TYPE_WAIT_RETRY) { + ++ roundIteration; + if(roundIteration == 10) { + roundIteration = 0; + m_queue.round(); + } + } else { + roundIteration = 0; + m_queue.popFront(); + setCoroutineScheduledAction(CP, std::move(action)); + getCoroutineProcessor(CP)->pushOneTaskFromTimer(CP); + } + + ++ consumeIteration; + if(consumeIteration == 100) { + consumeIteration = 0; + consumeBacklog(false); + } + + } else { + consumeBacklog(true); + } + + } + +} + +}} \ No newline at end of file diff --git a/src/oatpp/core/async/Scheduler.hpp b/src/oatpp/core/async/TimerWorker.hpp similarity index 50% rename from src/oatpp/core/async/Scheduler.hpp rename to src/oatpp/core/async/TimerWorker.hpp index 5b68bb73..0abe1a7d 100644 --- a/src/oatpp/core/async/Scheduler.hpp +++ b/src/oatpp/core/async/TimerWorker.hpp @@ -22,39 +22,51 @@ * ***************************************************************************/ -#ifndef oatpp_async_Scheduler_hpp -#define oatpp_async_Scheduler_hpp +#ifndef oatpp_async_TimerWorker_hpp +#define oatpp_async_TimerWorker_hpp -#include "oatpp/core/Types.hpp" +#include "./Worker.hpp" +#include "oatpp/core/collection/LinkedList.hpp" -class Scheduler { +#include +#include +#include + +namespace oatpp { namespace async { + +class TimerWorker : public Worker { +private: + bool m_running; + oatpp::collection::FastQueue m_backlog; + oatpp::collection::FastQueue m_queue; + std::mutex m_backlogMutex; + std::condition_variable m_backlogCondition; +private: + void consumeBacklog(bool blockToConsume); public: - enum WorkerType : v_int32 { - - /** - * Worker type - general processor. - */ - PROCESSOR = 0, - - /** - * Worker type - timer processor. - */ - TIMER = 1, - - /** - * Worker type - I/O processor. - */ - IO = 2 - - }; - -private: + TimerWorker() + : Worker(Type::TIMER) + , m_running(true) + { + std::thread thread(&TimerWorker::work, this); + thread.detach(); + } + void pushTasks(oatpp::collection::FastQueue& tasks) override; + void work(); + void stop() override { + { + std::lock_guard lock(m_backlogMutex); + m_running = false; + } + m_backlogCondition.notify_one(); + } }; +}} -#endif //oatpp_async_Scheduler_hpp +#endif //oatpp_async_TimerWorker_hpp diff --git a/src/oatpp/core/async/Worker.cpp b/src/oatpp/core/async/Worker.cpp index 630af964..923167f7 100644 --- a/src/oatpp/core/async/Worker.cpp +++ b/src/oatpp/core/async/Worker.cpp @@ -23,3 +23,15 @@ ***************************************************************************/ #include "Worker.hpp" + +namespace oatpp { namespace async { + +void Worker::setCoroutineScheduledAction(AbstractCoroutine *CP, Action &&action) { + CP->_SCH_A = std::forward(action); +} + +Processor* Worker::getCoroutineProcessor(AbstractCoroutine* CP) { + return CP->_PP; +} + +}} \ No newline at end of file diff --git a/src/oatpp/core/async/Worker.hpp b/src/oatpp/core/async/Worker.hpp index f527015f..014c666e 100644 --- a/src/oatpp/core/async/Worker.hpp +++ b/src/oatpp/core/async/Worker.hpp @@ -32,35 +32,51 @@ namespace oatpp { namespace async { class Worker { public: - struct Task { + enum Type : v_int32 { - mutable Action action; - AbstractCoroutine* coroutine; - Worker* sender; + /** + * Worker type - general processor. + */ + PROCESSOR = 0, - Task(const Task& other) - : action(std::move(other.action)) - , coroutine(other.coroutine) - , sender(other.sender) - {} + /** + * Worker type - timer processor. + */ + TIMER = 1, - Task& operator = (const Task& other) { - action = std::move(other.action); - coroutine = other.coroutine; - sender = other.sender; - return *this; - } + /** + * Worker type - I/O processor. + */ + IO = 2, + + /** + * Number of types in this enum. + */ + TYPES_COUNT = 3 }; +private: + Type m_type; +protected: + void setCoroutineScheduledAction(AbstractCoroutine* CP, Action&& action); + Processor* getCoroutineProcessor(AbstractCoroutine* CP); public: + Worker(Type type) + : m_type(type) + {} + virtual ~Worker() = default; - virtual void addTask(const Task& task) = 0; + virtual void pushTasks(oatpp::collection::FastQueue& tasks) = 0; virtual void stop() = 0; + Type getType() { + return m_type; + } + }; }} diff --git a/src/oatpp/core/collection/FastQueue.hpp b/src/oatpp/core/collection/FastQueue.hpp index cc5e8532..693c883d 100644 --- a/src/oatpp/core/collection/FastQueue.hpp +++ b/src/oatpp/core/collection/FastQueue.hpp @@ -37,6 +37,7 @@ public: FastQueue() : first(nullptr) , last(nullptr) + , count(0) {} ~FastQueue(){ @@ -69,10 +70,12 @@ public: } void round(){ - last->_ref = first; - last = first; - first = first->_ref; - last->_ref = nullptr; + if(count > 1) { + last->_ref = first; + last = first; + first = first->_ref; + last->_ref = nullptr; + } } T* popFront() { @@ -94,56 +97,27 @@ public: delete result; -- count; } - - void removeEntry(T* entry, T* prevEntry){ - - if(prevEntry == nullptr) { - popFrontNoData(); - } else if(entry->_ref == nullptr) { - prevEntry->_ref = nullptr; - last = prevEntry; - delete entry; - -- count; - } else { - prevEntry->_ref = entry->_ref; - delete entry; - -- count; - } - } - - static void moveEntry(FastQueue& fromQueue, FastQueue& toQueue, T* entry, T* prevEntry){ - - if(prevEntry == nullptr) { - toQueue.pushFront(fromQueue.popFront()); - } else if(entry->_ref == nullptr) { - toQueue.pushBack(entry); - fromQueue.last = prevEntry; - prevEntry->_ref = nullptr; - -- fromQueue.count; - } else { - prevEntry->_ref = entry->_ref; - toQueue.pushBack(entry); - -- fromQueue.count; - } - - } static void moveAll(FastQueue& fromQueue, FastQueue& toQueue) { - if(toQueue.last == nullptr) { - toQueue.first = fromQueue.first; - toQueue.last = fromQueue.last; - } else { - toQueue.last->_ref = fromQueue.first; - toQueue.last = fromQueue.last; + if(fromQueue.count > 0) { + + if (toQueue.last == nullptr) { + toQueue.first = fromQueue.first; + toQueue.last = fromQueue.last; + } else { + toQueue.last->_ref = fromQueue.first; + toQueue.last = fromQueue.last; + } + + toQueue.count += fromQueue.count; + fromQueue.count = 0; + + fromQueue.first = nullptr; + fromQueue.last = nullptr; + } - toQueue.count += fromQueue.count; - fromQueue.count = 0; - - fromQueue.first = nullptr; - fromQueue.last = nullptr; - } void clear() {