better IOWorker

This commit is contained in:
lganzzzo 2019-04-17 02:10:32 +03:00
parent 9f63e87d79
commit 475f024538
4 changed files with 63 additions and 28 deletions

View File

@ -30,6 +30,14 @@
namespace oatpp { namespace async {
IOWorker::IOWorker()
: Worker(Type::IO)
, m_running(true)
{
std::thread thread(&IOWorker::work, this);
thread.detach();
}
void IOWorker::pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
{
std::lock_guard<std::mutex> guard(m_backlogMutex);
@ -71,16 +79,23 @@ void IOWorker::work() {
v_int32 consumeIteration = 0;
v_int32 roundIteration = 0;
std::chrono::microseconds ms = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch());
v_int64 tick = ms.count();
while(m_running) {
auto CP = m_queue.first;
if(CP != nullptr) {
Action action = CP->iterate();
auto& schA = getCoroutineScheduledAction(CP);
switch(action.getType()) {
case Action::TYPE_IO_REPEAT:
dismissAction(schA);
++ roundIteration;
if(roundIteration == 10) {
roundIteration = 0;
@ -90,16 +105,30 @@ void IOWorker::work() {
// case Action::TYPE_IO_WAIT:
// roundIteration = 0;
// m_queue.round();
// break;
// case Action::TYPE_IO_WAIT: // schedule for timer
// roundIteration = 0;
// m_queue.popFront();
// setCoroutineScheduledAction(CP, std::move(action));
// setCoroutineScheduledAction(CP, oatpp::async::Action::createWaitRepeatAction(0));
// getCoroutineProcessor(CP)->pushOneTaskFromIO(CP);
// break;
case Action::TYPE_IO_WAIT: // schedule for timer
case Action::TYPE_IO_WAIT:
roundIteration = 0;
m_queue.popFront();
setCoroutineScheduledAction(CP, oatpp::async::Action::createWaitRepeatAction(0));
getCoroutineProcessor(CP)->pushOneTaskFromIO(CP);
if(schA.getType() == Action::TYPE_WAIT_REPEAT) {
if(getCoroutineTimePoint(CP) < tick) {
m_queue.popFront();
setCoroutineScheduledAction(CP, oatpp::async::Action::createWaitRepeatAction(0));
getCoroutineProcessor(CP)->pushOneTaskFromIO(CP);
} else {
m_queue.round();
}
} else {
setCoroutineScheduledAction(CP, oatpp::async::Action::createWaitRepeatAction(tick + 1000000));
m_queue.round();
}
break;
default:
@ -117,14 +146,26 @@ void IOWorker::work() {
if(consumeIteration == 100) {
consumeIteration = 0;
consumeBacklog(false);
std::chrono::microseconds ms = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch());
tick = ms.count();
}
} else {
consumeBacklog(true);
std::chrono::microseconds ms = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch());
tick = ms.count();
}
}
}
void IOWorker::stop() {
{
std::lock_guard<std::mutex> lock(m_backlogMutex);
m_running = false;
}
m_backlogCondition.notify_one();
}
}}

View File

@ -45,16 +45,7 @@ private:
void consumeBacklog(bool blockToConsume);
public:
IOWorker()
: Worker(Type::IO)
, m_running(true)
{
std::thread thread(&IOWorker::work, this);
thread.detach();
}
~IOWorker() {
}
IOWorker();
void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) override;
@ -62,13 +53,7 @@ public:
void work();
void stop() override {
{
std::lock_guard<std::mutex> lock(m_backlogMutex);
m_running = false;
}
m_backlogCondition.notify_one();
}
void stop() override;
};

View File

@ -26,10 +26,18 @@
namespace oatpp { namespace async {
Worker::Worker(Type type)
: m_type(type)
{}
void Worker::setCoroutineScheduledAction(AbstractCoroutine *CP, Action &&action) {
CP->_SCH_A = std::forward<Action>(action);
}
Action& Worker::getCoroutineScheduledAction(AbstractCoroutine* CP) {
return CP->_SCH_A;
}
Processor* Worker::getCoroutineProcessor(AbstractCoroutine* CP) {
return CP->_PP;
}
@ -46,4 +54,8 @@ AbstractCoroutine* Worker::nextCoroutine(AbstractCoroutine* CP) {
return CP->_ref;
}
Worker::Type Worker::getType() {
return m_type;
}
}}

View File

@ -60,15 +60,14 @@ private:
Type m_type;
protected:
static void setCoroutineScheduledAction(AbstractCoroutine* CP, Action&& action);
static Action& getCoroutineScheduledAction(AbstractCoroutine* CP);
static Processor* getCoroutineProcessor(AbstractCoroutine* CP);
static v_int64 getCoroutineTimePoint(AbstractCoroutine* CP);
static void dismissAction(Action& action);
static AbstractCoroutine* nextCoroutine(AbstractCoroutine* CP);
public:
Worker(Type type)
: m_type(type)
{}
Worker(Type type);
virtual ~Worker() = default;
@ -77,9 +76,7 @@ public:
virtual void stop() = 0;
Type getType() {
return m_type;
}
Type getType();
};