draft of async processor with scheduling

This commit is contained in:
lganzzzo 2019-04-14 02:53:47 +03:00
parent 413cc65e73
commit cac219d027
15 changed files with 411 additions and 266 deletions

View File

@ -23,8 +23,6 @@ add_library(oatpp
oatpp/core/async/Worker.hpp
oatpp/core/async/IOWorker.cpp
oatpp/core/async/IOWorker.hpp
oatpp/core/async/Scheduler.cpp
oatpp/core/async/Scheduler.hpp
oatpp/core/async/Executor.cpp
oatpp/core/async/Executor.hpp
oatpp/core/base/CommandLineArguments.cpp
@ -187,7 +185,7 @@ add_library(oatpp
oatpp/web/url/mapping/Subscriber.hpp
oatpp/core/parser/ParsingError.cpp
oatpp/core/parser/ParsingError.hpp
)
oatpp/core/async/TimerWorker.cpp oatpp/core/async/TimerWorker.hpp)
set_target_properties(oatpp PROPERTIES
CXX_STANDARD 11

View File

@ -165,13 +165,13 @@ AbstractCoroutine::AbstractCoroutine()
Action AbstractCoroutine::iterate() {
try {
return takeAction(_CP->call(_FP));
return _CP->call(_FP);
} catch (std::exception& e) {
*m_propagatedError = std::make_shared<Error>(e.what());
return takeAction(Action::TYPE_ERROR);
return Action::TYPE_ERROR;
} catch (...) {
*m_propagatedError = ERROR_UNKNOWN;
return takeAction(Action::TYPE_ERROR);
return Action::TYPE_ERROR;
}
};

View File

@ -41,6 +41,7 @@ namespace oatpp { namespace async {
class AbstractCoroutine; // FWD
class Processor; // FWD
class Worker; // FWD
class CoroutineStarter; // FWD
/**
@ -233,6 +234,7 @@ class AbstractCoroutine : public oatpp::base::Countable {
friend oatpp::collection::FastQueue<AbstractCoroutine>;
friend Processor;
friend CoroutineStarter;
friend Worker;
public:
/**
* Convenience typedef for Action

View File

@ -24,6 +24,9 @@
#include "Executor.hpp"
#include "./IOWorker.hpp"
#include "./TimerWorker.hpp"
namespace oatpp { namespace async {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -37,7 +40,7 @@ void Executor::SubmissionProcessor::run(){
while(m_isRunning) {
m_processor.waitForTasks();
while (m_processor.iterate(1000)) {}
while (m_processor.iterate(100)) {}
}
}
@ -57,8 +60,30 @@ Executor::Executor(v_int32 threadsCount)
, m_threads(new std::thread[m_threadsCount])
, m_processors(new SubmissionProcessor[m_threadsCount])
{
//auto ioWorker = std::make_shared<IOWorker>();
for(v_int32 i = 0; i < 2; i++) {
m_workers.push_back(std::make_shared<TimerWorker>());
}
for(v_int32 i = 0; i < m_threadsCount; i ++) {
m_threads[i] = std::thread(&SubmissionProcessor::run, &m_processors[i]);
auto& processor = m_processors[i];
for(auto& worker : m_workers) {
processor.getProcessor().addWorker(worker);
}
// for(v_int32 i = 0; i < 1; i++) {
// auto worker = std::make_shared<TimerWorker>();
// m_workers.push_back(worker);
// processor.getProcessor().addWorker(worker);
// }
m_threads[i] = std::thread(&SubmissionProcessor::run, &processor);
}
}
@ -83,6 +108,10 @@ void Executor::stop() {
for(v_int32 i = 0; i < m_threadsCount; i ++) {
m_processors[i].stop();
}
for(auto& worker : m_workers) {
worker->stop();
}
}
}}

View File

@ -63,6 +63,10 @@ private:
void execute(Args... params) {
m_processor.execute<CoroutineType, Args...>(params...);
}
oatpp::async::Processor& getProcessor() {
return m_processor;
}
};
@ -77,6 +81,8 @@ private:
std::thread* m_threads;
SubmissionProcessor* m_processors;
std::atomic<v_word32> m_balancer;
private:
std::vector<std::shared_ptr<Worker>> m_workers;
public:
/**
@ -114,9 +120,8 @@ public:
*/
template<typename CoroutineType, typename ... Args>
void execute(Args... params) {
auto& processor = m_processors[m_balancer % m_threadsCount];
auto& processor = m_processors[(++ m_balancer) % m_threadsCount];
processor.execute<CoroutineType, Args...>(params...);
m_balancer ++;
}
};

View File

@ -24,44 +24,36 @@
#include "IOWorker.hpp"
#include "Processor.hpp"
#include <chrono>
namespace oatpp { namespace async {
void IOWorker::addTask(const Task& task) {
void IOWorker::pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
{
std::lock_guard<std::mutex> guard(m_backlogMutex);
m_backlog.pushBack(task);
oatpp::collection::FastQueue<AbstractCoroutine>::moveAll(tasks, m_backlog);
}
m_backlogCondition.notify_one();
}
void IOWorker::queueBacklog() {
auto curr = m_backlog.getFirstNode();
while(curr != nullptr) {
m_queue.pushBack(new TaskSlot({curr->getData()}));
curr = curr->getNext();
}
m_backlog.clear();
}
void IOWorker::consumeBacklog(bool blockToConsume) {
if(blockToConsume) {
std::unique_lock<std::mutex> lock(m_backlogMutex);
while (m_backlog.count() == 0) {
while (m_backlog.first == nullptr && m_running) {
m_backlogCondition.wait(lock);
}
oatpp::collection::FastQueue<AbstractCoroutine>::moveAll(m_backlog, m_queue);
} else {
std::unique_lock<std::mutex> lock(m_backlogMutex, std::try_to_lock);
if (lock.owns_lock()) {
queueBacklog();
oatpp::collection::FastQueue<AbstractCoroutine>::moveAll(m_backlog, m_queue);
}
}
@ -75,15 +67,16 @@ void IOWorker::work() {
while(m_running) {
auto curr = m_queue.first;
if(curr != nullptr) {
auto CP = m_queue.first;
if(CP != nullptr) {
const Action &action = curr->task.coroutine->iterate();
Action action = CP->iterate();
if (action.getType() == Action::TYPE_WAIT_FOR_IO) {
m_queue.round();
} else {
curr->task.sender->addTask(curr->task);
m_queue.popFrontNoData();
m_queue.popFront();
setCoroutineScheduledAction(CP, std::move(action));
getCoroutineProcessor(CP)->pushOneTaskFromIO(CP);
}
++ sleepIteration;
@ -93,7 +86,8 @@ void IOWorker::work() {
}
++ consumeIteration;
if(consumeIteration == 100) {
if(consumeIteration == 1000) {
consumeIteration = 0;
consumeBacklog(false);
}

View File

@ -22,8 +22,8 @@
*
***************************************************************************/
#ifndef oatpp_async_IOEventWorker_hpp
#define oatpp_async_IOEventWorker_hpp
#ifndef oatpp_async_IOWorker_hpp
#define oatpp_async_IOWorker_hpp
#include "./Worker.hpp"
#include "oatpp/core/collection/LinkedList.hpp"
@ -35,54 +35,41 @@
namespace oatpp { namespace async {
class IOWorker : public Worker {
private:
struct TaskSlot {
static void* operator new(std::size_t sz) {
return ::operator new(sz);
}
static void operator delete(void* ptr, std::size_t sz) {
::operator delete(ptr);
}
Task task;
TaskSlot* _ref;
};
private:
typedef oatpp::collection::FastQueue<TaskSlot> Queue;
typedef oatpp::collection::LinkedList<Task> Backlog;
private:
bool m_running;
Backlog m_backlog;
Queue m_queue;
oatpp::collection::FastQueue<AbstractCoroutine> m_backlog;
oatpp::collection::FastQueue<AbstractCoroutine> m_queue;
std::mutex m_backlogMutex;
std::condition_variable m_backlogCondition;
private:
void queueBacklog();
void consumeBacklog(bool blockToConsume);
public:
IOWorker()
: m_running(true)
: Worker(Type::IO)
, m_running(true)
{
std::thread thread(&IOWorker::work, this);
thread.detach();
}
void addTask(const Task& task) override;
~IOWorker() {
}
void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) override;
void work();
void stop() override {
m_running = false;
{
std::lock_guard<std::mutex> lock(m_backlogMutex);
m_running = false;
}
m_backlogCondition.notify_one();
}
};
}}
#endif //oatpp_async_IOEventWorker_hpp
#endif //oatpp_async_IOWorker_hpp

View File

@ -23,65 +23,105 @@
***************************************************************************/
#include "Processor.hpp"
#include "./Worker.hpp"
namespace oatpp { namespace async {
void Processor::addWorker(const std::shared_ptr<Worker>& worker) {
switch(worker->getType()) {
case Worker::Type::IO:
m_ioWorkers.push_back(worker);
m_ioPopQueues.push_back(collection::FastQueue<AbstractCoroutine>());
break;
case Worker::Type::TIMER:
m_timerWorkers.push_back(worker);
m_timerPopQueues.push_back(collection::FastQueue<AbstractCoroutine>());
break;
default:
break;
}
}
void Processor::popIOTask(AbstractCoroutine* coroutine) {
if(m_ioPopQueues.size() > 0) {
auto &queue = m_ioPopQueues[(++m_ioBalancer) % m_ioPopQueues.size()];
queue.pushBack(coroutine);
} else {
throw std::runtime_error("[oatpp::async::Processor::popIOTasks()]: Error. Processor has no I/O workers.");
}
}
void Processor::popTimerTask(AbstractCoroutine* coroutine) {
if(m_timerPopQueues.size() > 0) {
auto &queue = m_timerPopQueues[(++m_timerBalancer) % m_timerPopQueues.size()];
queue.pushBack(coroutine);
} else {
throw std::runtime_error("[oatpp::async::Processor::popTimerTask()]: Error. Processor has no Timer workers.");
}
}
void Processor::addCoroutine(AbstractCoroutine* coroutine) {
if(coroutine->_PP == this) {
const Action& action = coroutine->_SCH_A;
const Action& action = coroutine->takeAction(std::move(coroutine->_SCH_A));
switch(action.m_type) {
case Action::TYPE_WAIT_FOR_IO:
coroutine->_SCH_A = Action::clone(action);
m_queue.popFront();
m_sch_pop_io_tmp.pushBack(coroutine);
popIOTask(coroutine);
break;
case Action::TYPE_WAIT_RETRY:
coroutine->_SCH_A = Action::clone(action);
m_queue.popFront();
m_sch_pop_timer_tmp.pushBack(coroutine);
popTimerTask(coroutine);
break;
default:
m_queue.pushBack(coroutine);
}
action.m_type = Action::TYPE_NONE;
m_queue.pushBack(coroutine);
} else {
throw std::runtime_error("[oatpp::async::processor::addCoroutine()]: Error. Attempt to schedule coroutine to wrong processor.");
}
}
void Processor::pushTaskFromIO(AbstractCoroutine* coroutine) {
std::lock_guard<std::mutex> lock(m_sch_push_io_mutex);
m_sch_push_io.pushBack(coroutine);
void Processor::pushOneTaskFromIO(AbstractCoroutine* coroutine) {
{
std::lock_guard<std::mutex> waitLock(m_waitMutex);
std::lock_guard<std::mutex> lock(m_sch_push_io_mutex);
m_sch_push_io.pushBack(coroutine);
}
m_waitCondition.notify_one();
}
void Processor::pushTaskFromTimer(AbstractCoroutine* coroutine) {
std::lock_guard<std::mutex> lock(m_sch_push_timer_mutex);
m_sch_push_timer.pushBack(coroutine);
void Processor::pushOneTaskFromTimer(AbstractCoroutine* coroutine) {
{
std::lock_guard<std::mutex> waitLock(m_waitMutex);
std::lock_guard<std::mutex> lock(m_sch_push_timer_mutex);
m_sch_push_timer.pushBack(coroutine);
}
m_waitCondition.notify_one();
}
void Processor::popIOTasks(oatpp::collection::FastQueue<AbstractCoroutine>& queue) {
if(m_sch_pop_io.first != nullptr) {
std::lock_guard<std::mutex> lock(m_sch_pop_io_mutex);
collection::FastQueue<AbstractCoroutine>::moveAll(m_sch_pop_io, queue);
}
}
void Processor::popTimerTasks(oatpp::collection::FastQueue<AbstractCoroutine>& queue) {
if(m_sch_pop_timer.first != nullptr) {
std::lock_guard<std::mutex> lock(m_sch_pop_timer_mutex);
collection::FastQueue<AbstractCoroutine>::moveAll(m_sch_pop_timer, queue);
void Processor::pushTasksFromTimer(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
{
std::lock_guard<std::mutex> waitLock(m_waitMutex);
std::lock_guard<std::mutex> lock(m_sch_push_timer_mutex);
collection::FastQueue<AbstractCoroutine>::moveAll(tasks, m_sch_push_timer);
}
m_waitCondition.notify_one();
}
void Processor::waitForTasks() {
@ -93,34 +133,33 @@ void Processor::waitForTasks() {
}
void Processor::popTmpQueues() {
void Processor::popTasks() {
{
std::lock_guard<std::mutex> lock(m_sch_pop_io_mutex);
collection::FastQueue<AbstractCoroutine>::moveAll(m_sch_pop_io_tmp, m_sch_pop_io);
for(v_int32 i = 0; i < m_ioWorkers.size(); i++) {
auto& worker = m_ioWorkers[i];
auto& popQueue = m_ioPopQueues[i];
worker->pushTasks(popQueue);
}
{
std::lock_guard<std::mutex> lock(m_sch_pop_timer_mutex);
collection::FastQueue<AbstractCoroutine>::moveAll(m_sch_pop_timer_tmp, m_sch_pop_timer);
for(v_int32 i = 0; i < m_timerWorkers.size(); i++) {
auto& worker = m_timerWorkers[i];
auto& popQueue = m_timerPopQueues[i];
worker->pushTasks(popQueue);
}
}
void Processor::pushAllFromQueue(oatpp::collection::FastQueue<AbstractCoroutine>& pushQueue) {
auto curr = pushQueue.first;
while(curr != nullptr) {
addCoroutine(curr);
curr = curr->_ref;
while(pushQueue.first != nullptr) {
addCoroutine(pushQueue.popFront());
}
pushQueue.first = nullptr;
pushQueue.last = nullptr;
pushQueue.count = 0;
}
void Processor::consumeAllTasks() {
for(auto& submission : m_taskList) {
m_queue.pushBack(submission->createCoroutine());
auto coroutine = submission->createCoroutine();
coroutine->_PP = this;
m_queue.pushBack(coroutine);
}
m_taskList.clear();
}
@ -172,43 +211,48 @@ bool Processor::iterate(v_int32 numIterations) {
pushQueues();
for(v_int32 i = 0; i < numIterations; i++) {
auto CP = m_queue.first;
if(CP == nullptr) {
break;
}
if(CP->finished()) {
m_queue.popFrontNoData();
} else {
const Action& action = CP->iterate();
for(v_int32 j = 0; j < 10; j ++) {
switch(action.m_type) {
auto CP = m_queue.first;
if (CP == nullptr) {
break;
}
if (CP->finished()) {
m_queue.popFrontNoData();
} else {
case Action::TYPE_WAIT_FOR_IO:
CP->_SCH_A = Action::clone(action);
m_queue.popFront();
m_sch_pop_io_tmp.pushBack(CP);
OATPP_LOGD("Processor", "push to IO");
break;
const Action &action = CP->takeAction(CP->iterate());
case Action::TYPE_WAIT_RETRY:
CP->_SCH_A = Action::clone(action);
m_queue.popFront();
m_sch_pop_timer_tmp.pushBack(CP);
OATPP_LOGD("Processor", "push to Timer");
break;
switch (action.m_type) {
case Action::TYPE_WAIT_FOR_IO:
CP->_SCH_A = Action::clone(action);
m_queue.popFront();
popIOTask(CP);
break;
case Action::TYPE_WAIT_RETRY:
CP->_SCH_A = Action::clone(action);
m_queue.popFront();
popTimerTask(CP);
break;
// default:
// m_queue.round();
}
action.m_type = Action::TYPE_NONE;
default:
m_queue.round();
}
action.m_type = Action::TYPE_NONE;
}
m_queue.round();
}
popTmpQueues();
popTasks();
return m_queue.first != nullptr ||
m_sch_push_io.first != nullptr ||
@ -218,6 +262,10 @@ bool Processor::iterate(v_int32 numIterations) {
}
void Processor::stop() {
{
std::lock_guard<std::mutex> lock(m_waitMutex);
m_running = false;
}
m_waitCondition.notify_one();
}

View File

@ -30,6 +30,7 @@
#include <mutex>
#include <list>
#include <vector>
namespace oatpp { namespace async {
@ -81,14 +82,6 @@ private:
};
private:
oatpp::collection::FastQueue<AbstractCoroutine> m_sch_pop_io;
oatpp::collection::FastQueue<AbstractCoroutine> m_sch_pop_timer;
std::mutex m_sch_pop_io_mutex;
std::mutex m_sch_pop_timer_mutex;
private:
oatpp::collection::FastQueue<AbstractCoroutine> m_sch_push_io;
@ -99,8 +92,14 @@ private:
private:
oatpp::collection::FastQueue<AbstractCoroutine> m_sch_pop_io_tmp;
oatpp::collection::FastQueue<AbstractCoroutine> m_sch_pop_timer_tmp;
std::vector<std::shared_ptr<Worker>> m_ioWorkers;
std::vector<std::shared_ptr<Worker>> m_timerWorkers;
std::vector<oatpp::collection::FastQueue<AbstractCoroutine>> m_ioPopQueues;
std::vector<oatpp::collection::FastQueue<AbstractCoroutine>> m_timerPopQueues;
v_word32 m_ioBalancer = 0;
v_word32 m_timerBalancer = 0;
private:
@ -122,37 +121,32 @@ private:
private:
void popIOTask(AbstractCoroutine* coroutine);
void popTimerTask(AbstractCoroutine* coroutine);
void consumeAllTasks();
void addCoroutine(AbstractCoroutine* coroutine);
void popTmpQueues();
void popTasks();
void pushAllFromQueue(oatpp::collection::FastQueue<AbstractCoroutine>& pushQueue);
void pushQueues();
public:
void addWorker(const std::shared_ptr<Worker>& worker);
/**
* Return coroutine scheduled for I/O back to owner processor.
* @param coroutine
*/
void pushTaskFromIO(AbstractCoroutine* coroutine);
void pushOneTaskFromIO(AbstractCoroutine* coroutine);
/**
* Return coroutine scheduled for Timer back to owner processor.
* @param coroutine
*/
void pushTaskFromTimer(AbstractCoroutine* coroutine);
void pushOneTaskFromTimer(AbstractCoroutine* coroutine);
/**
* Move all waiting for I/O tasks to specified queue.
* @param queue
*/
void popIOTasks(oatpp::collection::FastQueue<AbstractCoroutine>& queue);
/**
* Move all waiting for Timer tasks to specified queue.
* @param queue
*/
void popTimerTasks(oatpp::collection::FastQueue<AbstractCoroutine>& queue);
void pushTasksFromTimer(oatpp::collection::FastQueue<AbstractCoroutine>& tasks);
/**
* Execute Coroutine.

View File

@ -1,25 +0,0 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "Scheduler.hpp"

View File

@ -0,0 +1,99 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "TimerWorker.hpp"
#include "Processor.hpp"
#include <chrono>
namespace oatpp { namespace async {
void TimerWorker::pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
{
std::lock_guard<std::mutex> guard(m_backlogMutex);
oatpp::collection::FastQueue<AbstractCoroutine>::moveAll(tasks, m_backlog);
}
m_backlogCondition.notify_one();
}
void TimerWorker::consumeBacklog(bool blockToConsume) {
if(blockToConsume) {
std::unique_lock<std::mutex> lock(m_backlogMutex);
while (m_backlog.first == nullptr && m_running) {
m_backlogCondition.wait(lock);
}
oatpp::collection::FastQueue<AbstractCoroutine>::moveAll(m_backlog, m_queue);
} else {
std::unique_lock<std::mutex> lock(m_backlogMutex, std::try_to_lock);
if (lock.owns_lock()) {
oatpp::collection::FastQueue<AbstractCoroutine>::moveAll(m_backlog, m_queue);
}
}
}
void TimerWorker::work() {
v_int32 consumeIteration = 0;
v_int32 roundIteration = 0;
while(m_running) {
auto CP = m_queue.first;
if(CP != nullptr) {
Action action = CP->iterate();
if (action.getType() == Action::TYPE_WAIT_RETRY) {
++ roundIteration;
if(roundIteration == 10) {
roundIteration = 0;
m_queue.round();
}
} else {
roundIteration = 0;
m_queue.popFront();
setCoroutineScheduledAction(CP, std::move(action));
getCoroutineProcessor(CP)->pushOneTaskFromTimer(CP);
}
++ consumeIteration;
if(consumeIteration == 100) {
consumeIteration = 0;
consumeBacklog(false);
}
} else {
consumeBacklog(true);
}
}
}
}}

View File

@ -22,39 +22,51 @@
*
***************************************************************************/
#ifndef oatpp_async_Scheduler_hpp
#define oatpp_async_Scheduler_hpp
#ifndef oatpp_async_TimerWorker_hpp
#define oatpp_async_TimerWorker_hpp
#include "oatpp/core/Types.hpp"
#include "./Worker.hpp"
#include "oatpp/core/collection/LinkedList.hpp"
class Scheduler {
#include <thread>
#include <mutex>
#include <condition_variable>
namespace oatpp { namespace async {
class TimerWorker : public Worker {
private:
bool m_running;
oatpp::collection::FastQueue<AbstractCoroutine> m_backlog;
oatpp::collection::FastQueue<AbstractCoroutine> m_queue;
std::mutex m_backlogMutex;
std::condition_variable m_backlogCondition;
private:
void consumeBacklog(bool blockToConsume);
public:
enum WorkerType : v_int32 {
/**
* Worker type - general processor.
*/
PROCESSOR = 0,
/**
* Worker type - timer processor.
*/
TIMER = 1,
/**
* Worker type - I/O processor.
*/
IO = 2
};
private:
TimerWorker()
: Worker(Type::TIMER)
, m_running(true)
{
std::thread thread(&TimerWorker::work, this);
thread.detach();
}
void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) override;
void work();
void stop() override {
{
std::lock_guard<std::mutex> lock(m_backlogMutex);
m_running = false;
}
m_backlogCondition.notify_one();
}
};
}}
#endif //oatpp_async_Scheduler_hpp
#endif //oatpp_async_TimerWorker_hpp

View File

@ -23,3 +23,15 @@
***************************************************************************/
#include "Worker.hpp"
namespace oatpp { namespace async {
void Worker::setCoroutineScheduledAction(AbstractCoroutine *CP, Action &&action) {
CP->_SCH_A = std::forward<Action>(action);
}
Processor* Worker::getCoroutineProcessor(AbstractCoroutine* CP) {
return CP->_PP;
}
}}

View File

@ -32,35 +32,51 @@ namespace oatpp { namespace async {
class Worker {
public:
struct Task {
enum Type : v_int32 {
mutable Action action;
AbstractCoroutine* coroutine;
Worker* sender;
/**
* Worker type - general processor.
*/
PROCESSOR = 0,
Task(const Task& other)
: action(std::move(other.action))
, coroutine(other.coroutine)
, sender(other.sender)
{}
/**
* Worker type - timer processor.
*/
TIMER = 1,
Task& operator = (const Task& other) {
action = std::move(other.action);
coroutine = other.coroutine;
sender = other.sender;
return *this;
}
/**
* Worker type - I/O processor.
*/
IO = 2,
/**
* Number of types in this enum.
*/
TYPES_COUNT = 3
};
private:
Type m_type;
protected:
void setCoroutineScheduledAction(AbstractCoroutine* CP, Action&& action);
Processor* getCoroutineProcessor(AbstractCoroutine* CP);
public:
Worker(Type type)
: m_type(type)
{}
virtual ~Worker() = default;
virtual void addTask(const Task& task) = 0;
virtual void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) = 0;
virtual void stop() = 0;
Type getType() {
return m_type;
}
};
}}

View File

@ -37,6 +37,7 @@ public:
FastQueue()
: first(nullptr)
, last(nullptr)
, count(0)
{}
~FastQueue(){
@ -69,10 +70,12 @@ public:
}
void round(){
last->_ref = first;
last = first;
first = first->_ref;
last->_ref = nullptr;
if(count > 1) {
last->_ref = first;
last = first;
first = first->_ref;
last->_ref = nullptr;
}
}
T* popFront() {
@ -94,56 +97,27 @@ public:
delete result;
-- count;
}
void removeEntry(T* entry, T* prevEntry){
if(prevEntry == nullptr) {
popFrontNoData();
} else if(entry->_ref == nullptr) {
prevEntry->_ref = nullptr;
last = prevEntry;
delete entry;
-- count;
} else {
prevEntry->_ref = entry->_ref;
delete entry;
-- count;
}
}
static void moveEntry(FastQueue& fromQueue, FastQueue& toQueue, T* entry, T* prevEntry){
if(prevEntry == nullptr) {
toQueue.pushFront(fromQueue.popFront());
} else if(entry->_ref == nullptr) {
toQueue.pushBack(entry);
fromQueue.last = prevEntry;
prevEntry->_ref = nullptr;
-- fromQueue.count;
} else {
prevEntry->_ref = entry->_ref;
toQueue.pushBack(entry);
-- fromQueue.count;
}
}
static void moveAll(FastQueue& fromQueue, FastQueue& toQueue) {
if(toQueue.last == nullptr) {
toQueue.first = fromQueue.first;
toQueue.last = fromQueue.last;
} else {
toQueue.last->_ref = fromQueue.first;
toQueue.last = fromQueue.last;
if(fromQueue.count > 0) {
if (toQueue.last == nullptr) {
toQueue.first = fromQueue.first;
toQueue.last = fromQueue.last;
} else {
toQueue.last->_ref = fromQueue.first;
toQueue.last = fromQueue.last;
}
toQueue.count += fromQueue.count;
fromQueue.count = 0;
fromQueue.first = nullptr;
fromQueue.last = nullptr;
}
toQueue.count += fromQueue.count;
fromQueue.count = 0;
fromQueue.first = nullptr;
fromQueue.last = nullptr;
}
void clear() {