new async::Processor draft

This commit is contained in:
lganzzzo 2019-04-09 03:27:48 +03:00
parent a76b37778b
commit 413cc65e73
7 changed files with 352 additions and 172 deletions

View File

@ -155,6 +155,8 @@ AbstractCoroutine::AbstractCoroutine()
: _CP(this)
, _FP(&AbstractCoroutine::act)
, _ERR(nullptr)
, _PP(nullptr)
, _SCH_A(Action::TYPE_NONE)
, _ref(nullptr)
, m_parent(nullptr)
, m_propagatedError(&_ERR)

View File

@ -265,6 +265,8 @@ private:
AbstractCoroutine* _CP;
FunctionPtr _FP;
std::shared_ptr<const Error> _ERR;
Processor* _PP;
oatpp::async::Action _SCH_A;
AbstractCoroutine* _ref;
private:
AbstractCoroutine* m_parent;

View File

@ -26,60 +26,31 @@
namespace oatpp { namespace async {
const v_int32 Executor::THREAD_NUM_DEFAULT = OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT;
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Executor::SubmissionProcessor
Executor::SubmissionProcessor::SubmissionProcessor()
: m_atom(false)
, m_isRunning(true)
: m_isRunning(true)
{}
void Executor::SubmissionProcessor::consumeTasks() {
oatpp::concurrency::SpinLock lock(m_atom);
auto curr = m_pendingTasks.getFirstNode();
while (curr != nullptr) {
m_processor.addWaitingCoroutine(curr->getData()->createCoroutine());
curr = curr->getNext();
}
m_pendingTasks.clear();
}
void Executor::SubmissionProcessor::run(){
while(m_isRunning) {
/* Load all waiting connections into processor */
consumeTasks();
/* Process all, and check for incoming connections once in 1000 iterations */
while (m_processor.iterate(1000)) {
consumeTasks();
}
std::unique_lock<std::mutex> lock(m_taskMutex);
if(m_processor.isEmpty()) {
/* No tasks in the processor. Wait for incoming connections */
m_taskCondition.wait_for(lock, std::chrono::milliseconds(500));
} else {
/* There is still something in slow queue. Wait and get back to processing */
/* Waiting for IO is not Applicable here as slow queue may contain NON-IO tasks */
//OATPP_LOGD("proc", "waiting slow queue");
m_taskCondition.wait_for(lock, std::chrono::milliseconds(10));
}
m_processor.waitForTasks();
while (m_processor.iterate(1000)) {}
}
}
void Executor::SubmissionProcessor::stop() {
m_isRunning = false;
m_processor.stop();
}
void Executor::SubmissionProcessor::addTaskSubmission(const std::shared_ptr<TaskSubmission>& task){
oatpp::concurrency::SpinLock lock(m_atom);
m_pendingTasks.pushBack(task);
m_taskCondition.notify_one();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Executor
const v_int32 Executor::THREAD_NUM_DEFAULT = OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT;
Executor::Executor(v_int32 threadsCount)
: m_threadsCount(threadsCount)

View File

@ -47,67 +47,22 @@ namespace oatpp { namespace async {
class Executor {
private:
class TaskSubmission {
public:
virtual ~TaskSubmission() {};
virtual AbstractCoroutine* createCoroutine() = 0;
};
/*
* Sequence generating templates
* used to convert tuple to parameters pack
* Example: expand SequenceGenerator<3>:
* // 2, 2, {} // 1, 1, {2} // 0, 0, {1, 2} // 0, {0, 1, 2}
* where {...} is int...S
*/
template<int ...> struct IndexSequence {};
template<int N, int ...S> struct SequenceGenerator : SequenceGenerator <N - 1, N - 1, S...> {};
template<int ...S>
struct SequenceGenerator<0, S...> {
typedef IndexSequence<S...> type;
};
template<typename CoroutineType, typename ... Args>
class SubmissionTemplate : public TaskSubmission {
private:
std::tuple<Args...> m_params;
public:
SubmissionTemplate(Args... params)
: m_params(std::make_tuple(params...))
{}
virtual AbstractCoroutine* createCoroutine() {
return creator(typename SequenceGenerator<sizeof...(Args)>::type());
}
template<int ...S>
AbstractCoroutine* creator(IndexSequence<S...>) {
return new CoroutineType(std::get<S>(m_params) ...);
}
};
class SubmissionProcessor/* : public Worker */{
private:
typedef oatpp::collection::LinkedList<std::shared_ptr<TaskSubmission>> Tasks;
private:
void consumeTasks();
private:
oatpp::async::Processor m_processor;
oatpp::concurrency::SpinLock::Atom m_atom;
Tasks m_pendingTasks;
private:
bool m_isRunning;
std::mutex m_taskMutex;
std::condition_variable m_taskCondition;
public:
SubmissionProcessor();
public:
void run();
void stop() ;
void addTaskSubmission(const std::shared_ptr<TaskSubmission>& task);
template<typename CoroutineType, typename ... Args>
void execute(Args... params) {
m_processor.execute<CoroutineType, Args...>(params...);
}
};
@ -160,10 +115,7 @@ public:
template<typename CoroutineType, typename ... Args>
void execute(Args... params) {
auto& processor = m_processors[m_balancer % m_threadsCount];
auto submission = std::make_shared<SubmissionTemplate<CoroutineType, Args...>>(params...);
processor.addTaskSubmission(submission);
processor.execute<CoroutineType, Args...>(params...);
m_balancer ++;
}

View File

@ -25,87 +25,200 @@
#include "Processor.hpp"
namespace oatpp { namespace async {
bool Processor::checkWaitingQueue() {
bool hasActions = false;
AbstractCoroutine* curr = m_waitingQueue.first;
AbstractCoroutine* prev = nullptr;
while (curr != nullptr) {
const Action& action = curr->iterate();
if(action.m_type == Action::TYPE_FINISH) {
m_waitingQueue.removeEntry(curr, prev);
if(prev != nullptr) {
curr = prev;
} else {
curr = m_waitingQueue.first;
}
} else if(action.m_type != Action::TYPE_WAIT_RETRY) {
oatpp::collection::FastQueue<AbstractCoroutine>::moveEntry(m_waitingQueue, m_activeQueue, curr, prev);
hasActions = true;
if(prev != nullptr) {
curr = prev;
} else {
curr = m_waitingQueue.first;
}
void Processor::addCoroutine(AbstractCoroutine* coroutine) {
if(coroutine->_PP == this) {
const Action& action = coroutine->_SCH_A;
switch(action.m_type) {
case Action::TYPE_WAIT_FOR_IO:
coroutine->_SCH_A = Action::clone(action);
m_queue.popFront();
m_sch_pop_io_tmp.pushBack(coroutine);
break;
case Action::TYPE_WAIT_RETRY:
coroutine->_SCH_A = Action::clone(action);
m_queue.popFront();
m_sch_pop_timer_tmp.pushBack(coroutine);
break;
}
action.m_type = Action::TYPE_NONE;
prev = curr;
if(curr != nullptr) {
curr = curr->_ref;
}
m_queue.pushBack(coroutine);
} else {
throw std::runtime_error("[oatpp::async::processor::addCoroutine()]: Error. Attempt to schedule coroutine to wrong processor.");
}
return hasActions;
}
bool Processor::considerContinueImmediately() {
bool hasAction = checkWaitingQueue();
if(hasAction) {
m_inactivityTick = 0;
} else if(m_inactivityTick == 0) {
m_inactivityTick = oatpp::base::Environment::getMicroTickCount();
} else if(oatpp::base::Environment::getMicroTickCount() - m_inactivityTick > 1000 * 100 /* 100 millis */) {
return m_activeQueue.first != nullptr;
void Processor::pushTaskFromIO(AbstractCoroutine* coroutine) {
std::lock_guard<std::mutex> lock(m_sch_push_io_mutex);
m_sch_push_io.pushBack(coroutine);
m_waitCondition.notify_one();
}
void Processor::pushTaskFromTimer(AbstractCoroutine* coroutine) {
std::lock_guard<std::mutex> lock(m_sch_push_timer_mutex);
m_sch_push_timer.pushBack(coroutine);
m_waitCondition.notify_one();
}
void Processor::popIOTasks(oatpp::collection::FastQueue<AbstractCoroutine>& queue) {
if(m_sch_pop_io.first != nullptr) {
std::lock_guard<std::mutex> lock(m_sch_pop_io_mutex);
collection::FastQueue<AbstractCoroutine>::moveAll(m_sch_pop_io, queue);
}
return true;
}
void Processor::addCoroutine(AbstractCoroutine* coroutine) {
m_activeQueue.pushBack(coroutine);
void Processor::popTimerTasks(oatpp::collection::FastQueue<AbstractCoroutine>& queue) {
if(m_sch_pop_timer.first != nullptr) {
std::lock_guard<std::mutex> lock(m_sch_pop_timer_mutex);
collection::FastQueue<AbstractCoroutine>::moveAll(m_sch_pop_timer, queue);
}
}
void Processor::addWaitingCoroutine(AbstractCoroutine* coroutine) {
m_waitingQueue.pushBack(coroutine);
void Processor::waitForTasks() {
std::unique_lock<std::mutex> lock(m_waitMutex);
while (m_sch_push_io.first == nullptr && m_sch_push_timer.first == nullptr && m_taskList.empty() && m_running) {
m_waitCondition.wait(lock);
}
}
void Processor::popTmpQueues() {
{
std::lock_guard<std::mutex> lock(m_sch_pop_io_mutex);
collection::FastQueue<AbstractCoroutine>::moveAll(m_sch_pop_io_tmp, m_sch_pop_io);
}
{
std::lock_guard<std::mutex> lock(m_sch_pop_timer_mutex);
collection::FastQueue<AbstractCoroutine>::moveAll(m_sch_pop_timer_tmp, m_sch_pop_timer);
}
}
void Processor::pushAllFromQueue(oatpp::collection::FastQueue<AbstractCoroutine>& pushQueue) {
auto curr = pushQueue.first;
while(curr != nullptr) {
addCoroutine(curr);
curr = curr->_ref;
}
pushQueue.first = nullptr;
pushQueue.last = nullptr;
pushQueue.count = 0;
}
void Processor::consumeAllTasks() {
for(auto& submission : m_taskList) {
m_queue.pushBack(submission->createCoroutine());
}
m_taskList.clear();
}
void Processor::pushQueues() {
static constexpr v_int32 MAX_BATCH_SIZE = 1000;
if(!m_taskList.empty()) {
if (m_taskList.size() < MAX_BATCH_SIZE && m_queue.first != nullptr) {
std::unique_lock<std::mutex> lock(m_taskMutex, std::try_to_lock);
if (lock.owns_lock()) {
consumeAllTasks();
}
} else {
std::lock_guard<std::mutex> lock(m_taskMutex);
consumeAllTasks();
}
}
if(m_sch_push_io.first != nullptr) {
if (m_sch_push_io.count < MAX_BATCH_SIZE && m_queue.first != nullptr) {
std::unique_lock<std::mutex> lock(m_sch_push_io_mutex, std::try_to_lock);
if (lock.owns_lock()) {
pushAllFromQueue(m_sch_push_io);
}
} else {
std::lock_guard<std::mutex> lock(m_sch_push_io_mutex);
pushAllFromQueue(m_sch_push_io);
}
}
if(m_sch_push_timer.first != nullptr) {
if (m_sch_push_timer.count < MAX_BATCH_SIZE && m_queue.first != nullptr) {
std::unique_lock<std::mutex> lock(m_sch_push_timer_mutex, std::try_to_lock);
if (lock.owns_lock()) {
pushAllFromQueue(m_sch_push_timer);
}
} else {
std::lock_guard<std::mutex> lock(m_sch_push_timer_mutex);
pushAllFromQueue(m_sch_push_timer);
}
}
}
bool Processor::iterate(v_int32 numIterations) {
pushQueues();
for(v_int32 i = 0; i < numIterations; i++) {
auto CP = m_activeQueue.first;
auto CP = m_queue.first;
if(CP == nullptr) {
break;
}
if(!CP->finished()) {
const Action& action = CP->iterate();
if(action.m_type == Action::TYPE_WAIT_RETRY) {
m_waitingQueue.pushBack(m_activeQueue.popFront());
} else {
m_activeQueue.round();
}
action.m_type = Action::TYPE_NONE;
if(CP->finished()) {
m_queue.popFrontNoData();
} else {
m_activeQueue.popFrontNoData();
const Action& action = CP->iterate();
switch(action.m_type) {
case Action::TYPE_WAIT_FOR_IO:
CP->_SCH_A = Action::clone(action);
m_queue.popFront();
m_sch_pop_io_tmp.pushBack(CP);
OATPP_LOGD("Processor", "push to IO");
break;
case Action::TYPE_WAIT_RETRY:
CP->_SCH_A = Action::clone(action);
m_queue.popFront();
m_sch_pop_timer_tmp.pushBack(CP);
OATPP_LOGD("Processor", "push to Timer");
break;
default:
m_queue.round();
}
action.m_type = Action::TYPE_NONE;
}
}
popTmpQueues();
return considerContinueImmediately();
return m_queue.first != nullptr ||
m_sch_push_io.first != nullptr ||
m_sch_push_timer.first != nullptr ||
!m_taskList.empty();
}
void Processor::stop() {
m_waitCondition.notify_one();
}
}}

View File

@ -28,6 +28,9 @@
#include "./Coroutine.hpp"
#include "oatpp/core/collection/FastQueue.hpp"
#include <mutex>
#include <list>
namespace oatpp { namespace async {
/**
@ -36,28 +39,139 @@ namespace oatpp { namespace async {
*/
class Processor {
private:
bool checkWaitingQueue();
bool considerContinueImmediately();
class TaskSubmission {
public:
virtual ~TaskSubmission() {};
virtual AbstractCoroutine* createCoroutine() = 0;
};
/*
* Sequence generating templates
* used to convert tuple to parameters pack
* Example: expand SequenceGenerator<3>:
* // 2, 2, {} // 1, 1, {2} // 0, 0, {1, 2} // 0, {0, 1, 2}
* where {...} is int...S
*/
template<int ...> struct IndexSequence {};
template<int N, int ...S> struct SequenceGenerator : SequenceGenerator <N - 1, N - 1, S...> {};
template<int ...S>
struct SequenceGenerator<0, S...> {
typedef IndexSequence<S...> type;
};
template<typename CoroutineType, typename ... Args>
class SubmissionTemplate : public TaskSubmission {
private:
std::tuple<Args...> m_params;
public:
SubmissionTemplate(Args... params)
: m_params(std::make_tuple(params...))
{}
virtual AbstractCoroutine* createCoroutine() {
return creator(typename SequenceGenerator<sizeof...(Args)>::type());
}
template<int ...S>
AbstractCoroutine* creator(IndexSequence<S...>) {
return new CoroutineType(std::get<S>(m_params) ...);
}
};
private:
oatpp::collection::FastQueue<AbstractCoroutine> m_activeQueue;
oatpp::collection::FastQueue<AbstractCoroutine> m_waitingQueue;
oatpp::collection::FastQueue<AbstractCoroutine> m_sch_pop_io;
oatpp::collection::FastQueue<AbstractCoroutine> m_sch_pop_timer;
std::mutex m_sch_pop_io_mutex;
std::mutex m_sch_pop_timer_mutex;
private:
v_int64 m_inactivityTick = 0;
oatpp::collection::FastQueue<AbstractCoroutine> m_sch_push_io;
oatpp::collection::FastQueue<AbstractCoroutine> m_sch_push_timer;
std::mutex m_sch_push_io_mutex;
std::mutex m_sch_push_timer_mutex;
private:
oatpp::collection::FastQueue<AbstractCoroutine> m_sch_pop_io_tmp;
oatpp::collection::FastQueue<AbstractCoroutine> m_sch_pop_timer_tmp;
private:
std::mutex m_taskMutex;
std::list<std::shared_ptr<TaskSubmission>> m_taskList;
private:
oatpp::collection::FastQueue<AbstractCoroutine> m_queue;
private:
std::mutex m_waitMutex;
std::condition_variable m_waitCondition;
private:
bool m_running = true;
private:
void consumeAllTasks();
void addCoroutine(AbstractCoroutine* coroutine);
void popTmpQueues();
void pushAllFromQueue(oatpp::collection::FastQueue<AbstractCoroutine>& pushQueue);
void pushQueues();
public:
/**
* Add Coroutine to processor.
* @param coroutine - pointer to Coroutine.
*/
void addCoroutine(AbstractCoroutine* coroutine);
/**
* Add Coroutine to processor in "waiting queue"
* Return coroutine scheduled for I/O back to owner processor.
* @param coroutine
*/
void addWaitingCoroutine(AbstractCoroutine* coroutine);
void pushTaskFromIO(AbstractCoroutine* coroutine);
/**
* Return coroutine scheduled for Timer back to owner processor.
* @param coroutine
*/
void pushTaskFromTimer(AbstractCoroutine* coroutine);
/**
* Move all waiting for I/O tasks to specified queue.
* @param queue
*/
void popIOTasks(oatpp::collection::FastQueue<AbstractCoroutine>& queue);
/**
* Move all waiting for Timer tasks to specified queue.
* @param queue
*/
void popTimerTasks(oatpp::collection::FastQueue<AbstractCoroutine>& queue);
/**
* Execute Coroutine.
* @tparam CoroutineType - type of coroutine to execute.
* @tparam Args - types of arguments to be passed to Coroutine constructor.
* @param params - actual arguments to be passed to Coroutine constructor.
*/
template<typename CoroutineType, typename ... Args>
void execute(Args... params) {
auto submission = std::make_shared<SubmissionTemplate<CoroutineType, Args...>>(params...);
std::lock_guard<std::mutex> lock(m_taskMutex);
m_taskList.push_back(submission);
m_waitCondition.notify_one();
}
/**
* Sleep and wait for tasks.
*/
void waitForTasks();
/**
* Iterate Coroutines.
@ -67,12 +181,10 @@ public:
bool iterate(v_int32 numIterations);
/**
* Check if there is no more Coroutines in processor.
* @return - `true` if all coroutines in all queues are finished.
* Stop waiting for new tasks.
*/
bool isEmpty() {
return m_activeQueue.first == nullptr && m_waitingQueue.first == nullptr;
}
void stop();
};

View File

@ -45,6 +45,7 @@ public:
T* first;
T* last;
v_int32 count;
void pushFront(T* entry) {
entry->_ref = first;
@ -52,6 +53,7 @@ public:
if(last == nullptr) {
last = first;
}
++ count;
}
void pushBack(T* entry) {
@ -63,6 +65,7 @@ public:
last->_ref = entry;
last = entry;
}
++ count;
}
void round(){
@ -78,6 +81,7 @@ public:
if(first == nullptr) {
last = nullptr;
}
-- count;
return result;
}
@ -88,6 +92,7 @@ public:
last = nullptr;
}
delete result;
-- count;
}
void removeEntry(T* entry, T* prevEntry){
@ -98,9 +103,11 @@ public:
prevEntry->_ref = nullptr;
last = prevEntry;
delete entry;
-- count;
} else {
prevEntry->_ref = entry->_ref;
delete entry;
-- count;
}
}
@ -112,12 +119,32 @@ public:
toQueue.pushBack(entry);
fromQueue.last = prevEntry;
prevEntry->_ref = nullptr;
-- fromQueue.count;
} else {
prevEntry->_ref = entry->_ref;
toQueue.pushBack(entry);
-- fromQueue.count;
}
}
static void moveAll(FastQueue& fromQueue, FastQueue& toQueue) {
if(toQueue.last == nullptr) {
toQueue.first = fromQueue.first;
toQueue.last = fromQueue.last;
} else {
toQueue.last->_ref = fromQueue.first;
toQueue.last = fromQueue.last;
}
toQueue.count += fromQueue.count;
fromQueue.count = 0;
fromQueue.first = nullptr;
fromQueue.last = nullptr;
}
void clear() {
T* curr = first;
@ -128,6 +155,7 @@ public:
}
first = nullptr;
last = nullptr;
count = 0;
}
};