draft of experimental scheduled AsyncIO

This commit is contained in:
lganzzzo 2019-04-15 03:30:50 +03:00
parent cac219d027
commit 6bdff40792
25 changed files with 408 additions and 164 deletions

View File

@ -23,6 +23,8 @@ add_library(oatpp
oatpp/core/async/Worker.hpp
oatpp/core/async/IOWorker.cpp
oatpp/core/async/IOWorker.hpp
oatpp/core/async/TimerWorker.cpp
oatpp/core/async/TimerWorker.hpp
oatpp/core/async/Executor.cpp
oatpp/core/async/Executor.hpp
oatpp/core/base/CommandLineArguments.cpp
@ -185,7 +187,7 @@ add_library(oatpp
oatpp/web/url/mapping/Subscriber.hpp
oatpp/core/parser/ParsingError.cpp
oatpp/core/parser/ParsingError.hpp
oatpp/core/async/TimerWorker.cpp oatpp/core/async/TimerWorker.hpp)
)
set_target_properties(oatpp PROPERTIES
CXX_STANDARD 11

View File

@ -35,12 +35,28 @@ Action Action::clone(const Action& action) {
return result;
}
Action Action::createWaitIOAction(data::v_io_handle ioHandle) {
Action result(TYPE_WAIT_FOR_IO);
Action Action::createActionByType(v_int32 type) {
return Action(type);
}
Action Action::createIOWaitAction(data::v_io_handle ioHandle) {
Action result(TYPE_IO_WAIT);
result.m_data.ioHandle = ioHandle;
return result;
}
Action Action::createIORepeatAction(data::v_io_handle ioHandle) {
Action result(TYPE_IO_REPEAT);
result.m_data.ioHandle = ioHandle;
return result;
}
Action Action::createWaitRepeatAction(v_int64 timePointMicroseconds) {
Action result(TYPE_WAIT_REPEAT);
result.m_data.timePointMicroseconds = timePointMicroseconds;
return result;
}
Action::Action(AbstractCoroutine* coroutine)
: m_type(TYPE_COROUTINE)
{
@ -181,9 +197,6 @@ Action AbstractCoroutine::takeAction(Action&& action) {
switch (action.m_type) {
case Action::TYPE_REPEAT: return Action::TYPE_REPEAT;//std::forward<oatpp::async::Action>(action);
case Action::TYPE_WAIT_RETRY: return Action::TYPE_WAIT_RETRY;//std::forward<oatpp::async::Action>(action);
case Action::TYPE_COROUTINE:
action.m_data.coroutine->m_parent = _CP;
action.m_data.coroutine->m_propagatedError = m_propagatedError;
@ -233,7 +246,8 @@ Action AbstractCoroutine::takeAction(Action&& action) {
};
throw std::runtime_error("[oatpp::async::AbstractCoroutine::takeAction()]: Error. Unknown Action.");
//throw std::runtime_error("[oatpp::async::AbstractCoroutine::takeAction()]: Error. Unknown Action.");
return std::forward<oatpp::async::Action>(action);
}

View File

@ -35,6 +35,7 @@
#include "oatpp/core/Types.hpp"
#include <chrono>
#include <exception>
namespace oatpp { namespace async {
@ -51,10 +52,14 @@ class Action {
friend Processor;
friend AbstractCoroutine;
friend CoroutineStarter;
friend Worker;
public:
typedef Action (AbstractCoroutine::*FunctionPtr)();
public:
/**
* None - invalid Action.
*/
static constexpr const v_int32 TYPE_NONE = 0;
/**
@ -73,44 +78,78 @@ public:
static constexpr const v_int32 TYPE_REPEAT = 3;
/**
* Indicate that Action is to WAIT and then RETRY call to current method of Coroutine.
* Indicate that Action is to WAIT for some time and then REPEAT call to current method of Coroutine.
*/
static constexpr const v_int32 TYPE_WAIT_RETRY = 4;
static constexpr const v_int32 TYPE_WAIT_REPEAT = 4;
/**
* Indicate that Action is waiting for IO and should be assigned to corresponding worker.
*/
static constexpr const v_int32 TYPE_WAIT_FOR_IO = 5;
static constexpr const v_int32 TYPE_IO_WAIT = 5;
/**
* Indicate that Action is to repeat previously successful I/O operation.
*/
static constexpr const v_int32 TYPE_IO_REPEAT = 6;
/**
* Indicate that Action is to FINISH current Coroutine and return control to a caller-Coroutine.
*/
static constexpr const v_int32 TYPE_FINISH = 6;
static constexpr const v_int32 TYPE_FINISH = 7;
/**
* Indicate that Error occurred.
*/
static constexpr const v_int32 TYPE_ERROR = 7;
static constexpr const v_int32 TYPE_ERROR = 8;
private:
union Data {
FunctionPtr fptr;
AbstractCoroutine* coroutine;
oatpp::data::v_io_handle ioHandle;
v_int64 timePointMicroseconds;
};
private:
mutable v_int32 m_type;
Data m_data;
protected:
/*
* Create Action by type.
* @param type - Action type.
*/
Action(v_int32 type);
public:
static Action clone(const Action& action);
/**
* Create WAIT_FOR_IO Action
* Create action of specific type
* @param type
* @return
*/
static Action createActionByType(v_int32 type);
/**
* Create TYPE_IO_WAIT Action
* @param ioHandle - &id:oatpp::data::v_io_handle;.
* @return - Action.
*/
static Action createWaitIOAction(data::v_io_handle ioHandle);
static Action createIOWaitAction(data::v_io_handle ioHandle = -1);
/**
* Create TYPE_IO_REPEAT Action
* @param ioHandle - &id:oatpp::data::v_io_handle;.
* @return - Action.
*/
static Action createIORepeatAction(data::v_io_handle ioHandle = -1);
/**
* Create TYPE_WAIT_REPEAT Action.
* @param timePointMicroseconds - time since epoch.
* @return
*/
static Action createWaitRepeatAction(v_int64 timePointMicroseconds);
/**
* Constructor. Create start-coroutine Action.
@ -124,12 +163,6 @@ public:
*/
Action(FunctionPtr functionPtr);
/**
* Create Action by type.
* @param type - Action type.
*/
Action(v_int32 type);
/**
* Deleted copy-constructor.
*/
@ -346,6 +379,10 @@ public:
*/
Action error(const std::shared_ptr<const Error>& error);
Action propagateError() {
return Action(Action::TYPE_ERROR);
}
/**
* Convenience method to generate error reporting Action.
* @tparam E - Error class type.
@ -423,23 +460,34 @@ public:
* @return - repeat Action.
*/
Action repeat() const {
return Action::TYPE_REPEAT;
return Action::createActionByType(Action::TYPE_REPEAT);
}
/**
* Convenience method to generate Action of `type == Action::TYPE_WAIT_RETRY`.
* @return - WAIT_RETRY Action.
* Convenience method to generate Action of `type == Action::TYPE_WAIT_REPEAT`.
* @return - TYPE_WAIT_REPEAT Action.
*/
Action waitRetry() const {
return Action::TYPE_WAIT_RETRY;
Action waitRepeat(const std::chrono::duration<v_int64, std::micro>& timeout) const {
auto startTime = std::chrono::system_clock::now();
auto end = startTime + timeout;
std::chrono::microseconds ms = std::chrono::duration_cast<std::chrono::microseconds>(end.time_since_epoch());
return Action::createWaitRepeatAction(ms.count());
}
/**
* Convenience method to generate Action of `type == Action::TYPE_WAIT_FOR_IO`.
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_WAIT_FOR_IO Action.
*/
Action waitForIO(data::v_io_handle ioHandle) const {
return Action(this, ioHandle);
Action ioWait(data::v_io_handle ioHandle = -1) const {
return Action::createIOWaitAction(ioHandle);
}
/**
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_IO_REPEAT Action.
*/
Action ioRepeat(data::v_io_handle ioHandle = -1) const {
return Action::createIORepeatAction(ioHandle);
}
/**
@ -447,7 +495,7 @@ public:
* @return - finish Action.
*/
Action finish() const {
return Action::TYPE_FINISH;
return Action::createActionByType(Action::TYPE_FINISH);
}
};
@ -605,11 +653,30 @@ public:
}
/**
* Convenience method to generate Action of `type == Action::TYPE_WAIT_RETRY`.
* @return - WAIT_RETRY Action.
* Convenience method to generate Action of `type == Action::TYPE_WAIT_REPEAT`.
* @return - TYPE_WAIT_REPEAT Action.
*/
Action waitRetry() const {
return Action::TYPE_WAIT_RETRY;
Action waitRepeat(const std::chrono::duration<v_int64, std::micro>& timeout) const {
auto startTime = std::chrono::system_clock::now();
auto end = startTime + timeout;
std::chrono::microseconds ms = std::chrono::duration_cast<std::chrono::microseconds>(end.time_since_epoch());
return Action::createWaitRepeatAction(ms.count());
}
/**
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_WAIT_FOR_IO Action.
*/
Action ioWait(data::v_io_handle ioHandle = -1) const {
return Action::createIOWaitAction(ioHandle);
}
/**
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_IO_REPEAT Action.
*/
Action ioRepeat(data::v_io_handle ioHandle = -1) const {
return Action::createIORepeatAction(ioHandle);
}
/**
@ -617,7 +684,7 @@ public:
* @return - repeat Action.
*/
Action repeat() const {
return Action::TYPE_REPEAT;
return Action::createActionByType(Action::TYPE_REPEAT);
}
/**
@ -627,7 +694,7 @@ public:
*/
Action _return(Args... args) {
m_parentReturnAction = getParent()->callWithParams(m_callback, args...);
return Action::TYPE_FINISH;
return Action::createActionByType(Action::TYPE_FINISH);
}
};

View File

@ -55,36 +55,32 @@ void Executor::SubmissionProcessor::stop() {
const v_int32 Executor::THREAD_NUM_DEFAULT = OATPP_ASYNC_EXECUTOR_THREAD_NUM_DEFAULT;
Executor::Executor(v_int32 threadsCount)
: m_threadsCount(threadsCount)
, m_threads(new std::thread[m_threadsCount])
, m_processors(new SubmissionProcessor[m_threadsCount])
Executor::Executor(v_int32 processorThreads, v_int32 ioThreads, v_int32 timerThreads)
: m_processorThreads(processorThreads)
, m_ioThreads(ioThreads)
, m_timerThreads(timerThreads)
, m_threads(new std::thread[m_processorThreads])
, m_processors(new SubmissionProcessor[m_processorThreads])
{
//auto ioWorker = std::make_shared<IOWorker>();
for(v_int32 i = 0; i < 2; i++) {
m_workers.push_back(std::make_shared<TimerWorker>());
for(v_int32 i = 0; i < m_processorThreads; i ++) {
m_threads[i] = std::thread(&SubmissionProcessor::run, &m_processors[i]);
}
for(v_int32 i = 0; i < m_threadsCount; i ++) {
auto& processor = m_processors[i];
for(auto& worker : m_workers) {
processor.getProcessor().addWorker(worker);
}
// for(v_int32 i = 0; i < 1; i++) {
// auto worker = std::make_shared<TimerWorker>();
// m_workers.push_back(worker);
// processor.getProcessor().addWorker(worker);
// }
m_threads[i] = std::thread(&SubmissionProcessor::run, &processor);
std::vector<std::shared_ptr<Worker>> ioWorkers;
for(v_int32 i = 0; i < m_ioThreads; i++) {
ioWorkers.push_back(std::make_shared<IOWorker>());
}
linkWorkers(ioWorkers);
std::vector<std::shared_ptr<Worker>> timerWorkers;
for(v_int32 i = 0; i < m_timerThreads; i++) {
timerWorkers.push_back(std::make_shared<TimerWorker>());
}
linkWorkers(timerWorkers);
}
Executor::~Executor() {
@ -92,20 +88,61 @@ Executor::~Executor() {
delete [] m_threads;
}
void Executor::linkWorkers(const std::vector<std::shared_ptr<Worker>>& workers) {
m_workers.insert(m_workers.end(), workers.begin(), workers.end());
if(m_processorThreads > workers.size() && (m_processorThreads % workers.size()) == 0) {
v_int32 wi = 0;
for(v_int32 i = 0; i < m_processorThreads; i ++) {
auto& p = m_processors[i];
p.getProcessor().addWorker(workers[wi]);
wi ++;
if(wi == workers.size()) {
wi = 0;
}
}
} else if ((workers.size() % m_processorThreads) == 0) {
v_int32 pi = 0;
for(v_int32 i = 0; i < workers.size(); i ++) {
auto& p = m_processors[pi];
p.getProcessor().addWorker(workers[i]);
pi ++;
if(pi == m_processorThreads) {
pi = 0;
}
}
} else {
for(v_int32 i = 0; i < m_processorThreads; i ++) {
auto& p = m_processors[i];
for(auto& w : workers) {
p.getProcessor().addWorker(w);
}
}
}
}
void Executor::join() {
for(v_int32 i = 0; i < m_threadsCount; i ++) {
for(v_int32 i = 0; i < m_processorThreads; i ++) {
m_threads[i].join();
}
}
void Executor::detach() {
for(v_int32 i = 0; i < m_threadsCount; i ++) {
for(v_int32 i = 0; i < m_processorThreads; i ++) {
m_threads[i].detach();
}
}
void Executor::stop() {
for(v_int32 i = 0; i < m_threadsCount; i ++) {
for(v_int32 i = 0; i < m_processorThreads; i ++) {
m_processors[i].stop();
}

View File

@ -76,20 +76,27 @@ public:
*/
static const v_int32 THREAD_NUM_DEFAULT;
private:
v_int32 m_threadsCount;
//std::shared_ptr<oatpp::concurrency::Thread>* m_threads;
v_int32 m_processorThreads;
v_int32 m_ioThreads;
v_int32 m_timerThreads;
std::thread* m_threads;
SubmissionProcessor* m_processors;
std::atomic<v_word32> m_balancer;
private:
std::vector<std::shared_ptr<Worker>> m_workers;
private:
void linkWorkers(const std::vector<std::shared_ptr<Worker>>& workers);
public:
/**
* Constructor.
* @param threadsCount - Number of threads to run coroutines.
* @param processorThreads - number of data processing threads.
* @param ioThreads - number of I/O threads.
* @param timerThreads - number of timer threads.
*/
Executor(v_int32 threadsCount = THREAD_NUM_DEFAULT);
Executor(v_int32 processorThreads = THREAD_NUM_DEFAULT,
v_int32 ioThreads = 1,
v_int32 timerThreads = 1);
/**
* Non-virtual Destructor.
@ -120,7 +127,7 @@ public:
*/
template<typename CoroutineType, typename ... Args>
void execute(Args... params) {
auto& processor = m_processors[(++ m_balancer) % m_threadsCount];
auto& processor = m_processors[(++ m_balancer) % m_processorThreads];
processor.execute<CoroutineType, Args...>(params...);
}

View File

@ -38,6 +38,14 @@ void IOWorker::pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks)
m_backlogCondition.notify_one();
}
void IOWorker::pushOneTask(AbstractCoroutine* task) {
{
std::lock_guard<std::mutex> guard(m_backlogMutex);
m_backlog.pushBack(task);
}
m_backlogCondition.notify_one();
}
void IOWorker::consumeBacklog(bool blockToConsume) {
if(blockToConsume) {
@ -46,9 +54,7 @@ void IOWorker::consumeBacklog(bool blockToConsume) {
while (m_backlog.first == nullptr && m_running) {
m_backlogCondition.wait(lock);
}
oatpp::collection::FastQueue<AbstractCoroutine>::moveAll(m_backlog, m_queue);
} else {
std::unique_lock<std::mutex> lock(m_backlogMutex, std::try_to_lock);
@ -62,8 +68,8 @@ void IOWorker::consumeBacklog(bool blockToConsume) {
void IOWorker::work() {
v_int32 sleepIteration = 0;
v_int32 consumeIteration = 0;
v_int32 roundIteration = 0;
while(m_running) {
@ -71,28 +77,40 @@ void IOWorker::work() {
if(CP != nullptr) {
Action action = CP->iterate();
if (action.getType() == Action::TYPE_WAIT_FOR_IO) {
m_queue.round();
} else {
m_queue.popFront();
setCoroutineScheduledAction(CP, std::move(action));
getCoroutineProcessor(CP)->pushOneTaskFromIO(CP);
switch(action.getType()) {
case Action::TYPE_IO_WAIT:
roundIteration = 0;
m_queue.round();
break;
case Action::TYPE_IO_REPEAT:
++ roundIteration;
if(roundIteration == 10) {
roundIteration = 0;
m_queue.round();
}
break;
default:
roundIteration = 0;
m_queue.popFront();
setCoroutineScheduledAction(CP, std::move(action));
getCoroutineProcessor(CP)->pushOneTaskFromIO(CP);
break;
}
++ sleepIteration;
if(sleepIteration == 1000) {
sleepIteration = 0;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
dismissAction(action);
++ consumeIteration;
if(consumeIteration == 1000) {
if(consumeIteration == 100) {
consumeIteration = 0;
consumeBacklog(false);
}
} else {
sleepIteration = 0;
consumeBacklog(true);
}

View File

@ -58,6 +58,8 @@ public:
void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) override;
void pushOneTask(AbstractCoroutine* task) override;
void work();
void stop() override {

View File

@ -52,6 +52,7 @@ void Processor::popIOTask(AbstractCoroutine* coroutine) {
if(m_ioPopQueues.size() > 0) {
auto &queue = m_ioPopQueues[(++m_ioBalancer) % m_ioPopQueues.size()];
queue.pushBack(coroutine);
//m_ioWorkers[(++m_ioBalancer) % m_ioWorkers.size()]->pushOneTask(coroutine);
} else {
throw std::runtime_error("[oatpp::async::Processor::popIOTasks()]: Error. Processor has no I/O workers.");
}
@ -61,6 +62,7 @@ void Processor::popTimerTask(AbstractCoroutine* coroutine) {
if(m_timerPopQueues.size() > 0) {
auto &queue = m_timerPopQueues[(++m_timerBalancer) % m_timerPopQueues.size()];
queue.pushBack(coroutine);
//m_timerWorkers[(++m_timerBalancer) % m_timerWorkers.size()]->pushOneTask(coroutine);
} else {
throw std::runtime_error("[oatpp::async::Processor::popTimerTask()]: Error. Processor has no Timer workers.");
}
@ -74,12 +76,17 @@ void Processor::addCoroutine(AbstractCoroutine* coroutine) {
switch(action.m_type) {
case Action::TYPE_WAIT_FOR_IO:
case Action::TYPE_IO_WAIT:
coroutine->_SCH_A = Action::clone(action);
popIOTask(coroutine);
break;
case Action::TYPE_WAIT_RETRY:
case Action::TYPE_IO_REPEAT:
coroutine->_SCH_A = Action::clone(action);
popIOTask(coroutine);
break;
case Action::TYPE_WAIT_REPEAT:
coroutine->_SCH_A = Action::clone(action);
popTimerTask(coroutine);
break;
@ -100,7 +107,7 @@ void Processor::addCoroutine(AbstractCoroutine* coroutine) {
void Processor::pushOneTaskFromIO(AbstractCoroutine* coroutine) {
{
std::lock_guard<std::mutex> waitLock(m_waitMutex);
std::lock_guard<std::mutex> lock(m_sch_push_io_mutex);
oatpp::concurrency::SpinLock lock(m_sch_push_io_atom);
m_sch_push_io.pushBack(coroutine);
}
m_waitCondition.notify_one();
@ -109,7 +116,7 @@ void Processor::pushOneTaskFromIO(AbstractCoroutine* coroutine) {
void Processor::pushOneTaskFromTimer(AbstractCoroutine* coroutine) {
{
std::lock_guard<std::mutex> waitLock(m_waitMutex);
std::lock_guard<std::mutex> lock(m_sch_push_timer_mutex);
oatpp::concurrency::SpinLock lock(m_sch_push_timer_atom);
m_sch_push_timer.pushBack(coroutine);
}
m_waitCondition.notify_one();
@ -118,7 +125,7 @@ void Processor::pushOneTaskFromTimer(AbstractCoroutine* coroutine) {
void Processor::pushTasksFromTimer(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
{
std::lock_guard<std::mutex> waitLock(m_waitMutex);
std::lock_guard<std::mutex> lock(m_sch_push_timer_mutex);
oatpp::concurrency::SpinLock lock(m_sch_push_timer_atom);
collection::FastQueue<AbstractCoroutine>::moveAll(tasks, m_sch_push_timer);
}
m_waitCondition.notify_one();
@ -182,24 +189,24 @@ void Processor::pushQueues() {
if(m_sch_push_io.first != nullptr) {
if (m_sch_push_io.count < MAX_BATCH_SIZE && m_queue.first != nullptr) {
std::unique_lock<std::mutex> lock(m_sch_push_io_mutex, std::try_to_lock);
if (lock.owns_lock()) {
oatpp::concurrency::SpinLock::TryLock lock(m_sch_push_io_atom);
if (lock.ownsLock()) {
pushAllFromQueue(m_sch_push_io);
}
} else {
std::lock_guard<std::mutex> lock(m_sch_push_io_mutex);
oatpp::concurrency::SpinLock lock(m_sch_push_io_atom);
pushAllFromQueue(m_sch_push_io);
}
}
if(m_sch_push_timer.first != nullptr) {
if (m_sch_push_timer.count < MAX_BATCH_SIZE && m_queue.first != nullptr) {
std::unique_lock<std::mutex> lock(m_sch_push_timer_mutex, std::try_to_lock);
if (lock.owns_lock()) {
oatpp::concurrency::SpinLock::TryLock lock(m_sch_push_timer_atom);
if (lock.ownsLock()) {
pushAllFromQueue(m_sch_push_timer);
}
} else {
std::lock_guard<std::mutex> lock(m_sch_push_timer_mutex);
oatpp::concurrency::SpinLock lock(m_sch_push_timer_atom);
pushAllFromQueue(m_sch_push_timer);
}
}
@ -226,13 +233,19 @@ bool Processor::iterate(v_int32 numIterations) {
switch (action.m_type) {
case Action::TYPE_WAIT_FOR_IO:
case Action::TYPE_IO_WAIT:
CP->_SCH_A = Action::clone(action);
m_queue.popFront();
popIOTask(CP);
break;
case Action::TYPE_WAIT_RETRY:
// case Action::TYPE_IO_REPEAT: // DO NOT RESCHEDULE COROUTINE WITH ACTIVE I/O
// CP->_SCH_A = Action::clone(action);
// m_queue.popFront();
// popIOTask(CP);
// break;
case Action::TYPE_WAIT_REPEAT:
CP->_SCH_A = Action::clone(action);
m_queue.popFront();
popTimerTask(CP);

View File

@ -87,8 +87,8 @@ private:
oatpp::collection::FastQueue<AbstractCoroutine> m_sch_push_io;
oatpp::collection::FastQueue<AbstractCoroutine> m_sch_push_timer;
std::mutex m_sch_push_io_mutex;
std::mutex m_sch_push_timer_mutex;
oatpp::concurrency::SpinLock::Atom m_sch_push_io_atom;
oatpp::concurrency::SpinLock::Atom m_sch_push_timer_atom;
private:
@ -132,6 +132,11 @@ private:
public:
Processor()
: m_sch_push_io_atom(false)
, m_sch_push_timer_atom(false)
{}
void addWorker(const std::shared_ptr<Worker>& worker);
/**

View File

@ -38,60 +38,72 @@ void TimerWorker::pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tas
m_backlogCondition.notify_one();
}
void TimerWorker::consumeBacklog(bool blockToConsume) {
if(blockToConsume) {
std::unique_lock<std::mutex> lock(m_backlogMutex);
while (m_backlog.first == nullptr && m_running) {
m_backlogCondition.wait(lock);
}
oatpp::collection::FastQueue<AbstractCoroutine>::moveAll(m_backlog, m_queue);
} else {
std::unique_lock<std::mutex> lock(m_backlogMutex, std::try_to_lock);
if (lock.owns_lock()) {
oatpp::collection::FastQueue<AbstractCoroutine>::moveAll(m_backlog, m_queue);
}
void TimerWorker::consumeBacklog() {
std::unique_lock<std::mutex> lock(m_backlogMutex);
while (m_backlog.first == nullptr && m_queue.first == nullptr && m_running) {
m_backlogCondition.wait(lock);
}
oatpp::collection::FastQueue<AbstractCoroutine>::moveAll(m_backlog, m_queue);
}
void TimerWorker::pushOneTask(AbstractCoroutine* task) {
{
std::lock_guard<std::mutex> guard(m_backlogMutex);
m_backlog.pushBack(task);
}
m_backlogCondition.notify_one();
}
void TimerWorker::work() {
v_int32 consumeIteration = 0;
v_int32 roundIteration = 0;
while(m_running) {
auto CP = m_queue.first;
if(CP != nullptr) {
consumeBacklog();
auto curr = m_queue.first;
AbstractCoroutine* prev = nullptr;
std::chrono::microseconds ms = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch());
v_int64 tick = ms.count();
while(curr != nullptr) {
auto next = nextCoroutine(curr);
if(getCoroutineTimePoint(curr) < tick) {
Action action = curr->iterate();
switch(action.getType()) {
case Action::TYPE_WAIT_REPEAT:
setCoroutineScheduledAction(curr, std::move(action));
break;
// case Action::TYPE_IO_WAIT:
// setCoroutineScheduledAction(curr, oatpp::async::Action::createWaitRepeatAction(0));
// break;
default:
m_queue.cutEntry(curr, prev);
setCoroutineScheduledAction(curr, std::move(action));
getCoroutineProcessor(curr)->pushOneTaskFromTimer(curr);
curr = prev;
break;
Action action = CP->iterate();
if (action.getType() == Action::TYPE_WAIT_RETRY) {
++ roundIteration;
if(roundIteration == 10) {
roundIteration = 0;
m_queue.round();
}
} else {
roundIteration = 0;
m_queue.popFront();
setCoroutineScheduledAction(CP, std::move(action));
getCoroutineProcessor(CP)->pushOneTaskFromTimer(CP);
dismissAction(action);
}
++ consumeIteration;
if(consumeIteration == 100) {
consumeIteration = 0;
consumeBacklog(false);
}
} else {
consumeBacklog(true);
prev = curr;
curr = next;
}
std::this_thread::sleep_for(m_granularity);
}
}

View File

@ -31,6 +31,7 @@
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
namespace oatpp { namespace async {
@ -42,12 +43,15 @@ private:
std::mutex m_backlogMutex;
std::condition_variable m_backlogCondition;
private:
void consumeBacklog(bool blockToConsume);
std::chrono::duration<v_int64, std::micro> m_granularity;
private:
void consumeBacklog();
public:
TimerWorker()
TimerWorker(const std::chrono::duration<v_int64, std::micro>& granularity = std::chrono::milliseconds(100))
: Worker(Type::TIMER)
, m_running(true)
, m_granularity(granularity)
{
std::thread thread(&TimerWorker::work, this);
thread.detach();
@ -55,6 +59,8 @@ public:
void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) override;
void pushOneTask(AbstractCoroutine* task) override;
void work();
void stop() override {

View File

@ -34,4 +34,16 @@ Processor* Worker::getCoroutineProcessor(AbstractCoroutine* CP) {
return CP->_PP;
}
v_int64 Worker::getCoroutineTimePoint(AbstractCoroutine* CP) {
return CP->_SCH_A.m_data.timePointMicroseconds;
}
void Worker::dismissAction(Action& action) {
action.m_type = Action::TYPE_NONE;
}
AbstractCoroutine* Worker::nextCoroutine(AbstractCoroutine* CP) {
return CP->_ref;
}
}}

View File

@ -59,8 +59,11 @@ public:
private:
Type m_type;
protected:
void setCoroutineScheduledAction(AbstractCoroutine* CP, Action&& action);
Processor* getCoroutineProcessor(AbstractCoroutine* CP);
static void setCoroutineScheduledAction(AbstractCoroutine* CP, Action&& action);
static Processor* getCoroutineProcessor(AbstractCoroutine* CP);
static v_int64 getCoroutineTimePoint(AbstractCoroutine* CP);
static void dismissAction(Action& action);
static AbstractCoroutine* nextCoroutine(AbstractCoroutine* CP);
public:
Worker(Type type)
@ -70,6 +73,7 @@ public:
virtual ~Worker() = default;
virtual void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) = 0;
virtual void pushOneTask(AbstractCoroutine* task) = 0;
virtual void stop() = 0;

View File

@ -119,6 +119,19 @@ public:
}
}
void cutEntry(T* entry, T* prevEntry){
if(prevEntry == nullptr) {
popFront();
} else if(entry->_ref == nullptr) {
prevEntry->_ref = nullptr;
last = prevEntry;
} else {
prevEntry->_ref = entry->_ref;
}
}
void clear() {
T* curr = first;

View File

@ -50,5 +50,21 @@ void SpinLock::lock(Atom& atom) {
void SpinLock::unlock(Atom& atom) {
std::atomic_store_explicit(&atom, false, std::memory_order_release);
}
SpinLock::TryLock::TryLock(Atom& atom)
: m_atom(&atom)
, m_ownsLock(!std::atomic_exchange_explicit(m_atom, true, std::memory_order_acquire))
{}
SpinLock::TryLock::~TryLock() {
if(m_ownsLock) {
std::atomic_store_explicit(m_atom, false, std::memory_order_release);
}
}
bool SpinLock::TryLock::ownsLock() {
return m_ownsLock;
}
}}

View File

@ -65,6 +65,22 @@ public:
* @param atom - atomic boolean.
*/
static void unlock(Atom& atom);
public:
class TryLock {
private:
Atom* m_atom;
bool m_ownsLock;
public:
TryLock(Atom& atom);
~TryLock();
bool ownsLock();
};
};

View File

@ -245,7 +245,7 @@ oatpp::async::CoroutineStarter ChunkedBuffer::flushToStreamAsync(const std::shar
, m_bytesLeft(chunkedBuffer->m_size)
, m_currData(nullptr)
, m_currDataSize(0)
, m_nextAction(Action::TYPE_FINISH)
, m_nextAction(Action::createActionByType(Action::TYPE_FINISH))
{}
Action act() override {

View File

@ -281,7 +281,7 @@ oatpp::async::CoroutineStarter transferAsync(const std::shared_ptr<InputStream>&
if(m_transferSize == 0) {
return finish();
}
return Action(Action::TYPE_ERROR);
return propagateError();
}
};
@ -295,9 +295,9 @@ namespace {
oatpp::async::Action asyncActionOnIOError(oatpp::async::AbstractCoroutine* coroutine, data::v_io_size res) {
switch (res) {
case IOError::WAIT_RETRY:
return oatpp::async::Action::TYPE_WAIT_RETRY;
return oatpp::async::Action::createIOWaitAction();
case IOError::RETRY:
return oatpp::async::Action::TYPE_REPEAT;
return oatpp::async::Action::createIORepeatAction();
case IOError::BROKEN_PIPE:
return coroutine->error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE);
case IOError::ZERO_VALUE:
@ -319,7 +319,7 @@ oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::async::AbstractCorouti
data = &((p_char8) data)[res];
size -= res;
if (size > 0) {
return oatpp::async::Action::TYPE_REPEAT;
return oatpp::async::Action::createIORepeatAction();
}
} else {
return asyncActionOnIOError(coroutine, res);
@ -359,7 +359,7 @@ oatpp::async::Action readExactSizeDataAsyncInline(oatpp::async::AbstractCoroutin
data = &((p_char8) data)[res];
size -= res;
if (size > 0) {
return oatpp::async::Action::TYPE_REPEAT;
return oatpp::async::Action::createIORepeatAction();
}
} else {
return asyncActionOnIOError(coroutine, res);

View File

@ -197,9 +197,9 @@ oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::strea
return _return(oatpp::network::Connection::createShared(m_clientHandle));
}
if(errno == EALREADY || errno == EINPROGRESS) {
return waitRetry();
return ioWait();
} else if(errno == EINTR) {
return repeat();
return ioRepeat();
}
::close(m_clientHandle);

View File

@ -74,7 +74,7 @@ oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::strea
if(m_submission){
return yieldTo(&ConnectCoroutine::obtainSocket);
}
return waitRetry();
return waitRepeat(std::chrono::milliseconds(100));
}
Action obtainSocket() {
@ -89,7 +89,7 @@ oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::strea
return _return(socket);
}
return waitRetry();
return waitRepeat(std::chrono::milliseconds(100));
}
return error<Error>("[oatpp::network::virtual_::client::ConnectionProvider::getConnectionAsync()]: Error. Can't connect.");

View File

@ -146,10 +146,10 @@ RequestHeadersReader::readHeadersAsync(const std::shared_ptr<oatpp::data::stream
}
}
return waitRetry();
return ioRepeat();
} else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
return waitRetry();
return ioWait();
} else if(res == data::IOError::BROKEN_PIPE){
return error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE);
} else if(res == data::IOError::ZERO_VALUE){

View File

@ -145,10 +145,10 @@ ResponseHeadersReader::readHeadersAsync(const std::shared_ptr<oatpp::data::strea
}
}
return waitRetry();
return ioRepeat();
} else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
return waitRetry();
return ioWait();
} else if(res == data::IOError::BROKEN_PIPE) {
return error(oatpp::data::AsyncIOError::ERROR_BROKEN_PIPE);
} else if(res == data::IOError::ZERO_VALUE) {

View File

@ -138,8 +138,8 @@ oatpp::async::CoroutineStarter SimpleBodyDecoder::doChunkedDecodingAsync(const s
ChunkedDecoder(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream)
: m_fromStream(fromStream)
, m_toStream(toStream)
: m_fromStream(fromStream)
, m_toStream(toStream)
{}
Action act() override {
@ -151,9 +151,9 @@ oatpp::async::CoroutineStarter SimpleBodyDecoder::doChunkedDecodingAsync(const s
Action readLineChar() {
auto res = m_fromStream->read(&m_lineChar, 1);
if(res == data::IOError::WAIT_RETRY) {
return oatpp::async::Action::TYPE_WAIT_RETRY;
return oatpp::async::Action::createIOWaitAction();
} else if(res == data::IOError::RETRY) {
return oatpp::async::Action::TYPE_REPEAT;
return oatpp::async::Action::createIORepeatAction();
} else if( res < 0) {
return error<Error>("[BodyDecoder::ChunkedDecoder] Can't read line char");
}

View File

@ -70,7 +70,7 @@ ChunkedBufferBody::WriteToStreamCoroutine::WriteToStreamCoroutine(const std::sha
, m_currChunk(m_chunks->getFirstNode())
, m_currData(nullptr)
, m_currDataSize(0)
, m_nextAction(Action::TYPE_FINISH)
, m_nextAction(Action::createActionByType(Action::TYPE_FINISH))
{}
async::Action ChunkedBufferBody::WriteToStreamCoroutine::act() {

View File

@ -180,13 +180,13 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::handleError(const std
if(error->is<oatpp::data::AsyncIOError>()) {
auto aioe = static_cast<const oatpp::data::AsyncIOError*>(error.get());
if(aioe->getCode() == oatpp::data::IOError::BROKEN_PIPE) {
return Action::TYPE_ERROR; // do not report BROKEN_PIPE error
return propagateError(); // do not report BROKEN_PIPE error
}
}
if(m_currentResponse) {
OATPP_LOGE("[oatpp::web::server::HttpProcessor::Coroutine::handleError()]", "Unhandled error. '%s'. Dropping connection", error->what());
return Action::TYPE_ERROR;
return propagateError();
}
m_currentResponse = m_errorHandler->handleError(protocol::http::Status::CODE_500, error->what());
@ -194,7 +194,7 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::handleError(const std
}
return Action::TYPE_ERROR;
return propagateError();
}