Concept of CoroutineWaitList

This commit is contained in:
lganzzzo 2019-04-26 05:10:42 +03:00
parent 7744874467
commit 3836421898
13 changed files with 349 additions and 113 deletions

View File

@ -17,6 +17,8 @@ add_library(oatpp
oatpp/core/async/Error.hpp
oatpp/core/async/Coroutine.cpp
oatpp/core/async/Coroutine.hpp
oatpp/core/async/CoroutineWaitList.cpp
oatpp/core/async/CoroutineWaitList.hpp
oatpp/core/async/Processor.cpp
oatpp/core/async/Processor.hpp
oatpp/core/async/worker/Worker.cpp
@ -191,6 +193,7 @@ add_library(oatpp
oatpp/web/url/mapping/Subscriber.hpp
oatpp/core/parser/ParsingError.cpp
oatpp/core/parser/ParsingError.hpp
oatpp/core/parser/ParsingError.hpp
)
set_target_properties(oatpp PROPERTIES

View File

@ -59,6 +59,12 @@ Action Action::createWaitRepeatAction(v_int64 timePointMicroseconds) {
return result;
}
Action Action::createWaitListAction(CoroutineWaitList* waitList) {
Action result(TYPE_WAIT_LIST);
result.m_data.waitList = waitList;
return result;
}
Action::Action(AbstractCoroutine* coroutine)
: m_type(TYPE_COROUTINE)
{

View File

@ -43,6 +43,7 @@ namespace oatpp { namespace async {
class AbstractCoroutine; // FWD
class Processor; // FWD
class CoroutineStarter; // FWD
class CoroutineWaitList; // FWD
namespace worker {
class Worker; // FWD
@ -105,6 +106,11 @@ public:
*/
static constexpr const v_int32 TYPE_ERROR = 8;
/**
* Indicate that coroutine should be put on a wait-list provided.
*/
static constexpr const v_int32 TYPE_WAIT_LIST = 9;
public:
static constexpr const v_int32 IO_EVENT_READ = 0;
@ -123,6 +129,7 @@ private:
AbstractCoroutine* coroutine;
IOData ioData;
v_int64 timePointMicroseconds;
CoroutineWaitList* waitList;
};
private:
mutable v_int32 m_type;
@ -162,10 +169,17 @@ public:
/**
* Create TYPE_WAIT_REPEAT Action.
* @param timePointMicroseconds - time since epoch.
* @return
* @return - Action.
*/
static Action createWaitRepeatAction(v_int64 timePointMicroseconds);
/**
* Create TYPE_WAIT_LIST Action.
* @param waitList - wait-list to put coroutine on.
* @return - Action.
*/
static Action createWaitListAction(CoroutineWaitList* waitList);
/**
* Constructor. Create start-coroutine Action.
* @param coroutine - pointer to &l:AbstractCoroutine;.
@ -287,6 +301,7 @@ class AbstractCoroutine : public oatpp::base::Countable {
friend Processor;
friend CoroutineStarter;
friend worker::Worker;
friend CoroutineWaitList;
public:
/**
* Convenience typedef for Action

View File

@ -0,0 +1,66 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "CoroutineWaitList.hpp"
#include "./Processor.hpp"
namespace oatpp { namespace async {
CoroutineWaitList::CoroutineWaitList(CoroutineWaitList&& other) {
std::memcpy(&m_list, &other.m_list, sizeof(m_list));
std::memset(&other.m_list, 0, sizeof(m_list));
}
CoroutineWaitList::~CoroutineWaitList() {
notifyAllAndClear();
}
void CoroutineWaitList::setListener(Listener* listener) {
m_listener = listener;
}
void CoroutineWaitList::put(AbstractCoroutine* coroutine) {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
m_list.pushBack(coroutine);
}
if(m_listener != nullptr) {
m_listener->onNewItem(*this);
}
}
void CoroutineWaitList::notifyAllAndClear() {
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
auto curr = m_list.first;
while(curr != nullptr) {
curr->_PP->pushOneTask(curr);
curr = curr->_ref;
}
std::memset(&m_list, 0, sizeof(m_list));
}
}}

View File

@ -0,0 +1,105 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_async_CoroutineWaitList_hpp
#define oatpp_async_CoroutineWaitList_hpp
#include "oatpp/core/async/Coroutine.hpp"
#include "oatpp/core/collection/FastQueue.hpp"
#include "oatpp/core/concurrency/SpinLock.hpp"
#include <mutex>
namespace oatpp { namespace async {
/**
* List of &id:oatpp::async::Coroutine; waiting to be notified.
*/
class CoroutineWaitList {
friend Processor;
public:
class Listener {
public:
virtual ~Listener() = default;
virtual void onNewItem(CoroutineWaitList& list) = 0;
};
private:
oatpp::collection::FastQueue<AbstractCoroutine> m_list;
oatpp::concurrency::SpinLock m_lock;
Listener* m_listener = nullptr;
protected:
/*
* Put coroutine on wait-list.
* This method should be called by Coroutine Processor only.
* @param coroutine
*/
void put(AbstractCoroutine* coroutine);
public:
/**
* Deleted copy-constructor.
* @param other
*/
CoroutineWaitList(const CoroutineWaitList&) = delete;
CoroutineWaitList& operator=(const CoroutineWaitList&) = delete;
public:
/**
* Default constructor.
*/
CoroutineWaitList() = default;
/**
* Move-constructor.
* @param other
*/
CoroutineWaitList(CoroutineWaitList&& other);
/**
* Virtual destructor.
* Will call notifyAllAndClear().
*/
virtual ~CoroutineWaitList();
void setListener(Listener* listener);
/**
* Put all coroutines back to its processors.
*/
void notifyAllAndClear();
CoroutineWaitList& operator=(CoroutineWaitList&& other) {
notifyAllAndClear();
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
std::memcpy(&m_list, &other.m_list, sizeof(m_list));
std::memset(&other.m_list, 0, sizeof(m_list));
return *this;
}
};
}}
#endif //oatpp_async_CoroutineWaitList_hpp

View File

@ -23,6 +23,7 @@
***************************************************************************/
#include "Processor.hpp"
#include "./CoroutineWaitList.hpp"
#include "oatpp/core/async/worker/Worker.hpp"
namespace oatpp { namespace async {
@ -91,6 +92,12 @@ void Processor::addCoroutine(AbstractCoroutine* coroutine) {
popTimerTask(coroutine);
break;
// case Action::TYPE_WAIT_LIST:
// coroutine->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
// m_queue.popFront();
// action.m_data.waitList->put(coroutine);
// break;
default:
m_queue.pushBack(coroutine);
@ -104,47 +111,27 @@ void Processor::addCoroutine(AbstractCoroutine* coroutine) {
}
void Processor::pushOneTaskFromIO(AbstractCoroutine* coroutine) {
void Processor::pushOneTask(AbstractCoroutine* coroutine) {
{
std::lock_guard<oatpp::concurrency::SpinLock> waitLock(m_waitLock);
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_sch_push_io_lock);
m_sch_push_io.pushBack(coroutine);
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_taskLock);
m_pushList.pushBack(coroutine);
}
m_waitCondition.notify_one();
m_taskCondition.notify_one();
}
void Processor::pushTasksFromIO(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
void Processor::pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
{
std::lock_guard<oatpp::concurrency::SpinLock> waitLock(m_waitLock);
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_sch_push_io_lock);
collection::FastQueue<AbstractCoroutine>::moveAll(tasks, m_sch_push_io);
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_taskLock);
collection::FastQueue<AbstractCoroutine>::moveAll(tasks, m_pushList);
}
m_waitCondition.notify_one();
}
void Processor::pushOneTaskFromTimer(AbstractCoroutine* coroutine) {
{
std::lock_guard<oatpp::concurrency::SpinLock> waitLock(m_waitLock);
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_sch_push_timer_lock);
m_sch_push_timer.pushBack(coroutine);
}
m_waitCondition.notify_one();
}
void Processor::pushTasksFromTimer(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
{
std::lock_guard<oatpp::concurrency::SpinLock> waitLock(m_waitLock);
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_sch_push_timer_lock);
collection::FastQueue<AbstractCoroutine>::moveAll(tasks, m_sch_push_timer);
}
m_waitCondition.notify_one();
m_taskCondition.notify_one();
}
void Processor::waitForTasks() {
std::unique_lock<oatpp::concurrency::SpinLock> lock(m_waitLock);
while (m_sch_push_io.first == nullptr && m_sch_push_timer.first == nullptr && m_taskList.empty() && m_running) {
m_waitCondition.wait(lock);
std::unique_lock<oatpp::concurrency::SpinLock> lock(m_taskLock);
while (m_pushList.first == nullptr && m_taskList.empty() && m_running) {
m_taskCondition.wait(lock);
}
}
@ -165,12 +152,6 @@ void Processor::popTasks() {
}
void Processor::pushAllFromQueue(oatpp::collection::FastQueue<AbstractCoroutine>& pushQueue) {
while(pushQueue.first != nullptr) {
addCoroutine(pushQueue.popFront());
}
}
void Processor::consumeAllTasks() {
for(auto& submission : m_taskList) {
auto coroutine = submission->createCoroutine();
@ -186,37 +167,29 @@ void Processor::pushQueues() {
if(!m_taskList.empty()) {
if (m_taskList.size() < MAX_BATCH_SIZE && m_queue.first != nullptr) {
std::unique_lock<std::mutex> lock(m_taskMutex, std::try_to_lock);
std::unique_lock<oatpp::concurrency::SpinLock> lock(m_taskLock, std::try_to_lock);
if (lock.owns_lock()) {
consumeAllTasks();
}
} else {
std::lock_guard<std::mutex> lock(m_taskMutex);
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_taskLock);
consumeAllTasks();
}
}
if(m_sch_push_io.first != nullptr) {
if (m_sch_push_io.count < MAX_BATCH_SIZE && m_queue.first != nullptr) {
std::unique_lock<oatpp::concurrency::SpinLock> lock(m_sch_push_io_lock, std::try_to_lock);
if(m_pushList.first != nullptr) {
if (m_pushList.count < MAX_BATCH_SIZE && m_queue.first != nullptr) {
std::unique_lock<oatpp::concurrency::SpinLock> lock(m_taskLock, std::try_to_lock);
if (lock.owns_lock()) {
pushAllFromQueue(m_sch_push_io);
while(m_pushList.first != nullptr) {
addCoroutine(m_pushList.popFront());
}
}
} else {
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_sch_push_io_lock);
pushAllFromQueue(m_sch_push_io);
}
}
if(m_sch_push_timer.first != nullptr) {
if (m_sch_push_timer.count < MAX_BATCH_SIZE && m_queue.first != nullptr) {
std::unique_lock<oatpp::concurrency::SpinLock> lock(m_sch_push_timer_lock, std::try_to_lock);
if (lock.owns_lock()) {
pushAllFromQueue(m_sch_push_timer);
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_taskLock);
while(m_pushList.first != nullptr) {
addCoroutine(m_pushList.popFront());
}
} else {
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_sch_push_timer_lock);
pushAllFromQueue(m_sch_push_timer);
}
}
@ -232,7 +205,7 @@ bool Processor::iterate(v_int32 numIterations) {
auto CP = m_queue.first;
if (CP == nullptr) {
break;
goto end_loop;
}
if (CP->finished()) {
m_queue.popFrontNoData();
@ -260,6 +233,12 @@ bool Processor::iterate(v_int32 numIterations) {
popTimerTask(CP);
break;
case Action::TYPE_WAIT_LIST:
CP->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
m_queue.popFront();
action.m_data.waitList->put(CP);
break;
// default:
// m_queue.round();
}
@ -274,21 +253,20 @@ bool Processor::iterate(v_int32 numIterations) {
}
end_loop:
popTasks();
return m_queue.first != nullptr ||
m_sch_push_io.first != nullptr ||
m_sch_push_timer.first != nullptr ||
!m_taskList.empty();
return m_queue.first != nullptr || m_pushList.first != nullptr || !m_taskList.empty();
}
void Processor::stop() {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_waitLock);
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_taskLock);
m_running = false;
}
m_waitCondition.notify_one();
m_taskCondition.notify_one();
}
}}

