test: better async/ConditionVariableTest

This commit is contained in:
Leonid Stryzhevskyi 2023-01-16 00:08:25 +02:00
parent 316b079f0a
commit 86617ecb9c
2 changed files with 165 additions and 35 deletions

View File

@ -98,15 +98,15 @@ CoroutineStarter ConditionVariable::waitUntil(Lock* lock, std::function<bool()>
}
Action onLocked() {
OATPP_LOGD("WaitCoroutine", "Waked!")
if(m_lockGuard.owns_lock()) {
OATPP_LOGD("WaitCoroutine", "Locked")
if (m_condition()) {
return finish();
}
m_cv->m_notified = false;
m_lockGuard.unlock();
OATPP_LOGD("WaitCoroutine", "UnLocked")
} else {
if(std::chrono::system_clock::now() > m_timeoutTime) {
return finish();
@ -118,7 +118,7 @@ CoroutineStarter ConditionVariable::waitUntil(Lock* lock, std::function<bool()>
return finish();
}
OATPP_LOGD("WaitCoroutine", "Sleeeeep")
return Action::createWaitListAction(&m_cv->m_list, m_timeoutTime);
}

View File

@ -35,33 +35,79 @@ struct Resource {
v_int64 counter;
};
class TestCoroutine : public oatpp::async::Coroutine<TestCoroutine> {
class TestCoroutineWait : public oatpp::async::Coroutine<TestCoroutineWait> {
private:
std::shared_ptr<Resource> m_resource;
oatpp::async::Lock* m_lock;
oatpp::async::ConditionVariable* m_cv;
public:
TestCoroutine(std::shared_ptr<Resource> resource,
oatpp::async::Lock* lock,
oatpp::async::ConditionVariable* cv)
TestCoroutineWait(std::shared_ptr<Resource> resource,
oatpp::async::Lock* lock,
oatpp::async::ConditionVariable* cv)
: m_resource(resource)
, m_lock(lock)
, m_cv(cv)
{}
bool condition() {
OATPP_LOGD("Resource", "%d", m_resource->counter)
return m_resource->counter == 100;
}
Action act() override {
OATPP_LOGD("TestCoroutine", "Waiting...")
return m_cv->waitFor(m_lock, [this]{return condition();}, std::chrono::milliseconds(5000)).next(yieldTo(&TestCoroutine::onReady));
return m_cv->wait(m_lock, [this]{return m_resource->counter == 100;})
.next(yieldTo(&TestCoroutineWait::onReady));
}
Action onReady() {
return finish();
}
};
class TestCoroutineWaitWithTimeout : public oatpp::async::Coroutine<TestCoroutineWaitWithTimeout> {
private:
std::shared_ptr<Resource> m_resource;
oatpp::async::Lock* m_lock;
oatpp::async::ConditionVariable* m_cv;
public:
TestCoroutineWaitWithTimeout(std::shared_ptr<Resource> resource,
oatpp::async::Lock* lock,
oatpp::async::ConditionVariable* cv)
: m_resource(resource)
, m_lock(lock)
, m_cv(cv)
{}
Action act() override {
return m_cv->waitFor(m_lock, [this]{return m_resource->counter == 100;}, std::chrono::seconds(5))
.next(yieldTo(&TestCoroutineWaitWithTimeout::onReady));
}
Action onReady() {
return finish();
}
};
class TestCoroutineTimeout : public oatpp::async::Coroutine<TestCoroutineTimeout> {
private:
std::shared_ptr<Resource> m_resource;
oatpp::async::Lock* m_lock;
oatpp::async::ConditionVariable* m_cv;
public:
TestCoroutineTimeout(std::shared_ptr<Resource> resource,
oatpp::async::Lock* lock,
oatpp::async::ConditionVariable* cv)
: m_resource(resource)
, m_lock(lock)
, m_cv(cv)
{}
Action act() override {
return m_cv->waitFor(m_lock, [this]{return false;}, std::chrono::seconds(1))
.next(yieldTo(&TestCoroutineTimeout::onReady));
}
Action onReady() {
OATPP_LOGD("TestCoroutine", "Ready!!!")
return finish();
}
@ -71,34 +117,118 @@ public:
void ConditionVariableTest::onRun() {
for(v_int32 iter = 0; iter < 100; iter ++ ) {
std::atomic<bool> finished(false);
OATPP_LOGD("ITER", "%d", iter)
std::thread timeoutThread([&finished] {
auto now = oatpp::base::Environment::getMicroTickCount();
while(!finished) {
auto ticks = oatpp::base::Environment::getMicroTickCount();
if(ticks - now > 10 * 60 * 1000 * 1000) {
break;
}
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
OATPP_ASSERT(finished);
});
oatpp::async::Executor executor;
{
auto resource = std::make_shared<Resource>();
oatpp::async::Lock lock;
oatpp::async::ConditionVariable cv;
executor.execute<TestCoroutine>(resource, &lock, &cv);
for (v_int32 iter = 0; iter < 100; iter++) {
OATPP_LOGD("ITERATION 'WAIT'", "%d", iter)
oatpp::async::Executor executor;
auto resource = std::make_shared<Resource>();
oatpp::async::Lock lock;
oatpp::async::ConditionVariable cv;
executor.execute<TestCoroutineWait>(resource, &lock, &cv);
std::vector<std::thread> threads;
for (v_int32 i = 0; i < 100; i++) {
threads.push_back(std::thread([&resource, &lock, &cv] {
{
std::lock_guard<oatpp::async::Lock> guard(lock);
resource->counter++;
}
cv.notifyAll();
}));
}
for(auto& t : threads) {
t.join();
}
executor.waitTasksFinished();
executor.stop();
executor.join();
for (v_int32 i = 0; i < 100; i++) {
std::thread t([&resource, &lock, &cv] {
{
std::lock_guard<oatpp::async::Lock> guard(lock);
resource->counter++;
}
cv.notifyAll();
});
t.detach();
}
executor.waitTasksFinished();
executor.stop();
executor.join();
}
{
for (v_int32 iter = 0; iter < 100; iter++) {
OATPP_LOGD("ITERATION 'WAIT-WITH-TIMEOUT'", "%d", iter)
oatpp::async::Executor executor;
auto resource = std::make_shared<Resource>();
oatpp::async::Lock lock;
oatpp::async::ConditionVariable cv;
executor.execute<TestCoroutineWaitWithTimeout>(resource, &lock, &cv);
std::vector<std::thread> threads;
for (v_int32 i = 0; i < 100; i++) {
threads.push_back(std::thread([&resource, &lock, &cv] {
{
std::lock_guard<oatpp::async::Lock> guard(lock);
resource->counter++;
}
cv.notifyAll();
}));
}
for(auto& t : threads) {
t.join();
}
executor.waitTasksFinished();
executor.stop();
executor.join();
}
}
{
for (v_int32 iter = 0; iter < 5; iter++) {
OATPP_LOGD("ITERATION 'TIMEOUT'", "%d", iter)
oatpp::async::Executor executor;
auto resource = std::make_shared<Resource>();
oatpp::async::Lock lock;
oatpp::async::ConditionVariable cv;
executor.execute<TestCoroutineTimeout>(resource, &lock, &cv);
executor.waitTasksFinished();
executor.stop();
executor.join();
}
}
finished = true;
timeoutThread.join();
}
}}}