mirror of
https://github.com/oatpp/oatpp.git
synced 2025-01-18 16:43:57 +08:00
Moved timeout-checking-thread to core::async::Processor
This commit is contained in:
parent
0ae1b0e7fd
commit
e8e0154121
@ -34,123 +34,102 @@ namespace oatpp { namespace async {
|
||||
|
||||
CoroutineWaitList::CoroutineWaitList(CoroutineWaitList&& other) {
|
||||
{
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock{other.m_lock};
|
||||
m_list = std::move(other.m_list);
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock{other.m_data->m_lock};
|
||||
m_data->m_list = std::move(other.m_data->m_list);
|
||||
}
|
||||
{
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock{other.m_timeoutsLock};
|
||||
m_coroutinesWithTimeout = std::move(other.m_coroutinesWithTimeout);
|
||||
if (!m_coroutinesWithTimeout.empty()) {
|
||||
startTimeoutCheckerThread();
|
||||
}
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock{other.m_data->m_timeoutsLock};
|
||||
m_data->m_coroutinesWithTimeout = std::move(other.m_data->m_coroutinesWithTimeout);
|
||||
}
|
||||
}
|
||||
|
||||
CoroutineWaitList::~CoroutineWaitList() {
|
||||
notifyAll();
|
||||
m_stop = true;
|
||||
if (m_thread.joinable()) {
|
||||
m_thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
void CoroutineWaitList::startTimeoutCheckerThread() {
|
||||
m_thread = std::thread{[this]() { checkCoroutinesForTimeouts(); }};
|
||||
}
|
||||
|
||||
void CoroutineWaitList::checkCoroutinesForTimeouts() {
|
||||
while (!m_stop) {
|
||||
std::set<CoroutineHandle*> timedoutCoroutines;
|
||||
{
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_timeoutsLock};
|
||||
const auto currentTimeSinceEpochMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||
const auto newEndIt = std::remove_if(std::begin(m_coroutinesWithTimeout), std::end(m_coroutinesWithTimeout), [&](const std::pair<CoroutineHandle*, v_int64>& entry) {
|
||||
if (currentTimeSinceEpochMS > entry.second) {
|
||||
timedoutCoroutines.insert(entry.first);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
m_coroutinesWithTimeout.erase(newEndIt, std::end(m_coroutinesWithTimeout));
|
||||
}
|
||||
if (!timedoutCoroutines.empty()) {
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_lock};
|
||||
CoroutineHandle* prev = nullptr;
|
||||
CoroutineHandle* curr = m_list.first;
|
||||
while (curr) {
|
||||
if (timedoutCoroutines.count(curr)) {
|
||||
m_list.cutEntry(curr, prev);
|
||||
curr->_PP->pushOneTask(curr);
|
||||
}
|
||||
prev = curr;
|
||||
curr = curr->_ref;
|
||||
void CoroutineWaitList::checkCoroutinesForTimeouts(const std::shared_ptr<Data>& data) {
|
||||
std::set<CoroutineHandle*> timedoutCoroutines;
|
||||
{
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock{data->m_timeoutsLock};
|
||||
const auto currentTimeSinceEpochMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
||||
const auto newEndIt = std::remove_if(std::begin(data->m_coroutinesWithTimeout), std::end(data->m_coroutinesWithTimeout), [&](const std::pair<CoroutineHandle*, v_int64>& entry) {
|
||||
if (currentTimeSinceEpochMS > entry.second) {
|
||||
timedoutCoroutines.insert(entry.first);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
data->m_coroutinesWithTimeout.erase(newEndIt, std::end(data->m_coroutinesWithTimeout));
|
||||
}
|
||||
if (!timedoutCoroutines.empty()) {
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock{ data->m_lock};
|
||||
CoroutineHandle* prev = nullptr;
|
||||
CoroutineHandle* curr = data->m_list.first;
|
||||
while (curr) {
|
||||
if (timedoutCoroutines.count(curr)) {
|
||||
data->m_list.cutEntry(curr, prev);
|
||||
curr->_PP->pushOneTask(curr);
|
||||
}
|
||||
prev = curr;
|
||||
curr = curr->_ref;
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
}
|
||||
|
||||
void CoroutineWaitList::setListener(Listener* listener) {
|
||||
m_listener = listener;
|
||||
m_data->m_listener = listener;
|
||||
}
|
||||
|
||||
void CoroutineWaitList::pushFront(CoroutineHandle* coroutine) {
|
||||
{
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
|
||||
m_list.pushFront(coroutine);
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_data->m_lock);
|
||||
m_data->m_list.pushFront(coroutine);
|
||||
}
|
||||
if(m_listener != nullptr) {
|
||||
m_listener->onNewItem(*this);
|
||||
if(m_data->m_listener != nullptr) {
|
||||
m_data->m_listener->onNewItem(*this);
|
||||
}
|
||||
}
|
||||
|
||||
void CoroutineWaitList::pushFront(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS) {
|
||||
{
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_timeoutsLock};
|
||||
m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS);
|
||||
if (m_coroutinesWithTimeout.size() == 1 && !m_thread.joinable()) {
|
||||
startTimeoutCheckerThread();
|
||||
}
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_data->m_timeoutsLock};
|
||||
m_data->m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS);
|
||||
}
|
||||
pushFront(coroutine);
|
||||
}
|
||||
|
||||
void CoroutineWaitList::pushBack(CoroutineHandle* coroutine) {
|
||||
{
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
|
||||
m_list.pushBack(coroutine);
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_data->m_lock);
|
||||
m_data->m_list.pushBack(coroutine);
|
||||
}
|
||||
if(m_listener != nullptr) {
|
||||
m_listener->onNewItem(*this);
|
||||
if(m_data->m_listener != nullptr) {
|
||||
m_data->m_listener->onNewItem(*this);
|
||||
}
|
||||
}
|
||||
|
||||
void CoroutineWaitList::pushBack(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS) {
|
||||
{
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_timeoutsLock};
|
||||
m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS);
|
||||
if (m_coroutinesWithTimeout.size() == 1 && !m_thread.joinable()) {
|
||||
startTimeoutCheckerThread();
|
||||
}
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_data->m_timeoutsLock};
|
||||
m_data->m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS);
|
||||
}
|
||||
pushBack(coroutine);
|
||||
}
|
||||
|
||||
void CoroutineWaitList::notifyFirst() {
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
|
||||
if(m_list.first) {
|
||||
auto coroutine = m_list.popFront();
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_data->m_lock);
|
||||
if(m_data->m_list.first) {
|
||||
auto coroutine = m_data->m_list.popFront();
|
||||
coroutine->_PP->pushOneTask(coroutine);
|
||||
}
|
||||
}
|
||||
|
||||
void CoroutineWaitList::notifyAll() {
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
|
||||
while (!m_list.empty()) {
|
||||
auto curr = m_list.popFront();
|
||||
curr->_PP->pushOneTask(curr);
|
||||
}
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_data->m_lock);
|
||||
while (!m_data->m_list.empty()) {
|
||||
auto curr = m_data->m_list.popFront();
|
||||
curr->_PP->pushOneTask(curr);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -59,18 +59,18 @@ public:
|
||||
virtual void onNewItem(CoroutineWaitList& list) = 0;
|
||||
};
|
||||
private:
|
||||
oatpp::collection::FastQueue<CoroutineHandle> m_list;
|
||||
oatpp::concurrency::SpinLock m_lock;
|
||||
Listener* m_listener = nullptr;
|
||||
struct Data {
|
||||
oatpp::collection::FastQueue<CoroutineHandle> m_list;
|
||||
oatpp::concurrency::SpinLock m_lock;
|
||||
Listener* m_listener = nullptr;
|
||||
|
||||
std::vector<std::pair<CoroutineHandle*, v_int64>> m_coroutinesWithTimeout;
|
||||
oatpp::concurrency::SpinLock m_timeoutsLock;
|
||||
};
|
||||
|
||||
std::vector<std::pair<CoroutineHandle*, v_int64>> m_coroutinesWithTimeout;
|
||||
oatpp::concurrency::SpinLock m_timeoutsLock;
|
||||
|
||||
std::atomic_bool m_stop{false};
|
||||
std::thread m_thread;
|
||||
std::shared_ptr<Data> m_data{std::make_shared<Data>()};
|
||||
private:
|
||||
void startTimeoutCheckerThread();
|
||||
void checkCoroutinesForTimeouts();
|
||||
static void checkCoroutinesForTimeouts(const std::shared_ptr<Data>& data);
|
||||
|
||||
protected:
|
||||
/*
|
||||
@ -153,17 +153,14 @@ public:
|
||||
notifyAll();
|
||||
|
||||
{
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> otherLock{other.m_lock};
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> myLock{m_lock};
|
||||
m_list = std::move(other.m_list);
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> otherLock{other.m_data->m_lock};
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> myLock{m_data->m_lock};
|
||||
m_data->m_list = std::move(other.m_data->m_list);
|
||||
}
|
||||
{
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> otherLock{other.m_timeoutsLock};
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> myLock{m_timeoutsLock};
|
||||
m_coroutinesWithTimeout = std::move(other.m_coroutinesWithTimeout);
|
||||
if (!m_coroutinesWithTimeout.empty() && !m_thread.joinable()) {
|
||||
startTimeoutCheckerThread();
|
||||
}
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> otherLock{other.m_data->m_timeoutsLock};
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> myLock{m_data->m_timeoutsLock};
|
||||
m_data->m_coroutinesWithTimeout = std::move(other.m_data->m_coroutinesWithTimeout);
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
@ -29,6 +29,32 @@
|
||||
|
||||
namespace oatpp { namespace async {
|
||||
|
||||
void Processor::checkCoroutinesForTimeouts() {
|
||||
while (m_running) {
|
||||
{
|
||||
std::unique_lock<std::mutex> lock{m_coroutineWaitListsWithTimeoutsMutex};
|
||||
while (m_coroutineWaitListsWithTimeouts.empty()) {
|
||||
m_coroutineWaitListsWithTimeoutsCV.wait(lock);
|
||||
if (!m_running) return;
|
||||
}
|
||||
|
||||
auto curr = m_coroutineWaitListsWithTimeouts.rbegin();
|
||||
const auto end = m_coroutineWaitListsWithTimeouts.rend();
|
||||
for (; curr != end; ++curr) {
|
||||
std::shared_ptr<CoroutineWaitList::Data> data = curr->lock();
|
||||
if (!data) {
|
||||
m_coroutineWaitListsWithTimeouts.erase(std::next(curr).base());
|
||||
continue;
|
||||
}
|
||||
|
||||
CoroutineWaitList::checkCoroutinesForTimeouts(data);
|
||||
}
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds{100});
|
||||
}
|
||||
}
|
||||
|
||||
void Processor::addWorker(const std::shared_ptr<worker::Worker>& worker) {
|
||||
|
||||
switch(worker->getType()) {
|
||||
@ -101,6 +127,12 @@ void Processor::addCoroutine(CoroutineHandle* coroutine) {
|
||||
case Action::TYPE_WAIT_LIST_WITH_TIMEOUT:
|
||||
coroutine->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
|
||||
action.m_data.waitListWithTimeout.waitList->pushBack(coroutine, action.m_data.waitListWithTimeout.timeoutTimeSinceEpochMS);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{m_coroutineWaitListsWithTimeoutsMutex};
|
||||
m_coroutineWaitListsWithTimeouts.emplace_back(action.m_data.waitListWithTimeout.waitList->m_data);
|
||||
}
|
||||
m_coroutineWaitListsWithTimeoutsCV.notify_one();
|
||||
break;
|
||||
|
||||
default:
|
||||
@ -219,6 +251,12 @@ bool Processor::iterate(v_int32 numIterations) {
|
||||
CP->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
|
||||
m_queue.popFront();
|
||||
action.m_data.waitListWithTimeout.waitList->pushBack(CP, action.m_data.waitListWithTimeout.timeoutTimeSinceEpochMS);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{m_coroutineWaitListsWithTimeoutsMutex};
|
||||
m_coroutineWaitListsWithTimeouts.emplace_back(action.m_data.waitListWithTimeout.waitList->m_data);
|
||||
}
|
||||
m_coroutineWaitListsWithTimeoutsCV.notify_one();
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -6,7 +6,8 @@
|
||||
* (_____)(__)(__)(__) |_| |_|
|
||||
*
|
||||
*
|
||||
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
|
||||
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>,
|
||||
* Matthias Haselmaier <mhaselmaier@gmail.com>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -26,6 +27,7 @@
|
||||
#define oatpp_async_Processor_hpp
|
||||
|
||||
#include "./Coroutine.hpp"
|
||||
#include "./CoroutineWaitList.hpp"
|
||||
#include "oatpp/core/collection/FastQueue.hpp"
|
||||
|
||||
#include <mutex>
|
||||
@ -45,7 +47,7 @@ private:
|
||||
|
||||
class TaskSubmission {
|
||||
public:
|
||||
virtual ~TaskSubmission() {};
|
||||
virtual ~TaskSubmission() = default;
|
||||
virtual CoroutineHandle* createCoroutine(Processor* processor) = 0;
|
||||
};
|
||||
|
||||
@ -108,8 +110,17 @@ private:
|
||||
|
||||
private:
|
||||
|
||||
bool m_running = true;
|
||||
std::atomic<v_int32> m_tasksCounter;
|
||||
std::atomic_bool m_running = true;
|
||||
std::atomic<v_int32> m_tasksCounter = 0;
|
||||
|
||||
private:
|
||||
|
||||
std::mutex m_coroutineWaitListsWithTimeoutsMutex;
|
||||
std::condition_variable m_coroutineWaitListsWithTimeoutsCV;
|
||||
std::vector<std::weak_ptr<CoroutineWaitList::Data>> m_coroutineWaitListsWithTimeouts;
|
||||
std::thread m_coroutineWaitListTimeoutChecker{&Processor::checkCoroutinesForTimeouts, this};
|
||||
|
||||
void checkCoroutinesForTimeouts();
|
||||
|
||||
private:
|
||||
|
||||
@ -123,10 +134,13 @@ private:
|
||||
|
||||
public:
|
||||
|
||||
Processor()
|
||||
: m_running(true)
|
||||
, m_tasksCounter(0)
|
||||
{}
|
||||
Processor() = default;
|
||||
|
||||
~Processor() {
|
||||
m_running = false;
|
||||
m_coroutineWaitListsWithTimeoutsCV.notify_one();
|
||||
m_coroutineWaitListTimeoutChecker.join();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add dedicated co-worker to processor.
|
||||
|
Loading…
Reference in New Issue
Block a user