View File

@ -83,14 +83,6 @@ private:
};
private:
oatpp::collection::FastQueue<AbstractCoroutine> m_sch_push_io;
oatpp::collection::FastQueue<AbstractCoroutine> m_sch_push_timer;
oatpp::concurrency::SpinLock m_sch_push_io_lock;
oatpp::concurrency::SpinLock m_sch_push_timer_lock;
private:
std::vector<std::shared_ptr<worker::Worker>> m_ioWorkers;
@ -104,18 +96,15 @@ private:
private:
std::mutex m_taskMutex;
oatpp::concurrency::SpinLock m_taskLock;
std::condition_variable_any m_taskCondition;
std::list<std::shared_ptr<TaskSubmission>> m_taskList;
oatpp::collection::FastQueue<AbstractCoroutine> m_pushList;
private:
oatpp::collection::FastQueue<AbstractCoroutine> m_queue;
private:
oatpp::concurrency::SpinLock m_waitLock;
std::condition_variable_any m_waitCondition;
private:
bool m_running = true;
@ -128,28 +117,14 @@ private:
void consumeAllTasks();
void addCoroutine(AbstractCoroutine* coroutine);
void popTasks();
void pushAllFromQueue(oatpp::collection::FastQueue<AbstractCoroutine>& pushQueue);
void pushQueues();
public:
void addWorker(const std::shared_ptr<worker::Worker>& worker);
/**
* Return coroutine scheduled for I/O back to owner processor.
* @param coroutine
*/
void pushOneTaskFromIO(AbstractCoroutine* coroutine);
void pushTasksFromIO(oatpp::collection::FastQueue<AbstractCoroutine>& tasks);
/**
* Return coroutine scheduled for Timer back to owner processor.
* @param coroutine
*/
void pushOneTaskFromTimer(AbstractCoroutine* coroutine);
void pushTasksFromTimer(oatpp::collection::FastQueue<AbstractCoroutine>& tasks);
void pushOneTask(AbstractCoroutine* coroutine);
void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks);
/**
* Execute Coroutine.
@ -160,9 +135,9 @@ public:
template<typename CoroutineType, typename ... Args>
void execute(Args... params) {
auto submission = std::make_shared<SubmissionTemplate<CoroutineType, Args...>>(params...);
std::lock_guard<std::mutex> lock(m_taskMutex);
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_taskLock);
m_taskList.push_back(submission);
m_waitCondition.notify_one();
m_taskCondition.notify_one();
}
/**

View File

@ -186,7 +186,7 @@ void IOEventWorker::waitEvents() {
}
setCoroutineScheduledAction(coroutine, std::move(action));
getCoroutineProcessor(coroutine)->pushOneTaskFromIO(coroutine);
getCoroutineProcessor(coroutine)->pushOneTask(coroutine);
}

View File

@ -175,7 +175,7 @@ void IOEventWorker::waitEvents() {
default:
setCoroutineScheduledAction(coroutine, std::move(action));
getCoroutineProcessor(coroutine)->pushOneTaskFromIO(coroutine);
getCoroutineProcessor(coroutine)->pushOneTask(coroutine);
}

View File

@ -112,7 +112,7 @@ void IOWorker::work() {
// roundIteration = 0;
// m_queue.popFront();
// setCoroutineScheduledAction(CP, oatpp::async::Action::createWaitRepeatAction(0));
// getCoroutineProcessor(CP)->pushOneTaskFromIO(CP);
// getCoroutineProcessor(CP)->pushOneTask(CP);
// break;
case Action::TYPE_IO_WAIT:
@ -121,7 +121,7 @@ void IOWorker::work() {
if(getCoroutineTimePoint(CP) < tick) {
m_queue.popFront();
setCoroutineScheduledAction(CP, oatpp::async::Action::createWaitRepeatAction(0));
getCoroutineProcessor(CP)->pushOneTaskFromIO(CP);
getCoroutineProcessor(CP)->pushOneTask(CP);
} else {
m_queue.round();
}
@ -135,7 +135,7 @@ void IOWorker::work() {
roundIteration = 0;
m_queue.popFront();
setCoroutineScheduledAction(CP, std::move(action));
getCoroutineProcessor(CP)->pushOneTaskFromIO(CP);
getCoroutineProcessor(CP)->pushOneTask(CP);
break;
}

View File

@ -98,7 +98,7 @@ void TimerWorker::work() {
default:
m_queue.cutEntry(curr, prev);
setCoroutineScheduledAction(curr, std::move(action));
getCoroutineProcessor(curr)->pushOneTaskFromTimer(curr);
getCoroutineProcessor(curr)->pushOneTask(curr);
curr = prev;
break;

View File

@ -71,8 +71,11 @@ data::v_io_size Pipe::Reader::read(void *data, data::v_io_size count) {
result = data::IOError::BROKEN_PIPE;
}
}
pipe.m_conditionWrite.notify_one();
if(result > 0) {
pipe.m_conditionWrite.notify_one();
pipe.m_writer.notifyWaitList();
}
return result;
@ -85,8 +88,13 @@ oatpp::async::Action Pipe::Reader::suggestInputStreamAction(data::v_io_size ioRe
}
switch (ioResult) {
case oatpp::data::IOError::WAIT_RETRY:
return oatpp::async::Action::createWaitRepeatAction(10 * 1000 /* 10 milliseconds */);
case oatpp::data::IOError::WAIT_RETRY: {
std::unique_lock<std::mutex> lock(m_pipe->m_mutex);
if (m_pipe->m_fifo.availableToRead() > 0) {
return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT);
}
return oatpp::async::Action::createWaitListAction(&m_waitList);
}
case oatpp::data::IOError::RETRY:
return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT);
}
@ -95,6 +103,12 @@ oatpp::async::Action Pipe::Reader::suggestInputStreamAction(data::v_io_size ioRe
}
void Pipe::Reader::notifyWaitList() {
m_waitList.notifyAllAndClear();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void Pipe::Writer::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
m_ioMode = ioMode;
}
@ -140,8 +154,11 @@ data::v_io_size Pipe::Writer::write(const void *data, data::v_io_size count) {
result = data::IOError::BROKEN_PIPE;
}
}
pipe.m_conditionRead.notify_one();
if(result > 0) {
pipe.m_conditionRead.notify_one();
pipe.m_reader.notifyWaitList();
}
return result;
@ -154,8 +171,13 @@ oatpp::async::Action Pipe::Writer::suggestOutputStreamAction(data::v_io_size ioR
}
switch (ioResult) {
case oatpp::data::IOError::WAIT_RETRY:
return oatpp::async::Action::createWaitRepeatAction(10 * 1000 /* 10 milliseconds */);
case oatpp::data::IOError::WAIT_RETRY: {
std::unique_lock<std::mutex> lock(m_pipe->m_mutex);
if (m_pipe->m_fifo.availableToWrite() > 0) {
return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT);
}
return oatpp::async::Action::createWaitListAction(&m_waitList);
}
case oatpp::data::IOError::RETRY:
return oatpp::async::Action::createActionByType(oatpp::async::Action::TYPE_REPEAT);
}
@ -164,6 +186,12 @@ oatpp::async::Action Pipe::Writer::suggestOutputStreamAction(data::v_io_size ioR
}
void Pipe::Writer::notifyWaitList() {
m_waitList.notifyAllAndClear();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Pipe::Pipe()
: m_open(true)
, m_writer(this)
@ -190,7 +218,9 @@ void Pipe::close() {
m_open = false;
}
m_conditionRead.notify_one();
m_reader.notifyWaitList();
m_conditionWrite.notify_one();
m_writer.notifyWaitList();
}
}}}

