Merge pull request #424 from MHaselmaier/issue_408

Implementing Issue #408
This commit is contained in:
Leonid Stryzhevskyi 2021-05-21 00:57:13 +03:00 committed by GitHub
commit 1f93f6caec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 569 additions and 88 deletions

View File

@ -6,7 +6,8 @@
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>,
* Matthias Haselmaier <mhaselmaier@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -65,6 +66,13 @@ Action Action::createWaitListAction(CoroutineWaitList* waitList) {
return result;
}
Action Action::createWaitListActionWithTimeout(CoroutineWaitList* waitList, const std::chrono::steady_clock::time_point& timeout) {
Action result(TYPE_WAIT_LIST_WITH_TIMEOUT);
result.m_data.waitListWithTimeout.waitList = waitList;
result.m_data.waitListWithTimeout.timeoutTimeSinceEpochMS = std::chrono::duration_cast<std::chrono::milliseconds>(timeout.time_since_epoch()).count();
return result;
}
Action::Action()
: m_type(TYPE_NONE)
{}

View File

@ -6,7 +6,8 @@
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>,
* Matthias Haselmaier <mhaselmaier@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -113,6 +114,11 @@ public:
*/
static constexpr const v_int32 TYPE_WAIT_LIST = 9;
/**
* Indicate that coroutine should be put on a wait-list provided with a timeout.
*/
static constexpr const v_int32 TYPE_WAIT_LIST_WITH_TIMEOUT = 10;
public:
/**
@ -174,6 +180,11 @@ private:
IOEventType ioEventType;
};
struct WaitListWithTimeout {
CoroutineWaitList* waitList;
v_int64 timeoutTimeSinceEpochMS;
};
private:
union Data {
FunctionPtr fptr;
@ -182,6 +193,7 @@ private:
IOData ioData;
v_int64 timePointMicroseconds;
CoroutineWaitList* waitList;
WaitListWithTimeout waitListWithTimeout;
};
private:
mutable v_int32 m_type;
@ -243,6 +255,14 @@ public:
*/
static Action createWaitListAction(CoroutineWaitList* waitList);
/**
* Create TYPE_WAIT_LIST_WITH_TIMEOUT Action.
* @param waitList - wait-list to put coroutine on.
* @param timeout - latest time point at which the coroutine should be continued.
* @return - Action.
*/
static Action createWaitListActionWithTimeout(CoroutineWaitList* waitList, const std::chrono::steady_clock::time_point& timeout);
/**
* Constructor. Create start-coroutine Action.
* @param coroutine - pointer to &l:AbstractCoroutine;.

View File

@ -6,7 +6,8 @@
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>,
* Matthias Haselmaier <mhaselmaier@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -25,18 +26,63 @@
#include "CoroutineWaitList.hpp"
#include "./Processor.hpp"
#include <algorithm>
#include <set>
namespace oatpp { namespace async {
CoroutineWaitList::CoroutineWaitList(CoroutineWaitList&& other) {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock{other.m_lock};
m_list = std::move(other.m_list);
}
{
std::lock_guard<oatpp::concurrency::SpinLock> lock{other.m_timeoutsLock};
m_coroutinesWithTimeout = std::move(other.m_coroutinesWithTimeout);
m_timeoutCheckingProcessors = std::move(other.m_timeoutCheckingProcessors);
for (const std::pair<Processor*, v_int64>& entry : m_timeoutCheckingProcessors) {
Processor* processor = entry.first;
processor->removeCoroutineWaitListWithTimeouts(std::addressof(other));
processor->addCoroutineWaitListWithTimeouts(this);
}
}
}
CoroutineWaitList::~CoroutineWaitList() {
notifyAll();
}
void CoroutineWaitList::checkCoroutinesForTimeouts() {
std::lock_guard<oatpp::concurrency::SpinLock> listLock{m_lock};
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_timeoutsLock};
const auto currentTimeSinceEpochMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
const auto newEndIt = std::remove_if(std::begin(m_coroutinesWithTimeout), std::end(m_coroutinesWithTimeout), [&](const std::pair<CoroutineHandle*, v_int64>& entry) {
return currentTimeSinceEpochMS > entry.second;
});
for (CoroutineHandle* curr = m_list.first, *prev = nullptr; !m_list.empty() && m_list.last->_ref != curr; curr = curr->_ref) {
const bool removeFromWaitList = std::any_of(newEndIt, std::end(m_coroutinesWithTimeout), [=](const std::pair<CoroutineHandle*, v_int64>& entry) {
return entry.first == curr;
});
if (!removeFromWaitList) {
prev = curr;
continue;
}
m_list.cutEntry(curr, prev);
if (--m_timeoutCheckingProcessors[curr->_PP] <= 0) {
curr->_PP->removeCoroutineWaitListWithTimeouts(this);
m_timeoutCheckingProcessors.erase(curr->_PP);
}
curr->_PP->pushOneTask(curr);
}
m_coroutinesWithTimeout.erase(newEndIt, std::end(m_coroutinesWithTimeout));
}
void CoroutineWaitList::setListener(Listener* listener) {
m_listener = listener;
}
@ -51,6 +97,17 @@ void CoroutineWaitList::pushFront(CoroutineHandle* coroutine) {
}
}
void CoroutineWaitList::pushFront(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS) {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_timeoutsLock};
m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS);
if (++m_timeoutCheckingProcessors[coroutine->_PP] == 1) {
coroutine->_PP->addCoroutineWaitListWithTimeouts(this);
}
}
pushFront(coroutine);
}
void CoroutineWaitList::pushBack(CoroutineHandle* coroutine) {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
@ -61,21 +118,68 @@ void CoroutineWaitList::pushBack(CoroutineHandle* coroutine) {
}
}
void CoroutineWaitList::pushBack(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS) {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_timeoutsLock};
m_coroutinesWithTimeout.emplace_back(coroutine, timeoutTimeSinceEpochMS);
if (++m_timeoutCheckingProcessors[coroutine->_PP] == 1) {
coroutine->_PP->addCoroutineWaitListWithTimeouts(this);
}
}
pushBack(coroutine);
}
void CoroutineWaitList::notifyFirst() {
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_lock};
if(m_list.first) {
auto coroutine = m_list.popFront();
coroutine->_PP->pushOneTask(coroutine);
removeFirstCoroutine();
}
}
void CoroutineWaitList::notifyAll() {
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
while (!m_list.empty()) {
auto curr = m_list.popFront();
curr->_PP->pushOneTask(curr);
}
while (!m_list.empty()) {
removeFirstCoroutine();
}
}
void CoroutineWaitList::removeFirstCoroutine() {
auto coroutine = m_list.popFront();
{
std::lock_guard<oatpp::concurrency::SpinLock> lock{m_timeoutsLock};
if (--m_timeoutCheckingProcessors[coroutine->_PP] <= 0) {
coroutine->_PP->removeCoroutineWaitListWithTimeouts(this);
m_timeoutCheckingProcessors.erase(coroutine->_PP);
}
}
coroutine->_PP->pushOneTask(coroutine);
}
CoroutineWaitList& CoroutineWaitList::operator=(CoroutineWaitList&& other) {
if (this == std::addressof(other)) return *this;
notifyAll();
{
std::lock_guard<oatpp::concurrency::SpinLock> otherLock{other.m_lock};
std::lock_guard<oatpp::concurrency::SpinLock> myLock{m_lock};
m_list = std::move(other.m_list);
}
{
std::lock_guard<oatpp::concurrency::SpinLock> otherLock{other.m_timeoutsLock};
std::lock_guard<oatpp::concurrency::SpinLock> myLock{m_timeoutsLock};
m_coroutinesWithTimeout = std::move(other.m_coroutinesWithTimeout);
m_timeoutCheckingProcessors = std::move(other.m_timeoutCheckingProcessors);
for (const std::pair<Processor*, v_int64>& entry : m_timeoutCheckingProcessors) {
Processor* processor = entry.first;
processor->removeCoroutineWaitListWithTimeouts(std::addressof(other));
processor->addCoroutineWaitListWithTimeouts(this);
}
}
return *this;
}
}}

View File

@ -6,7 +6,8 @@
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>,
* Matthias Haselmaier <mhaselmaier@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,7 +30,10 @@
#include "oatpp/core/collection/FastQueue.hpp"
#include "oatpp/core/concurrency/SpinLock.hpp"
#include <map>
#include <mutex>
#include <thread>
#include <utility>
namespace oatpp { namespace async {
@ -59,6 +63,16 @@ private:
oatpp::collection::FastQueue<CoroutineHandle> m_list;
oatpp::concurrency::SpinLock m_lock;
Listener* m_listener = nullptr;
std::map<Processor*, v_int64> m_timeoutCheckingProcessors;
std::vector<std::pair<CoroutineHandle*, v_int64>> m_coroutinesWithTimeout;
oatpp::concurrency::SpinLock m_timeoutsLock;
private:
void checkCoroutinesForTimeouts();
void removeFirstCoroutine();
protected:
/*
* Put coroutine on wait-list.
@ -67,12 +81,28 @@ protected:
*/
void pushFront(CoroutineHandle* coroutine);
/*
* Put coroutine on wait-list with timeout.
* This method should be called by Coroutine Processor only.
* @param coroutine
* @param timeoutTimeSinceEpochMS
*/
void pushFront(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS);
/*
* Put coroutine on wait-list.
* This method should be called by Coroutine Processor only.
* @param coroutine
*/
void pushBack(CoroutineHandle* coroutine);
/*
* Put coroutine on wait-list with timeout.
* This method should be called by Coroutine Processor only.
* @param coroutine
* @param timeoutTimeSinceEpochMS
*/
void pushBack(CoroutineHandle* coroutine, v_int64 timeoutTimeSinceEpochMS);
public:
/**
@ -118,12 +148,7 @@ public:
*/
void notifyAll();
CoroutineWaitList& operator=(CoroutineWaitList&& other) {
notifyAll();
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
m_list = std::move(other.m_list);
return *this;
}
CoroutineWaitList& operator=(CoroutineWaitList&& other);
};

