Draft of IOEventWorker

This commit is contained in:
lganzzzo 2019-04-24 07:03:31 +03:00
parent 48b217ede1
commit 755bcad21a
10 changed files with 411 additions and 25 deletions

View File

@ -23,6 +23,8 @@ add_library(oatpp
oatpp/core/async/worker/Worker.hpp
oatpp/core/async/worker/IOWorker.cpp
oatpp/core/async/worker/IOWorker.hpp
oatpp/core/async/worker/IOEventWorker.cpp
oatpp/core/async/worker/IOEventWorker.hpp
oatpp/core/async/worker/TimerWorker.cpp
oatpp/core/async/worker/TimerWorker.hpp
oatpp/core/async/Executor.cpp

View File

@ -39,15 +39,17 @@ Action Action::createActionByType(v_int32 type) {
return Action(type);
}
Action Action::createIOWaitAction(data::v_io_handle ioHandle) {
Action Action::createIOWaitAction(data::v_io_handle ioHandle, v_int32 ioEventType) {
Action result(TYPE_IO_WAIT);
result.m_data.ioHandle = ioHandle;
result.m_data.ioData.ioHandle = ioHandle;
result.m_data.ioData.ioEventType = ioEventType;
return result;
}
Action Action::createIORepeatAction(data::v_io_handle ioHandle) {
Action Action::createIORepeatAction(data::v_io_handle ioHandle, v_int32 ioEventType) {
Action result(TYPE_IO_REPEAT);
result.m_data.ioHandle = ioHandle;
result.m_data.ioData.ioHandle = ioHandle;
result.m_data.ioData.ioEventType = ioEventType;
return result;
}
@ -102,6 +104,14 @@ v_int32 Action::getType() const {
return m_type;
}
oatpp::data::v_io_handle Action::getIOHandle() const {
return m_data.ioData.ioHandle;
}
v_int32 Action::getIOEventType() const {
return m_data.ioData.ioEventType;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// CoroutineStarter

View File

@ -105,11 +105,23 @@ public:
*/
static constexpr const v_int32 TYPE_ERROR = 8;
public:
static constexpr const v_int32 IO_EVENT_READ = 0;
static constexpr const v_int32 IO_EVENT_WRITE = 1;
private:
struct IOData {
oatpp::data::v_io_handle ioHandle;
v_int32 ioEventType;
};
private:
union Data {
FunctionPtr fptr;
AbstractCoroutine* coroutine;
oatpp::data::v_io_handle ioHandle;
IOData ioData;
v_int64 timePointMicroseconds;
};
private:
@ -138,14 +150,14 @@ public:
* @param ioHandle - &id:oatpp::data::v_io_handle;.
* @return - Action.
*/
static Action createIOWaitAction(data::v_io_handle ioHandle);
static Action createIOWaitAction(data::v_io_handle ioHandle, v_int32 ioEventType);
/**
* Create TYPE_IO_REPEAT Action
* @param ioHandle - &id:oatpp::data::v_io_handle;.
* @return - Action.
*/
static Action createIORepeatAction(data::v_io_handle ioHandle);
static Action createIORepeatAction(data::v_io_handle ioHandle, v_int32 ioEventType);
/**
* Create TYPE_WAIT_REPEAT Action.
@ -203,6 +215,10 @@ public:
* @return - action type.
*/
v_int32 getType() const;
oatpp::data::v_io_handle getIOHandle() const;
v_int32 getIOEventType() const;
};
@ -481,16 +497,16 @@ public:
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_WAIT_FOR_IO Action.
*/
Action ioWait(data::v_io_handle ioHandle) const {
return Action::createIOWaitAction(ioHandle);
Action ioWait(data::v_io_handle ioHandle, v_int32 ioEventType) const {
return Action::createIOWaitAction(ioHandle, ioEventType);
}
/**
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_IO_REPEAT Action.
*/
Action ioRepeat(data::v_io_handle ioHandle) const {
return Action::createIORepeatAction(ioHandle);
Action ioRepeat(data::v_io_handle ioHandle, v_int32 ioEventType) const {
return Action::createIORepeatAction(ioHandle, ioEventType);
}
/**
@ -670,16 +686,16 @@ public:
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_WAIT_FOR_IO Action.
*/
Action ioWait(data::v_io_handle ioHandle) const {
return Action::createIOWaitAction(ioHandle);
Action ioWait(data::v_io_handle ioHandle, v_int32 ioEventType) const {
return Action::createIOWaitAction(ioHandle, ioEventType);
}
/**
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_IO_REPEAT Action.
*/
Action ioRepeat(data::v_io_handle ioHandle) const {
return Action::createIORepeatAction(ioHandle);
Action ioRepeat(data::v_io_handle ioHandle, v_int32 ioEventType) const {
return Action::createIORepeatAction(ioHandle, ioEventType);
}
/**

View File

@ -23,7 +23,7 @@
***************************************************************************/
#include "Executor.hpp"
#include "oatpp/core/async/worker/IOEventWorker.hpp"
#include "oatpp/core/async/worker/IOWorker.hpp"
#include "oatpp/core/async/worker/TimerWorker.hpp"
@ -69,7 +69,7 @@ Executor::Executor(v_int32 processorThreads, v_int32 ioThreads, v_int32 timerThr
std::vector<std::shared_ptr<worker::Worker>> ioWorkers;
for(v_int32 i = 0; i < m_ioThreads; i++) {
ioWorkers.push_back(std::make_shared<worker::IOWorker>());
ioWorkers.push_back(std::make_shared<worker::IOEventWorker>());
}
linkWorkers(ioWorkers);

View File

@ -113,6 +113,15 @@ void Processor::pushOneTaskFromIO(AbstractCoroutine* coroutine) {
m_waitCondition.notify_one();
}
void Processor::pushTasksFromIO(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
{
std::lock_guard<oatpp::concurrency::SpinLock> waitLock(m_waitLock);
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_sch_push_io_lock);
collection::FastQueue<AbstractCoroutine>::moveAll(tasks, m_sch_push_io);
}
m_waitCondition.notify_one();
}
void Processor::pushOneTaskFromTimer(AbstractCoroutine* coroutine) {
{
std::lock_guard<oatpp::concurrency::SpinLock> waitLock(m_waitLock);

View File

@ -141,6 +141,8 @@ public:
*/
void pushOneTaskFromIO(AbstractCoroutine* coroutine);
void pushTasksFromIO(oatpp::collection::FastQueue<AbstractCoroutine>& tasks);
/**
* Return coroutine scheduled for Timer back to owner processor.
* @param coroutine

View File

@ -0,0 +1,270 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "IOEventWorker.hpp"
#include "oatpp/core/async/Processor.hpp"
#include <chrono>
#include <unistd.h>
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// BSD
#include <sys/event.h>
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
namespace oatpp { namespace async { namespace worker {
IOEventWorker::IOEventWorker()
: Worker(Type::IO)
, m_running(true)
, m_eventQueueHandle(-1)
, m_inEvents(nullptr)
, m_inEventsCount(0)
, m_outEvents(nullptr)
{
OATPP_LOGD("IOEventWorker", "created");
std::thread thread(&IOEventWorker::work, this);
thread.detach();
}
IOEventWorker::~IOEventWorker() {
if(m_inEvents != nullptr) {
delete[] m_inEvents;
}
if(m_outEvents != nullptr) {
delete[] m_outEvents;
}
}
void IOEventWorker::initEventQueue() {
m_eventQueueHandle = ::kqueue();
if(m_eventQueueHandle == -1) {
throw std::runtime_error("[oatpp::async::worker::IOEventWorker::initEventQueue()]: Error. Call to ::kqueue() failed.");
}
m_outEvents = (p_char8)(new struct kevent[MAX_EVENTS]);
}
void IOEventWorker::triggerWakeup() {
struct kevent event;
memset(&event, 0, sizeof(event));
event.ident = 0;
event.filter = EVFILT_USER;
event.fflags = NOTE_TRIGGER;
auto res = kevent(m_eventQueueHandle, &event, 1, nullptr, 0, NULL);
if(res < 0) {
throw std::runtime_error("[oatpp::async::worker::IOEventWorker::triggerWakeup()]: Error. trigger wakeup failed.");
}
}
void IOEventWorker::setTriggerEvent(p_char8 eventPtr) {
struct kevent* event = (struct kevent*) eventPtr;
std::memset(event, 0, sizeof(struct kevent));
event->ident = 0;
event->filter = EVFILT_USER;
event->flags = EV_ADD | EV_CLEAR;
}
void IOEventWorker::setCoroutineEvent(AbstractCoroutine* coroutine, p_char8 eventPtr) {
auto& action = getCoroutineScheduledAction(coroutine);
struct kevent* event = (struct kevent*) eventPtr;
std::memset(event, 0, sizeof(struct kevent));
event->ident = action.getIOHandle();
event->flags = EV_ADD | EV_ONESHOT;
event->udata = coroutine;
switch(action.getIOEventType()) {
case Action::IO_EVENT_READ:
event->filter = EVFILT_READ;
break;
case Action::IO_EVENT_WRITE:
event->filter = EVFILT_WRITE;
break;
default:
throw std::runtime_error("[oatpp::async::worker::IOEventWorker::pushCoroutineToQueue()]: Error. Unknown Action Event Type.");
}
switch(action.getType()) {
case Action::TYPE_IO_WAIT:
return;
case Action::TYPE_IO_REPEAT:
return;
default:
throw std::runtime_error("[oatpp::async::worker::IOEventWorker::pushCoroutineToQueue()]: Error. Unknown Action.");
}
}
void IOEventWorker::pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
if(tasks.first != nullptr) {
{
std::lock_guard<oatpp::concurrency::SpinLock> guard(m_backlogLock);
oatpp::collection::FastQueue<AbstractCoroutine>::moveAll(tasks, m_backlog);
}
triggerWakeup();
}
}
void IOEventWorker::pushOneTask(AbstractCoroutine* task) {
{
std::lock_guard<oatpp::concurrency::SpinLock> guard(m_backlogLock);
m_backlog.pushBack(task);
}
triggerWakeup();
}
void IOEventWorker::consumeBacklog() {
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_backlogLock);
m_inEventsCount = m_backlog.count + 1;
m_inEvents = (p_char8)(new struct kevent[m_inEventsCount]);
v_int32 eventSize = sizeof(struct kevent);
setTriggerEvent(&m_inEvents[0]);
auto curr = m_backlog.first;
v_int32 i = 1;
while(curr != nullptr) {
setCoroutineEvent(curr, &m_inEvents[i * eventSize]);
curr = nextCoroutine(curr);
++i;
}
m_backlog.first = nullptr;
m_backlog.last = nullptr;
m_backlog.count = 0;
}
void IOEventWorker::waitEvents() {
auto eventsCount = kevent(m_eventQueueHandle, (struct kevent*)m_inEvents, m_inEventsCount, (struct kevent*)m_outEvents, MAX_EVENTS, NULL);
if(eventsCount < 0) {
throw std::runtime_error("[oatpp::async::worker::IOEventWorker::waitEvents()]: Error. Event loop failed.");
}
v_int32 eventSize = sizeof(struct kevent);
oatpp::collection::FastQueue<AbstractCoroutine> m_repeatQueue;
//OATPP_LOGD("IOEventWorker", "eventsCount=%d", eventsCount);
for(v_int32 i = 0; i < eventsCount; i ++) {
struct kevent* event = (struct kevent *)&m_outEvents[i * eventSize];
auto coroutine = (AbstractCoroutine*) event->udata;
if((event->flags & EV_ERROR) > 0) {
OATPP_LOGD("Error", "data='%s'", strerror(event->data));
continue;
}
if(coroutine != nullptr) {
Action action = coroutine->iterate();
switch(action.getType()) {
case Action::TYPE_IO_WAIT:
setCoroutineScheduledAction(coroutine, std::move(action));
m_repeatQueue.pushBack(coroutine);
break;
case Action::TYPE_IO_REPEAT:
setCoroutineScheduledAction(coroutine, std::move(action));
m_repeatQueue.pushBack(coroutine);
break;
default:
setCoroutineScheduledAction(coroutine, std::move(action));
getCoroutineProcessor(coroutine)->pushOneTaskFromIO(coroutine);
}
}
}
if(m_repeatQueue.count > 0) {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_backlogLock);
oatpp::collection::FastQueue<AbstractCoroutine>::moveAll(m_repeatQueue, m_backlog);
}
}
}
void IOEventWorker::work() {
initEventQueue();
while(m_running) {
consumeBacklog();
//OATPP_LOGD("IOEventWorker", "Waiting events...");
waitEvents();
}
}
void IOEventWorker::stop() {
{
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_backlogLock);
m_running = false;
}
triggerWakeup();
}
}}}

View File

@ -0,0 +1,77 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_async_worker_IOEventWorker_hpp
#define oatpp_async_worker_IOEventWorker_hpp
#include "./Worker.hpp"
#include "oatpp/core/collection/LinkedList.hpp"
#include "oatpp/core/concurrency/SpinLock.hpp"
#include <unordered_map>
#include <thread>
#include <mutex>
#include <condition_variable>
namespace oatpp { namespace async { namespace worker {
class IOEventWorker : public Worker {
private:
static constexpr const v_int32 MAX_EVENTS = 10000;
private:
bool m_running;
oatpp::collection::FastQueue<AbstractCoroutine> m_backlog;
oatpp::concurrency::SpinLock m_backlogLock;
private:
oatpp::data::v_io_handle m_eventQueueHandle;
p_char8 m_inEvents;
v_int32 m_inEventsCount;
p_char8 m_outEvents;
private:
void consumeBacklog();
void waitEvents();
private:
void initEventQueue();
void triggerWakeup();
void setTriggerEvent(p_char8 eventPtr);
void setCoroutineEvent(AbstractCoroutine* coroutine, p_char8 eventPtr);
public:
IOEventWorker();
~IOEventWorker();
void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) override;
void pushOneTask(AbstractCoroutine* task) override;
void work();
void stop() override;
};
}}}
#endif //oatpp_async_worker_IOEventWorker_hpp

View File

@ -128,14 +128,14 @@ oatpp::data::stream::IOMode Connection::getStreamIOMode() {
oatpp::async::Action Connection::suggestOutputStreamAction(data::v_io_size ioResult) {
if(ioResult > 0) {
return oatpp::async::Action::createIORepeatAction(m_handle);
return oatpp::async::Action::createIORepeatAction(m_handle, oatpp::async::Action::IO_EVENT_WRITE);
}
switch (ioResult) {
case oatpp::data::IOError::WAIT_RETRY:
return oatpp::async::Action::createIOWaitAction(m_handle);
return oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IO_EVENT_WRITE);
case oatpp::data::IOError::RETRY:
return oatpp::async::Action::createIORepeatAction(m_handle);
return oatpp::async::Action::createIORepeatAction(m_handle, oatpp::async::Action::IO_EVENT_WRITE);
}
throw std::runtime_error("[oatpp::network::virtual_::Pipe::Reader::suggestInputStreamAction()]: Error. Unable to suggest async action for I/O result.");
@ -145,14 +145,14 @@ oatpp::async::Action Connection::suggestOutputStreamAction(data::v_io_size ioRes
oatpp::async::Action Connection::suggestInputStreamAction(data::v_io_size ioResult) {
if(ioResult > 0) {
return oatpp::async::Action::createIORepeatAction(m_handle);
return oatpp::async::Action::createIORepeatAction(m_handle, oatpp::async::Action::IO_EVENT_READ);
}
switch (ioResult) {
case oatpp::data::IOError::WAIT_RETRY:
return oatpp::async::Action::createIOWaitAction(m_handle);
return oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IO_EVENT_READ);
case oatpp::data::IOError::RETRY:
return oatpp::async::Action::createIORepeatAction(m_handle);
return oatpp::async::Action::createIORepeatAction(m_handle, oatpp::async::Action::IO_EVENT_READ);
}
throw std::runtime_error("[oatpp::network::virtual_::Pipe::Reader::suggestInputStreamAction()]: Error. Unable to suggest async action for I/O result.");

View File

@ -197,9 +197,9 @@ oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::strea
return _return(oatpp::network::Connection::createShared(m_clientHandle));
}
if(errno == EALREADY || errno == EINPROGRESS) {
return ioWait(m_clientHandle);
return ioWait(m_clientHandle, oatpp::async::Action::IO_EVENT_WRITE);
} else if(errno == EINTR) {
return ioRepeat(m_clientHandle);
return ioRepeat(m_clientHandle, oatpp::async::Action::IO_EVENT_WRITE);
}
::close(m_clientHandle);