View File

@ -25,6 +25,8 @@
#ifndef oatpp_network_virtual__Pipe_hpp
#define oatpp_network_virtual__Pipe_hpp
#include "oatpp/core/async/CoroutineWaitList.hpp"
#include "oatpp/core/data/stream/Stream.hpp"
#include "oatpp/core/data/buffer/FIFOBuffer.hpp"
@ -50,6 +52,24 @@ public:
*/
class Reader : public oatpp::data::stream::InputStream {
friend Pipe;
private:
class WaitListListener : public oatpp::async::CoroutineWaitList::Listener {
private:
Pipe* m_pipe;
public:
WaitListListener(Pipe* pipe)
: m_pipe(pipe)
{}
void onNewItem(oatpp::async::CoroutineWaitList& list) override {
std::unique_lock<std::mutex> lock(m_pipe->m_mutex);
if (m_pipe->m_fifo.availableToRead() > 0) {
list.notifyAllAndClear();
}
}
};
private:
Pipe* m_pipe;
oatpp::data::stream::IOMode m_ioMode;
@ -58,13 +78,19 @@ public:
* this one used for testing purposes only
*/
data::v_io_size m_maxAvailableToRead;
oatpp::async::CoroutineWaitList m_waitList;
WaitListListener m_waitListListener;
protected:
Reader(Pipe* pipe, oatpp::data::stream::IOMode ioMode = oatpp::data::stream::IOMode::BLOCKING)
: m_pipe(pipe)
, m_ioMode(ioMode)
, m_maxAvailableToRead(-1)
{}
, m_waitListListener(pipe)
{
m_waitList.setListener(&m_waitListListener);
}
public:
@ -105,6 +131,11 @@ public:
* @return
*/
oatpp::data::stream::IOMode getInputStreamIOMode() override;
/**
* Notify coroutine wait-list
*/
void notifyWaitList();
};
@ -114,6 +145,22 @@ public:
*/
class Writer : public oatpp::data::stream::OutputStream {
friend Pipe;
private:
class WaitListListener : public oatpp::async::CoroutineWaitList::Listener {
private:
Pipe* m_pipe;
public:
WaitListListener(Pipe* pipe)
: m_pipe(pipe)
{}
void onNewItem(oatpp::async::CoroutineWaitList& list) override {
std::unique_lock<std::mutex> lock(m_pipe->m_mutex);
if (m_pipe->m_fifo.availableToWrite() > 0) {
list.notifyAllAndClear();
}
}
};
private:
Pipe* m_pipe;
oatpp::data::stream::IOMode m_ioMode;
@ -122,13 +169,19 @@ public:
* this one used for testing purposes only
*/
data::v_io_size m_maxAvailableToWrtie;
oatpp::async::CoroutineWaitList m_waitList;
WaitListListener m_waitListListener;
protected:
Writer(Pipe* pipe, oatpp::data::stream::IOMode ioMode = oatpp::data::stream::IOMode::BLOCKING)
: m_pipe(pipe)
, m_ioMode(ioMode)
, m_maxAvailableToWrtie(-1)
{}
, m_waitListListener(pipe)
{
m_waitList.setListener(&m_waitListListener);
}
public:
@ -168,6 +221,11 @@ public:
* @return
*/
oatpp::data::stream::IOMode getOutputStreamIOMode() override;
/**
* Notify coroutine wait-list
*/
void notifyWaitList();
};