mirror of
https://github.com/oatpp/oatpp.git
synced 2024-11-27 08:30:07 +08:00
async: Fix CoroutineWaitList with timeout
This commit is contained in:
parent
d9fd5854bf
commit
316b079f0a
@ -76,17 +76,17 @@ CoroutineStarter ConditionVariable::wait(Lock* lock, std::function<bool()> condi
|
||||
|
||||
}
|
||||
|
||||
CoroutineStarter ConditionVariable::waitUntil(Lock* lock, std::function<bool()> condition, const std::chrono::steady_clock::time_point& timeoutTime) {
|
||||
CoroutineStarter ConditionVariable::waitUntil(Lock* lock, std::function<bool()> condition, const std::chrono::system_clock::time_point& timeoutTime) {
|
||||
|
||||
class WaitCoroutine : public Coroutine<WaitCoroutine> {
|
||||
private:
|
||||
ConditionVariable* m_cv;
|
||||
oatpp::async::LockGuard m_lockGuard;
|
||||
std::function<bool()> m_condition;
|
||||
std::chrono::steady_clock::time_point m_timeoutTime;
|
||||
std::chrono::system_clock::time_point m_timeoutTime;
|
||||
public:
|
||||
|
||||
WaitCoroutine(ConditionVariable* cv, Lock* lock, std::function<bool()> condition, const std::chrono::steady_clock::time_point& timeoutTime)
|
||||
WaitCoroutine(ConditionVariable* cv, Lock* lock, std::function<bool()> condition, const std::chrono::system_clock::time_point& timeoutTime)
|
||||
: m_cv(cv)
|
||||
, m_lockGuard(lock)
|
||||
, m_condition(condition)
|
||||
@ -108,18 +108,18 @@ CoroutineStarter ConditionVariable::waitUntil(Lock* lock, std::function<bool()>
|
||||
m_lockGuard.unlock();
|
||||
OATPP_LOGD("WaitCoroutine", "UnLocked")
|
||||
} else {
|
||||
if(std::chrono::steady_clock::now() > m_timeoutTime) {
|
||||
if(std::chrono::system_clock::now() > m_timeoutTime) {
|
||||
return finish();
|
||||
}
|
||||
return yieldTo(&WaitCoroutine::act);
|
||||
}
|
||||
|
||||
if(std::chrono::steady_clock::now() > m_timeoutTime) {
|
||||
if(std::chrono::system_clock::now() > m_timeoutTime) {
|
||||
return finish();
|
||||
}
|
||||
|
||||
OATPP_LOGD("WaitCoroutine", "Sleeeeep")
|
||||
return Action::createWaitListActionWithTimeout(&m_cv->m_list, m_timeoutTime);
|
||||
return Action::createWaitListAction(&m_cv->m_list, m_timeoutTime);
|
||||
}
|
||||
|
||||
};
|
||||
@ -129,7 +129,7 @@ CoroutineStarter ConditionVariable::waitUntil(Lock* lock, std::function<bool()>
|
||||
}
|
||||
|
||||
CoroutineStarter ConditionVariable::waitFor(Lock* lock, std::function<bool()> condition, const std::chrono::duration<v_int64, std::micro>& timeout) {
|
||||
return waitUntil(lock, condition, std::chrono::steady_clock::now() + timeout);
|
||||
return waitUntil(lock, condition, std::chrono::system_clock::now() + timeout);
|
||||
}
|
||||
|
||||
void ConditionVariable::notifyFirst() {
|
||||
|
@ -60,7 +60,7 @@ public:
|
||||
ConditionVariable();
|
||||
|
||||
CoroutineStarter wait(Lock* lock, std::function<bool()> condition);
|
||||
CoroutineStarter waitUntil(Lock* lock, std::function<bool()> condition, const std::chrono::steady_clock::time_point& timeoutTime);
|
||||
CoroutineStarter waitUntil(Lock* lock, std::function<bool()> condition, const std::chrono::system_clock::time_point& timeoutTime);
|
||||
CoroutineStarter waitFor(Lock* lock, std::function<bool()>, const std::chrono::duration<v_int64, std::micro>& timeout);
|
||||
|
||||
void notifyFirst();
|
||||
|
@ -30,6 +30,8 @@ namespace oatpp { namespace async {
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Action
|
||||
|
||||
const std::chrono::system_clock::time_point Action::TIME_ZERO = std::chrono::system_clock::from_time_t(0);
|
||||
|
||||
Action Action::clone(const Action& action) {
|
||||
Action result(action.m_type);
|
||||
result.m_data = action.m_data;
|
||||
@ -60,16 +62,10 @@ Action Action::createWaitRepeatAction(v_int64 timePointMicroseconds) {
|
||||
return result;
|
||||
}
|
||||
|
||||
Action Action::createWaitListAction(CoroutineWaitList* waitList) {
|
||||
Action Action::createWaitListAction(CoroutineWaitList* waitList, const std::chrono::system_clock::time_point& timeoutTime) {
|
||||
Action result(TYPE_WAIT_LIST);
|
||||
result.m_data.waitList = waitList;
|
||||
return result;
|
||||
}
|
||||
|
||||
Action Action::createWaitListActionWithTimeout(CoroutineWaitList* waitList, const std::chrono::steady_clock::time_point& timeout) {
|
||||
Action result(TYPE_WAIT_LIST_WITH_TIMEOUT);
|
||||
result.m_data.waitListWithTimeout.waitList = waitList;
|
||||
result.m_data.waitListWithTimeout.timeoutTimeSinceEpochMS = std::chrono::duration_cast<std::chrono::milliseconds>(timeout.time_since_epoch()).count();
|
||||
result.m_data.waitListData.waitList = waitList;
|
||||
result.m_data.waitListData.timePointMicroseconds = std::chrono::duration_cast<std::chrono::microseconds>(timeoutTime.time_since_epoch()).count();
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -63,6 +63,8 @@ public:
|
||||
typedef Action (AbstractCoroutine::*FunctionPtr)();
|
||||
public:
|
||||
|
||||
static const std::chrono::system_clock::time_point TIME_ZERO;
|
||||
|
||||
/**
|
||||
* None - invalid Action.
|
||||
*/
|
||||
@ -179,9 +181,9 @@ private:
|
||||
IOEventType ioEventType;
|
||||
};
|
||||
|
||||
struct WaitListWithTimeout {
|
||||
struct WaitListData {
|
||||
CoroutineWaitList* waitList;
|
||||
v_int64 timeoutTimeSinceEpochMS;
|
||||
v_int64 timePointMicroseconds;
|
||||
};
|
||||
|
||||
private:
|
||||
@ -191,8 +193,7 @@ private:
|
||||
Error* error;
|
||||
IOData ioData;
|
||||
v_int64 timePointMicroseconds;
|
||||
CoroutineWaitList* waitList;
|
||||
WaitListWithTimeout waitListWithTimeout;
|
||||
WaitListData waitListData;
|
||||
};
|
||||
private:
|
||||
mutable v_int32 m_type;
|
||||
@ -250,17 +251,10 @@ public:
|
||||
/**
|
||||
* Create TYPE_WAIT_LIST Action.
|
||||
* @param waitList - wait-list to put coroutine on.
|
||||
* @param timeoutTime - latest time point at which the coroutine should be continued.
|
||||
* @return - Action.
|
||||
*/
|
||||
static Action createWaitListAction(CoroutineWaitList* waitList);
|
||||
|
||||
/**
|
||||
* Create TYPE_WAIT_LIST_WITH_TIMEOUT Action.
|
||||
* @param waitList - wait-list to put coroutine on.
|
||||
* @param timeout - latest time point at which the coroutine should be continued.
|
||||
* @return - Action.
|
||||
*/
|
||||
static Action createWaitListActionWithTimeout(CoroutineWaitList* waitList, const std::chrono::steady_clock::time_point& timeout);
|
||||
static Action createWaitListAction(CoroutineWaitList* waitList, const std::chrono::system_clock::time_point& timeoutTime = TIME_ZERO);
|
||||
|
||||
/**
|
||||
* Constructor. Create start-coroutine Action.
|
||||
@ -428,11 +422,11 @@ public:
|
||||
typedef oatpp::async::Error Error;
|
||||
typedef Action (AbstractCoroutine::*FunctionPtr)();
|
||||
private:
|
||||
Processor* _PP;
|
||||
AbstractCoroutine* _CP;
|
||||
FunctionPtr _FP;
|
||||
oatpp::async::Action _SCH_A;
|
||||
CoroutineHandle* _ref;
|
||||
Processor* _PP; // Processor pointer
|
||||
AbstractCoroutine* _CP; // Coroutine pointer
|
||||
FunctionPtr _FP; // Function pointer
|
||||
oatpp::async::Action _SCH_A; // Scheduled action
|
||||
CoroutineHandle* _ref; // pointer to next coroutine handle in list
|
||||
public:
|
||||
|
||||
CoroutineHandle(Processor* processor, AbstractCoroutine* rootCoroutine);
|
||||
|
@ -35,151 +35,66 @@ namespace oatpp { namespace async {
|
||||
CoroutineWaitList::CoroutineWaitList(CoroutineWaitList&& other) {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{other.m_lock};
|
||||
m_list = std::move(other.m_list);
|
||||
m_coroutines = std::move(other.m_coroutines);
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{other.m_timeoutsLock};
|
||||
m_coroutinesWithTimeout = std::move(other.m_coroutinesWithTimeout);
|
||||
|
||||
m_timeoutCheckingProcessors = std::move(other.m_timeoutCheckingProcessors);
|
||||
for (const std::pair<Processor*, v_int64>& entry : m_timeoutCheckingProcessors) {
|
||||
Processor* processor = entry.first;
|
||||
processor->removeCoroutineWaitListWithTimeouts(std::addressof(other));
|
||||
processor->addCoroutineWaitListWithTimeouts(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
CoroutineWaitList::~CoroutineWaitList() {
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
void CoroutineWaitList::checkCoroutinesForTimeouts() {
|
||||
std::lock_guard<std::mutex> listLock{m_lock};
|
||||
std::lock_guard<std::mutex> lock{m_timeoutsLock};
|
||||
const auto currentTimeSinceEpochMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||
const auto newEndIt = std::remove_if(std::begin(m_coroutinesWithTimeout), std::end(m_coroutinesWithTimeout), [&](const std::pair<CoroutineHandle*, v_int64>& entry) {
|
||||
return currentTimeSinceEpochMS > entry.second;
|
||||
});
|
||||
|
||||
for (CoroutineHandle* curr = m_list.first, *prev = nullptr; !m_list.empty() && m_list.last->_ref != curr; curr = curr->_ref) {
|
||||
const bool removeFromWaitList = std::any_of(newEndIt, std::end(m_coroutinesWithTimeout), [=](const std::pair<CoroutineHandle*, v_int64>& entry) {
|
||||
return entry.first == curr;
|
||||
});
|
||||
if (!removeFromWaitList) {
|
||||
prev = curr;
|
||||
continue;
|
||||
}
|
||||
|
||||
m_list.cutEntry(curr, prev);
|
||||
|
||||
if (--m_timeoutCheckingProcessors[curr->_PP] <= 0) {
|
||||
curr->_PP->removeCoroutineWaitListWithTimeouts(this);
|
||||
m_timeoutCheckingProcessors.erase(curr->_PP);
|
||||
}
|
||||
curr->_PP->pushOneTask(curr);
|
||||
}
|
||||
|
||||
m_coroutinesWithTimeout.erase(newEndIt, std::end(m_coroutinesWithTimeout));
|
||||
}
|
||||
|
||||
void CoroutineWaitList::setListener(Listener* listener) {
|
||||
m_listener = listener;
|
||||
}
|
||||
|
||||
void CoroutineWaitList::pushFront(CoroutineHandle* coroutine) {
|
||||
void CoroutineWaitList::add(CoroutineHandle* coroutine) {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_lock);
|
||||
m_list.pushFront(coroutine);
|
||||
m_coroutines.insert(coroutine);
|
||||
}
|
||||
if(m_listener != nullptr) {
|
||||
m_listener->onNewItem(*this);
|
||||
}
|
||||
}
|
||||
|
||||
void CoroutineWaitList::pushFront(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS) {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{m_timeoutsLock};
|
||||
m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS);
|
||||
if (++m_timeoutCheckingProcessors[coroutine->_PP] == 1) {
|
||||
coroutine->_PP->addCoroutineWaitListWithTimeouts(this);
|
||||
}
|
||||
}
|
||||
pushFront(coroutine);
|
||||
}
|
||||
|
||||
void CoroutineWaitList::pushBack(CoroutineHandle* coroutine) {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_lock);
|
||||
m_list.pushBack(coroutine);
|
||||
}
|
||||
if(m_listener != nullptr) {
|
||||
m_listener->onNewItem(*this);
|
||||
}
|
||||
}
|
||||
|
||||
void CoroutineWaitList::pushBack(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS) {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{m_timeoutsLock};
|
||||
m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS);
|
||||
if (++m_timeoutCheckingProcessors[coroutine->_PP] == 1) {
|
||||
coroutine->_PP->addCoroutineWaitListWithTimeouts(this);
|
||||
}
|
||||
}
|
||||
pushBack(coroutine);
|
||||
}
|
||||
|
||||
void CoroutineWaitList::notifyFirst() {
|
||||
std::lock_guard<std::mutex> lock{m_lock};
|
||||
if(m_list.first) {
|
||||
removeFirstCoroutine();
|
||||
std::lock_guard<std::mutex> lock(m_lock);
|
||||
if(!m_coroutines.empty()) {
|
||||
removeCoroutine(*m_coroutines.begin());
|
||||
}
|
||||
}
|
||||
|
||||
void CoroutineWaitList::notifyAll() {
|
||||
std::lock_guard<std::mutex> lock(m_lock);
|
||||
while (!m_list.empty()) {
|
||||
removeFirstCoroutine();
|
||||
while (!m_coroutines.empty()) {
|
||||
removeCoroutine(*m_coroutines.begin());
|
||||
}
|
||||
}
|
||||
|
||||
void CoroutineWaitList::removeFirstCoroutine() {
|
||||
auto coroutine = m_list.popFront();
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{m_timeoutsLock};
|
||||
if (--m_timeoutCheckingProcessors[coroutine->_PP] <= 0) {
|
||||
coroutine->_PP->removeCoroutineWaitListWithTimeouts(this);
|
||||
m_timeoutCheckingProcessors.erase(coroutine->_PP);
|
||||
}
|
||||
}
|
||||
void CoroutineWaitList::removeCoroutine(CoroutineHandle* coroutine) {
|
||||
m_coroutines.erase(coroutine);
|
||||
coroutine->_PP->wakeCoroutine(coroutine);
|
||||
}
|
||||
|
||||
coroutine->_PP->pushOneTask(coroutine);
|
||||
void CoroutineWaitList::forgetCoroutine(CoroutineHandle *coroutine) {
|
||||
std::lock_guard<std::mutex> lock(m_lock);
|
||||
m_coroutines.erase(coroutine);
|
||||
}
|
||||
|
||||
CoroutineWaitList& CoroutineWaitList::operator=(CoroutineWaitList&& other) {
|
||||
if (this == std::addressof(other)) return *this;
|
||||
|
||||
notifyAll();
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> otherLock{other.m_lock};
|
||||
std::lock_guard<std::mutex> myLock{m_lock};
|
||||
m_list = std::move(other.m_list);
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> otherLock{other.m_timeoutsLock};
|
||||
std::lock_guard<std::mutex> myLock{m_timeoutsLock};
|
||||
m_coroutinesWithTimeout = std::move(other.m_coroutinesWithTimeout);
|
||||
|
||||
m_timeoutCheckingProcessors = std::move(other.m_timeoutCheckingProcessors);
|
||||
for (const std::pair<Processor*, v_int64>& entry : m_timeoutCheckingProcessors) {
|
||||
Processor* processor = entry.first;
|
||||
processor->removeCoroutineWaitListWithTimeouts(std::addressof(other));
|
||||
processor->addCoroutineWaitListWithTimeouts(this);
|
||||
}
|
||||
}
|
||||
return *this;
|
||||
if (this == std::addressof(other)) return *this;
|
||||
|
||||
notifyAll();
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> otherLock(other.m_lock);
|
||||
std::lock_guard<std::mutex> myLock(m_lock);
|
||||
m_coroutines = std::move(other.m_coroutines);
|
||||
}
|
||||
|
||||
return *this;
|
||||
|
||||
}
|
||||
|
||||
}}
|
@ -27,13 +27,9 @@
|
||||
#define oatpp_async_CoroutineWaitList_hpp
|
||||
|
||||
#include "oatpp/core/async/Coroutine.hpp"
|
||||
#include "oatpp/core/async/utils/FastQueue.hpp"
|
||||
|
||||
#include "oatpp/core/concurrency/SpinLock.hpp"
|
||||
#include <map>
|
||||
#include <unordered_set>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
|
||||
namespace oatpp { namespace async {
|
||||
|
||||
@ -60,49 +56,20 @@ public:
|
||||
virtual void onNewItem(CoroutineWaitList& list) = 0;
|
||||
};
|
||||
private:
|
||||
utils::FastQueue<CoroutineHandle> m_list;
|
||||
std::unordered_set<CoroutineHandle*> m_coroutines;
|
||||
std::mutex m_lock;
|
||||
Listener* m_listener = nullptr;
|
||||
|
||||
std::map<Processor*, v_int64> m_timeoutCheckingProcessors;
|
||||
std::vector<std::pair<CoroutineHandle*, v_int64>> m_coroutinesWithTimeout;
|
||||
std::mutex m_timeoutsLock;
|
||||
|
||||
private:
|
||||
void checkCoroutinesForTimeouts();
|
||||
|
||||
void removeFirstCoroutine();
|
||||
|
||||
void removeCoroutine(CoroutineHandle* coroutine); //<-- Calls Processor
|
||||
void forgetCoroutine(CoroutineHandle* coroutine); //<-- Called From Processor
|
||||
protected:
|
||||
/*
|
||||
* Put coroutine on wait-list.
|
||||
* This method should be called by Coroutine Processor only.
|
||||
* @param coroutine
|
||||
*/
|
||||
void pushFront(CoroutineHandle* coroutine);
|
||||
void add(CoroutineHandle* coroutine);
|
||||
|
||||
/*
|
||||
* Put coroutine on wait-list with timeout.
|
||||
* This method should be called by Coroutine Processor only.
|
||||
* @param coroutine
|
||||
* @param timeoutTimeSinceEpochMS
|
||||
*/
|
||||
void pushFront(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS);
|
||||
|
||||
/*
|
||||
* Put coroutine on wait-list.
|
||||
* This method should be called by Coroutine Processor only.
|
||||
* @param coroutine
|
||||
*/
|
||||
void pushBack(CoroutineHandle* coroutine);
|
||||
|
||||
/*
|
||||
* Put coroutine on wait-list with timeout.
|
||||
* This method should be called by Coroutine Processor only.
|
||||
* @param coroutine
|
||||
* @param timeoutTimeSinceEpochMS
|
||||
*/
|
||||
void pushBack(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS);
|
||||
public:
|
||||
|
||||
/**
|
||||
|
@ -29,38 +29,6 @@
|
||||
|
||||
namespace oatpp { namespace async {
|
||||
|
||||
void Processor::checkCoroutinesForTimeouts() {
|
||||
while (m_running) {
|
||||
{
|
||||
std::unique_lock<std::recursive_mutex> lock{m_coroutineWaitListsWithTimeoutsMutex};
|
||||
while (m_coroutineWaitListsWithTimeouts.empty()) {
|
||||
m_coroutineWaitListsWithTimeoutsCV.wait(lock);
|
||||
if (!m_running) return;
|
||||
}
|
||||
|
||||
const auto coroutineWaitListsWithTimeouts = m_coroutineWaitListsWithTimeouts;
|
||||
for (CoroutineWaitList* waitList : coroutineWaitListsWithTimeouts) {
|
||||
waitList->checkCoroutinesForTimeouts();
|
||||
}
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds{100});
|
||||
}
|
||||
}
|
||||
|
||||
void Processor::addCoroutineWaitListWithTimeouts(CoroutineWaitList* waitList) {
|
||||
{
|
||||
std::lock_guard<std::recursive_mutex> lock{m_coroutineWaitListsWithTimeoutsMutex};
|
||||
m_coroutineWaitListsWithTimeouts.insert(waitList);
|
||||
}
|
||||
m_coroutineWaitListsWithTimeoutsCV.notify_one();
|
||||
}
|
||||
|
||||
void Processor::removeCoroutineWaitListWithTimeouts(CoroutineWaitList* waitList) {
|
||||
std::lock_guard<std::recursive_mutex> lock{m_coroutineWaitListsWithTimeoutsMutex};
|
||||
m_coroutineWaitListsWithTimeouts.erase(waitList);
|
||||
}
|
||||
|
||||
void Processor::addWorker(const std::shared_ptr<worker::Worker>& worker) {
|
||||
|
||||
switch(worker->getType()) {
|
||||
@ -126,13 +94,9 @@ void Processor::addCoroutine(CoroutineHandle* coroutine) {
|
||||
break;
|
||||
|
||||
case Action::TYPE_WAIT_LIST:
|
||||
coroutine->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
|
||||
action.m_data.waitList->pushBack(coroutine);
|
||||
break;
|
||||
|
||||
case Action::TYPE_WAIT_LIST_WITH_TIMEOUT:
|
||||
coroutine->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
|
||||
action.m_data.waitListWithTimeout.waitList->pushBack(coroutine, action.m_data.waitListWithTimeout.timeoutTimeSinceEpochMS);
|
||||
coroutine->_SCH_A = Action::clone(action);
|
||||
putCoroutineToSleep(coroutine);
|
||||
action.m_data.waitListData.waitList->add(coroutine);
|
||||
break;
|
||||
|
||||
default:
|
||||
@ -210,6 +174,56 @@ void Processor::pushQueues() {
|
||||
|
||||
}
|
||||
|
||||
void Processor::putCoroutineToSleep(CoroutineHandle* ch) {
|
||||
if(ch->_SCH_A.m_data.waitListData.timePointMicroseconds == 0) {
|
||||
std::lock_guard<std::mutex> lock(m_sleepMutex);
|
||||
m_sleepNoTimeSet.insert(ch);
|
||||
} else {
|
||||
std::lock_guard<std::mutex> lock(m_sleepMutex);
|
||||
m_sleepTimeSet.insert(ch);
|
||||
m_sleepCV.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
void Processor::wakeCoroutine(CoroutineHandle* ch) {
|
||||
if(ch->_SCH_A.m_data.waitListData.timePointMicroseconds == 0) {
|
||||
std::lock_guard<std::mutex> lock(m_sleepMutex);
|
||||
m_sleepNoTimeSet.erase(ch);
|
||||
} else {
|
||||
std::lock_guard<std::mutex> lock(m_sleepMutex);
|
||||
m_sleepTimeSet.erase(ch);
|
||||
}
|
||||
ch->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
|
||||
pushOneTask(ch);
|
||||
}
|
||||
|
||||
void Processor::checkCoroutinesSleep() {
|
||||
while (m_running) {
|
||||
{
|
||||
std::unique_lock<std::mutex> lock{m_sleepMutex};
|
||||
while (m_running && m_sleepTimeSet.empty()) {
|
||||
m_sleepCV.wait(lock);
|
||||
}
|
||||
|
||||
auto now = oatpp::base::Environment::getMicroTickCount();
|
||||
for(auto it = m_sleepTimeSet.begin(); it != m_sleepTimeSet.end();) {
|
||||
auto ch = *it;
|
||||
if(ch->_SCH_A.m_data.waitListData.timePointMicroseconds < now) {
|
||||
it = m_sleepTimeSet.erase(it);
|
||||
ch->_SCH_A.m_data.waitListData.waitList->forgetCoroutine(ch);
|
||||
ch->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
|
||||
pushOneTask(ch);
|
||||
} else {
|
||||
it ++;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if(m_running) std::this_thread::sleep_for(std::chrono::milliseconds{100});
|
||||
}
|
||||
}
|
||||
|
||||
bool Processor::iterate(v_int32 numIterations) {
|
||||
|
||||
pushQueues();
|
||||
@ -242,15 +256,10 @@ bool Processor::iterate(v_int32 numIterations) {
|
||||
break;
|
||||
|
||||
case Action::TYPE_WAIT_LIST:
|
||||
CP->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
|
||||
CP->_SCH_A = Action::clone(action);
|
||||
m_queue.popFront();
|
||||
action.m_data.waitList->pushBack(CP);
|
||||
break;
|
||||
|
||||
case Action::TYPE_WAIT_LIST_WITH_TIMEOUT:
|
||||
CP->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
|
||||
m_queue.popFront();
|
||||
action.m_data.waitListWithTimeout.waitList->pushBack(CP, action.m_data.waitListWithTimeout.timeoutTimeSinceEpochMS);
|
||||
putCoroutineToSleep(CP);
|
||||
action.m_data.waitListData.waitList->add(CP);
|
||||
break;
|
||||
|
||||
default:
|
||||
@ -274,9 +283,10 @@ void Processor::stop() {
|
||||
m_running = false;
|
||||
}
|
||||
m_taskCondition.notify_one();
|
||||
m_sleepCV.notify_one();
|
||||
|
||||
m_sleepSetTask.join();
|
||||
|
||||
m_coroutineWaitListsWithTimeoutsCV.notify_one();
|
||||
m_coroutineWaitListTimeoutChecker.join();
|
||||
}
|
||||
|
||||
v_int32 Processor::getTasksCount() {
|
||||
|
@ -30,6 +30,7 @@
|
||||
#include "./CoroutineWaitList.hpp"
|
||||
#include "oatpp/core/async/utils/FastQueue.hpp"
|
||||
|
||||
#include <thread>
|
||||
#include <condition_variable>
|
||||
#include <list>
|
||||
#include <mutex>
|
||||
@ -99,6 +100,15 @@ private:
|
||||
v_uint32 m_ioBalancer = 0;
|
||||
v_uint32 m_timerBalancer = 0;
|
||||
|
||||
private:
|
||||
|
||||
std::unordered_set<CoroutineHandle*> m_sleepNoTimeSet;
|
||||
std::unordered_set<CoroutineHandle*> m_sleepTimeSet;
|
||||
std::mutex m_sleepMutex;
|
||||
std::condition_variable m_sleepCV;
|
||||
|
||||
std::thread m_sleepSetTask{&Processor::checkCoroutinesSleep, this};
|
||||
|
||||
private:
|
||||
|
||||
oatpp::concurrency::SpinLock m_taskLock;
|
||||
@ -111,21 +121,8 @@ private:
|
||||
utils::FastQueue<CoroutineHandle> m_queue;
|
||||
|
||||
private:
|
||||
|
||||
std::atomic_bool m_running{true};
|
||||
std::atomic<v_int32> m_tasksCounter{0};
|
||||
|
||||
private:
|
||||
|
||||
std::recursive_mutex m_coroutineWaitListsWithTimeoutsMutex;
|
||||
std::condition_variable_any m_coroutineWaitListsWithTimeoutsCV;
|
||||
std::set<CoroutineWaitList*> m_coroutineWaitListsWithTimeouts;
|
||||
std::thread m_coroutineWaitListTimeoutChecker{&Processor::checkCoroutinesForTimeouts, this};
|
||||
|
||||
void checkCoroutinesForTimeouts();
|
||||
void addCoroutineWaitListWithTimeouts(CoroutineWaitList* waitList);
|
||||
void removeCoroutineWaitListWithTimeouts(CoroutineWaitList* waitList);
|
||||
|
||||
private:
|
||||
|
||||
void popIOTask(CoroutineHandle* coroutine);
|
||||
@ -136,6 +133,10 @@ private:
|
||||
void popTasks();
|
||||
void pushQueues();
|
||||
|
||||
void putCoroutineToSleep(CoroutineHandle* ch);
|
||||
void wakeCoroutine(CoroutineHandle* ch);
|
||||
void checkCoroutinesSleep();
|
||||
|
||||
public:
|
||||
|
||||
Processor() = default;
|
||||
|
@ -172,7 +172,7 @@ public:
|
||||
virtual ~Processor() = default;
|
||||
|
||||
/**
|
||||
* If the client is using the input stream to read data and push it to the processor,
|
||||
* If the client is using the input stream to read data and add it to the processor,
|
||||
* the client MAY ask the processor for a suggested read size.
|
||||
* @return - suggested read size.
|
||||
*/
|
||||
@ -206,7 +206,7 @@ public:
|
||||
ProcessingPipeline(const std::vector<base::ObjectHandle<Processor>>& m_processors);
|
||||
|
||||
/**
|
||||
* If the client is using the input stream to read data and push it to the processor,
|
||||
* If the client is using the input stream to read data and add it to the processor,
|
||||
* the client MAY ask the processor for a suggested read size.
|
||||
* @return - suggested read size.
|
||||
*/
|
||||
|
@ -278,7 +278,7 @@ protected:
|
||||
class GetCoroutine : public oatpp::async::CoroutineWithResult<GetCoroutine, const provider::ResourceHandle<TResource>&> {
|
||||
private:
|
||||
std::shared_ptr<PoolTemplate> m_pool;
|
||||
std::chrono::steady_clock::time_point m_startTime{std::chrono::steady_clock::now()};
|
||||
std::chrono::system_clock::time_point m_startTime{std::chrono::system_clock::now()};
|
||||
public:
|
||||
|
||||
GetCoroutine(const std::shared_ptr<PoolTemplate>& pool)
|
||||
@ -286,7 +286,7 @@ protected:
|
||||
{}
|
||||
|
||||
bool timedout() const noexcept {
|
||||
return m_pool->m_timeout != std::chrono::microseconds::zero() && m_pool->m_timeout < (std::chrono::steady_clock::now() - m_startTime);
|
||||
return m_pool->m_timeout != std::chrono::microseconds::zero() && m_pool->m_timeout < (std::chrono::system_clock::now() - m_startTime);
|
||||
}
|
||||
|
||||
async::Action act() override {
|
||||
@ -301,7 +301,7 @@ protected:
|
||||
guard.unlock();
|
||||
return m_pool->m_timeout == std::chrono::microseconds::zero()
|
||||
? async::Action::createWaitListAction(&m_pool->m_waitList)
|
||||
: async::Action::createWaitListActionWithTimeout(&m_pool->m_waitList, m_startTime + m_pool->m_timeout);
|
||||
: async::Action::createWaitListAction(&m_pool->m_waitList, m_startTime + m_pool->m_timeout);
|
||||
}
|
||||
|
||||
if(!m_pool->m_running) {
|
||||
|
@ -43,7 +43,7 @@ private:
|
||||
public:
|
||||
|
||||
/**
|
||||
* If the client is using the input stream to read data and push it to the processor,
|
||||
* If the client is using the input stream to read data and add it to the processor,
|
||||
* the client MAY ask the processor for a suggested read size.
|
||||
* @return - suggested read size.
|
||||
*/
|
||||
@ -82,7 +82,7 @@ public:
|
||||
DecoderChunked();
|
||||
|
||||
/**
|
||||
* If the client is using the input stream to read data and push it to the processor,
|
||||
* If the client is using the input stream to read data and add it to the processor,
|
||||
* the client MAY ask the processor for a suggested read size.
|
||||
* @return - suggested read size.
|
||||
*/
|
||||
|
@ -74,11 +74,13 @@ namespace {
|
||||
void runTests() {
|
||||
|
||||
oatpp::base::Environment::printCompilationConfig();
|
||||
/*
|
||||
|
||||
OATPP_LOGD("Tests", "coroutine handle size=%d", sizeof(oatpp::async::CoroutineHandle));
|
||||
OATPP_LOGD("Tests", "coroutine size=%d", sizeof(oatpp::async::AbstractCoroutine));
|
||||
OATPP_LOGD("Tests", "action size=%d", sizeof(oatpp::async::Action));
|
||||
OATPP_LOGD("Tests", "class count=%d", oatpp::data::mapping::type::ClassId::getClassCount());
|
||||
|
||||
/*
|
||||
auto names = oatpp::data::mapping::type::ClassId::getRegisteredClassNames();
|
||||
v_int32 i = 0;
|
||||
for(auto& name : names) {
|
||||
|
Loading…
Reference in New Issue
Block a user