View File

@ -6,7 +6,8 @@
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>,
* Matthias Haselmaier <mhaselmaier@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -28,6 +29,38 @@
namespace oatpp { namespace async {
void Processor::checkCoroutinesForTimeouts() {
while (m_running) {
{
std::unique_lock<std::recursive_mutex> lock{m_coroutineWaitListsWithTimeoutsMutex};
while (m_coroutineWaitListsWithTimeouts.empty()) {
m_coroutineWaitListsWithTimeoutsCV.wait(lock);
if (!m_running) return;
}
const auto coroutineWaitListsWithTimeouts = m_coroutineWaitListsWithTimeouts;
for (CoroutineWaitList* waitList : coroutineWaitListsWithTimeouts) {
waitList->checkCoroutinesForTimeouts();
}
}
std::this_thread::sleep_for(std::chrono::milliseconds{100});
}
}
void Processor::addCoroutineWaitListWithTimeouts(CoroutineWaitList* waitList) {
{
std::lock_guard<std::recursive_mutex> lock{m_coroutineWaitListsWithTimeoutsMutex};
m_coroutineWaitListsWithTimeouts.insert(waitList);
}
m_coroutineWaitListsWithTimeoutsCV.notify_one();
}
void Processor::removeCoroutineWaitListWithTimeouts(CoroutineWaitList* waitList) {
std::lock_guard<std::recursive_mutex> lock{m_coroutineWaitListsWithTimeoutsMutex};
m_coroutineWaitListsWithTimeouts.erase(waitList);
}
void Processor::addWorker(const std::shared_ptr<worker::Worker>& worker) {
switch(worker->getType()) {
@ -97,6 +130,11 @@ void Processor::addCoroutine(CoroutineHandle* coroutine) {
action.m_data.waitList->pushBack(coroutine);
break;
case Action::TYPE_WAIT_LIST_WITH_TIMEOUT:
coroutine->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
action.m_data.waitListWithTimeout.waitList->pushBack(coroutine, action.m_data.waitListWithTimeout.timeoutTimeSinceEpochMS);
break;
default:
m_queue.pushBack(coroutine);
@ -209,6 +247,12 @@ bool Processor::iterate(v_int32 numIterations) {
action.m_data.waitList->pushBack(CP);
break;
case Action::TYPE_WAIT_LIST_WITH_TIMEOUT:
CP->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
m_queue.popFront();
action.m_data.waitListWithTimeout.waitList->pushBack(CP, action.m_data.waitListWithTimeout.timeoutTimeSinceEpochMS);
break;
default:
m_queue.round();
}
@ -230,6 +274,9 @@ void Processor::stop() {
m_running = false;
}
m_taskCondition.notify_one();
m_coroutineWaitListsWithTimeoutsCV.notify_one();
m_coroutineWaitListTimeoutChecker.join();
}
v_int32 Processor::getTasksCount() {

View File

@ -6,7 +6,8 @@
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>,
* Matthias Haselmaier <mhaselmaier@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,12 +27,14 @@
#define oatpp_async_Processor_hpp
#include "./Coroutine.hpp"
#include "./CoroutineWaitList.hpp"
#include "oatpp/core/collection/FastQueue.hpp"
#include <mutex>
#include <list>
#include <vector>
#include <condition_variable>
#include <list>
#include <mutex>
#include <set>
#include <vector>
namespace oatpp { namespace async {
@ -41,11 +44,12 @@ namespace oatpp { namespace async {
* Do not use bare processor to run coroutines. Use &id:oatpp::async::Executor; instead;.
*/
class Processor {
friend class CoroutineWaitList;
private:
class TaskSubmission {
public:
virtual ~TaskSubmission() {};
virtual ~TaskSubmission() = default;
virtual CoroutineHandle* createCoroutine(Processor* processor) = 0;
};
@ -108,8 +112,19 @@ private:
private:
bool m_running = true;
std::atomic<v_int32> m_tasksCounter;
std::atomic_bool m_running{true};
std::atomic<v_int32> m_tasksCounter{0};
private:
std::recursive_mutex m_coroutineWaitListsWithTimeoutsMutex;
std::condition_variable_any m_coroutineWaitListsWithTimeoutsCV;
std::set<CoroutineWaitList*> m_coroutineWaitListsWithTimeouts;
std::thread m_coroutineWaitListTimeoutChecker{&Processor::checkCoroutinesForTimeouts, this};
void checkCoroutinesForTimeouts();
void addCoroutineWaitListWithTimeouts(CoroutineWaitList* waitList);
void removeCoroutineWaitListWithTimeouts(CoroutineWaitList* waitList);
private:
@ -123,10 +138,7 @@ private:
public:
Processor()
: m_running(true)
, m_tasksCounter(0)
{}
Processor() = default;
/**
* Add dedicated co-worker to processor.

View File

@ -6,7 +6,8 @@
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>,
* Matthias Haselmaier <mhaselmaier@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -189,96 +190,87 @@ private:
private:
std::shared_ptr<Provider<TResource>> m_provider;
v_int64 m_counter;
v_int64 m_counter{0};
v_int64 m_maxResources;
v_int64 m_maxResourceTTL;
std::atomic<bool> m_running;
bool m_finished;
std::atomic<bool> m_running{true};
bool m_finished{false};
private:
std::list<PoolRecord> m_bench;
async::CoroutineWaitList m_waitList;
std::condition_variable m_condition;
std::mutex m_lock;
std::chrono::duration<v_int64, std::micro> m_timeout;
protected:
PoolTemplate(const std::shared_ptr<Provider<TResource>>& provider, v_int64 maxResources, v_int64 maxResourceTTL)
PoolTemplate(const std::shared_ptr<Provider<TResource>>& provider, v_int64 maxResources, v_int64 maxResourceTTL, const std::chrono::duration<v_int64, std::micro>& timeout)
: m_provider(provider)
, m_counter(0)
, m_maxResources(maxResources)
, m_maxResourceTTL(maxResourceTTL)
, m_running(true)
, m_finished(false)
, m_timeout(timeout)
{}
void startCleanupTask(const std::shared_ptr<PoolTemplate>& _this) {
static void startCleanupTask(const std::shared_ptr<PoolTemplate>& _this) {
std::thread poolCleanupTask(cleanupTask, _this);
poolCleanupTask.detach();
}
public:
static std::shared_ptr<PoolTemplate> createShared(const std::shared_ptr<Provider<TResource>>& provider,
v_int64 maxResources,
const std::chrono::duration<v_int64, std::micro>& maxResourceTTL)
{
/* "new" is called directly to keep constructor private */
auto ptr = std::shared_ptr<PoolTemplate>(new PoolTemplate(provider, maxResources, maxResourceTTL.count()));
ptr->startCleanupTask();
return ptr;
}
virtual ~PoolTemplate() {
stop();
}
std::shared_ptr<TResource> get(const std::shared_ptr<PoolTemplate>& _this) {
static std::shared_ptr<TResource> get(const std::shared_ptr<PoolTemplate>& _this) {
auto readyPredicate = [&_this]() { return !_this->m_running || !_this->m_bench.empty() || _this->m_counter < _this->m_maxResources; };
std::unique_lock<std::mutex> guard{_this->m_lock};
if (_this->m_timeout == std::chrono::microseconds::zero())
{
std::unique_lock<std::mutex> guard(m_lock);
while (m_running && m_bench.size() == 0 && m_counter >= m_maxResources ) {
m_condition.wait(guard);
while (!readyPredicate()) {
_this->m_condition.wait(guard);
}
} else if (!_this->m_condition.wait_for(guard, _this->m_timeout, std::move(readyPredicate))) {
return nullptr;
}
if(!m_running) {
return nullptr;
}
if (m_bench.size() > 0) {
auto record = m_bench.front();
m_bench.pop_front();
return std::make_shared<AcquisitionProxyImpl>(record.resource, _this);
} else {
++ m_counter;
}
}
try {
return std::make_shared<AcquisitionProxyImpl>(m_provider->get(), _this);
} catch (...) {
std::lock_guard<std::mutex> guard(m_lock);
-- m_counter;
if(!_this->m_running) {
return nullptr;
}
if (_this->m_bench.size() > 0) {
auto record = _this->m_bench.front();
_this->m_bench.pop_front();
return std::make_shared<AcquisitionProxyImpl>(record.resource, _this);
} else {
++ _this->m_counter;
}
guard.unlock();
try {
return std::make_shared<AcquisitionProxyImpl>(_this->m_provider->get(), _this);
} catch (...) {
guard.lock();
--_this->m_counter;
return nullptr;
}
}
async::CoroutineStarterForResult<const std::shared_ptr<TResource>&> getAsync(const std::shared_ptr<PoolTemplate>& _this) {
static async::CoroutineStarterForResult<const std::shared_ptr<TResource>&> getAsync(const std::shared_ptr<PoolTemplate>& _this) {
class GetCoroutine : public oatpp::async::CoroutineWithResult<GetCoroutine, const std::shared_ptr<TResource>&> {
private:
std::shared_ptr<PoolTemplate> m_pool;
std::shared_ptr<Provider<TResource>> m_provider;
std::chrono::steady_clock::time_point m_startTime{std::chrono::steady_clock::now()};
public:
GetCoroutine(const std::shared_ptr<PoolTemplate>& pool, const std::shared_ptr<Provider<TResource>>& provider)
GetCoroutine(const std::shared_ptr<PoolTemplate>& pool)
: m_pool(pool)
, m_provider(provider)
{}
bool timedout() const noexcept {
return m_pool->m_timeout != std::chrono::microseconds::zero() && m_pool->m_timeout < (std::chrono::steady_clock::now() - m_startTime);
}
async::Action act() override {
if (timedout()) return this->_return(nullptr);
{
/* Careful!!! Using non-async lock */
@ -286,7 +278,9 @@ public:
if (m_pool->m_running && m_pool->m_bench.size() == 0 && m_pool->m_counter >= m_pool->m_maxResources) {
guard.unlock();
return async::Action::createWaitListAction(&m_pool->m_waitList);
return m_pool->m_timeout == std::chrono::microseconds::zero()
? async::Action::createWaitListAction(&m_pool->m_waitList)
: async::Action::createWaitListActionWithTimeout(&m_pool->m_waitList, m_startTime + m_pool->m_timeout);
}
if(!m_pool->m_running) {
@ -305,7 +299,7 @@ public:
}
return m_provider->getAsync().callbackTo(&GetCoroutine::onGet);
return m_pool->m_provider->getAsync().callbackTo(&GetCoroutine::onGet);
}
@ -324,10 +318,26 @@ public:
};
return GetCoroutine::startForResult(_this, m_provider);
return GetCoroutine::startForResult(_this);
}
public:
static std::shared_ptr<PoolTemplate> createShared(const std::shared_ptr<Provider<TResource>>& provider,
v_int64 maxResources,
const std::chrono::duration<v_int64, std::micro>& maxResourceTTL)
{
/* "new" is called directly to keep constructor private */
auto ptr = std::shared_ptr<PoolTemplate>(new PoolTemplate(provider, maxResources, maxResourceTTL.count()));
startCleanupTask(ptr);
return ptr;
}
virtual ~PoolTemplate() {
stop();
}
void invalidate(const std::shared_ptr<TResource>& resource) {
auto proxy = std::static_pointer_cast<AcquisitionProxyImpl>(resource);
proxy->__pool__invalidate();
@ -391,8 +401,8 @@ protected:
* @param maxResources
* @param maxResourceTTL
*/
Pool(const std::shared_ptr<TProvider>& provider, v_int64 maxResources, v_int64 maxResourceTTL)
: PoolTemplate<TResource, AcquisitionProxyImpl>(provider, maxResources, maxResourceTTL)
Pool(const std::shared_ptr<TProvider>& provider, v_int64 maxResources, v_int64 maxResourceTTL, const std::chrono::duration<v_int64, std::micro>& timeout = std::chrono::microseconds::zero())
: PoolTemplate<TResource, AcquisitionProxyImpl>(provider, maxResources, maxResourceTTL, timeout)
{
TProvider::m_properties = provider->getProperties();
}

View File

@ -55,6 +55,8 @@ add_executable(oatppAllTests
oatpp/core/parser/CaretTest.hpp
oatpp/core/provider/PoolTest.cpp
oatpp/core/provider/PoolTest.hpp
oatpp/core/provider/PoolTemplateTest.cpp
oatpp/core/provider/PoolTemplateTest.hpp
oatpp/encoding/Base64Test.cpp
oatpp/encoding/Base64Test.hpp
oatpp/encoding/UnicodeTest.cpp

View File

@ -27,6 +27,7 @@
#include "oatpp/core/parser/CaretTest.hpp"
#include "oatpp/core/provider/PoolTest.hpp"
#include "oatpp/core/provider/PoolTemplateTest.hpp"
#include "oatpp/core/async/LockTest.hpp"
#include "oatpp/core/data/mapping/type/UnorderedMapTest.hpp"
@ -110,6 +111,7 @@ void runTests() {
OATPP_RUN_TEST(oatpp::test::parser::CaretTest);
OATPP_RUN_TEST(oatpp::test::core::provider::PoolTest);
OATPP_RUN_TEST(oatpp::test::core::provider::PoolTemplateTest);
OATPP_RUN_TEST(oatpp::test::parser::json::mapping::EnumTest);

View File

@ -0,0 +1,207 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>,
* Matthias Haselmaier <mhaselmaier@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "PoolTemplateTest.hpp"
#include <future>
#include "oatpp/core/provider/Pool.hpp"
#include "oatpp/core/async/Executor.hpp"
namespace oatpp { namespace test { namespace core { namespace provider {
namespace {
struct Resource {
};
class Provider : public oatpp::provider::Provider<Resource> {
public:
std::shared_ptr<Resource> get() override {
return std::make_shared<Resource>();
}
async::CoroutineStarterForResult<const std::shared_ptr<Resource> &> getAsync() override {
class GetCoroutine : public oatpp::async::CoroutineWithResult<GetCoroutine, const std::shared_ptr<Resource>&> {
private:
Provider* m_provider;
public:
GetCoroutine(Provider* provider)
: m_provider(provider)
{}
Action act() override {
return _return(std::make_shared<Resource>());
}
};
return GetCoroutine::startForResult(this);
}
void invalidate(const std::shared_ptr<Resource>& resource) override {
(void) resource;
}
void stop() override {
OATPP_LOGD("Provider", "stop()");
}
};
struct AcquisitionProxy : public oatpp::provider::AcquisitionProxy<Resource, AcquisitionProxy> {
AcquisitionProxy(const std::shared_ptr<Resource>& resource, const std::shared_ptr<PoolInstance>& pool)
: oatpp::provider::AcquisitionProxy<Resource, AcquisitionProxy>(resource, pool)
{}
};
struct Pool : public oatpp::provider::PoolTemplate<Resource, AcquisitionProxy> {
Pool(const std::shared_ptr<Provider>& provider, v_int64 maxResources, v_int64 maxResourceTTL, const std::chrono::duration<v_int64, std::micro>& timeout)
: oatpp::provider::PoolTemplate<Resource, AcquisitionProxy>(provider, maxResources, maxResourceTTL, timeout)
{}
static std::shared_ptr<Resource> get(const std::shared_ptr<PoolTemplate>& _this) {
return oatpp::provider::PoolTemplate<Resource, AcquisitionProxy>::get(_this);
}
static async::CoroutineStarterForResult<const std::shared_ptr<Resource>&> getAsync(const std::shared_ptr<PoolTemplate>& _this) {
return oatpp::provider::PoolTemplate<Resource, AcquisitionProxy>::getAsync(_this);
}
static std::shared_ptr<PoolTemplate> createShared(const std::shared_ptr<Provider>& provider,
v_int64 maxResources,
const std::chrono::duration<v_int64, std::micro>& maxResourceTTL,
const std::chrono::duration<v_int64, std::micro>& timeout) {
auto ptr = std::make_shared<Pool>(provider, maxResources, maxResourceTTL.count(), timeout);
startCleanupTask(ptr);
return ptr;
}
};
class ClientCoroutine : public oatpp::async::Coroutine<ClientCoroutine> {
private:
std::shared_ptr<oatpp::provider::PoolTemplate<Resource, AcquisitionProxy>> m_pool;
std::promise<std::shared_ptr<Resource>>* m_promise;
public:
ClientCoroutine(const std::shared_ptr<oatpp::provider::PoolTemplate<Resource, AcquisitionProxy>>& pool, std::promise<std::shared_ptr<Resource>>* promise)
: m_pool(pool)
, m_promise(promise)
{}
Action act() override {
return Pool::getAsync(m_pool).callbackTo(&ClientCoroutine::onGet);
}
Action onGet(const std::shared_ptr<Resource>& resource) {
m_promise->set_value(resource);
return finish();
}
};
}
void PoolTemplateTest::onRun() {
const auto provider = std::make_shared<Provider>();
const v_int64 maxResources = 1;
{
OATPP_LOGD(TAG, "Synchronously with timeout");
auto poolTemplate = Pool::createShared(provider, maxResources, std::chrono::seconds(10), std::chrono::milliseconds(500));
std::shared_ptr<Resource> resource = Pool::get(poolTemplate);
OATPP_ASSERT(resource != nullptr);
OATPP_ASSERT(Pool::get(poolTemplate) == nullptr);
poolTemplate->stop();
OATPP_ASSERT(Pool::get(poolTemplate) == nullptr);
}
{
OATPP_LOGD(TAG, "Synchronously without timeout");
auto poolTemplate = Pool::createShared(provider, maxResources, std::chrono::seconds(10), std::chrono::milliseconds::zero());
std::shared_ptr<Resource> resource = Pool::get(poolTemplate);
OATPP_ASSERT(resource != nullptr);
std::future<std::shared_ptr<Resource>> futureResource = std::async(std::launch::async, [&poolTemplate]() {
return Pool::get(poolTemplate);
});
OATPP_ASSERT(futureResource.wait_for(std::chrono::seconds(1)) == std::future_status::timeout);
poolTemplate->stop();
OATPP_ASSERT(Pool::get(poolTemplate) == nullptr);
}
{
OATPP_LOGD(TAG, "Asynchronously with timeout");
oatpp::async::Executor executor(1, 1, 1);
auto poolTemplate = Pool::createShared(provider, maxResources, std::chrono::seconds(10), std::chrono::milliseconds(500));
std::shared_ptr<Resource> resource;
{
std::promise<std::shared_ptr<Resource>> promise;
auto future = promise.get_future();
executor.execute<ClientCoroutine>(poolTemplate, &promise);
resource = future.get();
OATPP_ASSERT(resource != nullptr);
}
{
std::promise<std::shared_ptr<Resource>> promise;
auto future = promise.get_future();
executor.execute<ClientCoroutine>(poolTemplate, &promise);
OATPP_ASSERT(future.get() == nullptr);
}
poolTemplate->stop();
executor.stop();
executor.join();
}
{
OATPP_LOGD(TAG, "Asynchronously without timeout");
oatpp::async::Executor executor(1, 1, 1);
auto poolTemplate = Pool::createShared(provider, maxResources, std::chrono::seconds(10), std::chrono::milliseconds::zero());
std::shared_ptr<Resource> resource = Pool::get(poolTemplate);
OATPP_ASSERT(resource != nullptr);
std::promise<std::shared_ptr<Resource>> promise;
auto future = promise.get_future();
executor.execute<ClientCoroutine>(poolTemplate, &promise);
OATPP_ASSERT(future.wait_for(std::chrono::seconds(1)) == std::future_status::timeout);
poolTemplate->stop();
executor.stop();
executor.join();
}
}
}}}}

View File

@ -0,0 +1,44 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>,
* Matthias Haselmaier <mhaselmaier@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_test_provider_PoolTemplateTest_hpp
#define oatpp_test_provider_PoolTemplateTest_hpp
#include "oatpp-test/UnitTest.hpp"
namespace oatpp { namespace test { namespace core { namespace provider {
class PoolTemplateTest : public UnitTest{
public:
PoolTemplateTest():UnitTest("TEST[provider::PoolTemplateTest]"){}
void onRun() override;
};
}}}}
#endif //oatpp_test_provider_PoolTemplateTest_hpp