mirror of
https://github.com/oatpp/oatpp.git
synced 2025-01-06 16:24:27 +08:00
New Async Event-Based IO using kqueue/epoll. + Use CoroutineWaitList for virtual_::* I/O
This commit is contained in:
parent
3836421898
commit
03388a295c
@ -39,14 +39,14 @@ Action Action::createActionByType(v_int32 type) {
|
||||
return Action(type);
|
||||
}
|
||||
|
||||
Action Action::createIOWaitAction(data::v_io_handle ioHandle, v_int32 ioEventType) {
|
||||
Action Action::createIOWaitAction(data::v_io_handle ioHandle, Action::IOEventType ioEventType) {
|
||||
Action result(TYPE_IO_WAIT);
|
||||
result.m_data.ioData.ioHandle = ioHandle;
|
||||
result.m_data.ioData.ioEventType = ioEventType;
|
||||
return result;
|
||||
}
|
||||
|
||||
Action Action::createIORepeatAction(data::v_io_handle ioHandle, v_int32 ioEventType) {
|
||||
Action Action::createIORepeatAction(data::v_io_handle ioHandle, Action::IOEventType ioEventType) {
|
||||
Action result(TYPE_IO_REPEAT);
|
||||
result.m_data.ioData.ioHandle = ioHandle;
|
||||
result.m_data.ioData.ioEventType = ioEventType;
|
||||
@ -86,11 +86,10 @@ Action::Action(Action&& other)
|
||||
, m_data(other.m_data)
|
||||
{
|
||||
other.m_type = TYPE_NONE;
|
||||
//OATPP_LOGD("aaa", "moving");
|
||||
}
|
||||
|
||||
Action::~Action() {
|
||||
if(m_type == TYPE_COROUTINE && m_data.coroutine != nullptr) {
|
||||
if(m_type == TYPE_COROUTINE) {
|
||||
delete m_data.coroutine;
|
||||
}
|
||||
}
|
||||
@ -110,11 +109,15 @@ v_int32 Action::getType() const {
|
||||
return m_type;
|
||||
}
|
||||
|
||||
v_int64 Action::getTimePointMicroseconds() const {
|
||||
return m_data.timePointMicroseconds;
|
||||
}
|
||||
|
||||
oatpp::data::v_io_handle Action::getIOHandle() const {
|
||||
return m_data.ioData.ioHandle;
|
||||
}
|
||||
|
||||
v_int32 Action::getIOEventType() const {
|
||||
Action::IOEventType Action::getIOEventType() const {
|
||||
return m_data.ioData.ioEventType;
|
||||
}
|
||||
|
||||
|
@ -113,14 +113,24 @@ public:
|
||||
|
||||
public:
|
||||
|
||||
static constexpr const v_int32 IO_EVENT_READ = 0;
|
||||
static constexpr const v_int32 IO_EVENT_WRITE = 1;
|
||||
enum IOEventType : v_int32 {
|
||||
/**
|
||||
* IO event type READ.
|
||||
*/
|
||||
IO_EVENT_READ = 0,
|
||||
|
||||
/**
|
||||
* IO event type WRITE.
|
||||
*/
|
||||
IO_EVENT_WRITE = 1
|
||||
|
||||
};
|
||||
|
||||
private:
|
||||
|
||||
struct IOData {
|
||||
oatpp::data::v_io_handle ioHandle;
|
||||
v_int32 ioEventType;
|
||||
IOEventType ioEventType;
|
||||
};
|
||||
|
||||
private:
|
||||
@ -142,9 +152,13 @@ protected:
|
||||
Action(v_int32 type);
|
||||
public:
|
||||
|
||||
/**
|
||||
* Clone action.
|
||||
* @param action - action to clone.
|
||||
* @return - cloned action.
|
||||
*/
|
||||
static Action clone(const Action& action);
|
||||
|
||||
|
||||
/**
|
||||
* Create action of specific type
|
||||
* @param type
|
||||
@ -157,14 +171,14 @@ public:
|
||||
* @param ioHandle - &id:oatpp::data::v_io_handle;.
|
||||
* @return - Action.
|
||||
*/
|
||||
static Action createIOWaitAction(data::v_io_handle ioHandle, v_int32 ioEventType);
|
||||
static Action createIOWaitAction(data::v_io_handle ioHandle, IOEventType ioEventType);
|
||||
|
||||
/**
|
||||
* Create TYPE_IO_REPEAT Action
|
||||
* @param ioHandle - &id:oatpp::data::v_io_handle;.
|
||||
* @return - Action.
|
||||
*/
|
||||
static Action createIORepeatAction(data::v_io_handle ioHandle, v_int32 ioEventType);
|
||||
static Action createIORepeatAction(data::v_io_handle ioHandle, IOEventType ioEventType);
|
||||
|
||||
/**
|
||||
* Create TYPE_WAIT_REPEAT Action.
|
||||
@ -230,8 +244,25 @@ public:
|
||||
*/
|
||||
v_int32 getType() const;
|
||||
|
||||
/**
|
||||
* Get microseconds tick when timer should call coroutine again.
|
||||
* This method returns meaningful value only if Action is TYPE_WAIT_REPEAT.
|
||||
* @return - microseconds tick.
|
||||
*/
|
||||
v_int64 getTimePointMicroseconds() const;
|
||||
|
||||
/**
|
||||
* Get I/O handle which is passed with this action to I/O worker.
|
||||
* This method returns meaningful value only if Action is TYPE_IO_WAIT or TYPE_IO_REPEAT.
|
||||
* @return - &id:oatpp::data::v_io_handle;.
|
||||
*/
|
||||
oatpp::data::v_io_handle getIOHandle() const;
|
||||
v_int32 getIOEventType() const;
|
||||
|
||||
/**
|
||||
* This method returns meaningful value only if Action is TYPE_IO_WAIT or TYPE_IO_REPEAT.
|
||||
* @return - should return one of
|
||||
*/
|
||||
IOEventType getIOEventType() const;
|
||||
|
||||
|
||||
};
|
||||
@ -512,7 +543,7 @@ public:
|
||||
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
|
||||
* @return - TYPE_WAIT_FOR_IO Action.
|
||||
*/
|
||||
Action ioWait(data::v_io_handle ioHandle, v_int32 ioEventType) const {
|
||||
Action ioWait(data::v_io_handle ioHandle, Action::IOEventType ioEventType) const {
|
||||
return Action::createIOWaitAction(ioHandle, ioEventType);
|
||||
}
|
||||
|
||||
@ -520,7 +551,7 @@ public:
|
||||
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
|
||||
* @return - TYPE_IO_REPEAT Action.
|
||||
*/
|
||||
Action ioRepeat(data::v_io_handle ioHandle, v_int32 ioEventType) const {
|
||||
Action ioRepeat(data::v_io_handle ioHandle, Action::IOEventType ioEventType) const {
|
||||
return Action::createIORepeatAction(ioHandle, ioEventType);
|
||||
}
|
||||
|
||||
@ -701,7 +732,7 @@ public:
|
||||
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
|
||||
* @return - TYPE_WAIT_FOR_IO Action.
|
||||
*/
|
||||
Action ioWait(data::v_io_handle ioHandle, v_int32 ioEventType) const {
|
||||
Action ioWait(data::v_io_handle ioHandle, Action::IOEventType ioEventType) const {
|
||||
return Action::createIOWaitAction(ioHandle, ioEventType);
|
||||
}
|
||||
|
||||
@ -709,7 +740,7 @@ public:
|
||||
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
|
||||
* @return - TYPE_IO_REPEAT Action.
|
||||
*/
|
||||
Action ioRepeat(data::v_io_handle ioHandle, v_int32 ioEventType) const {
|
||||
Action ioRepeat(data::v_io_handle ioHandle, Action::IOEventType ioEventType) const {
|
||||
return Action::createIORepeatAction(ioHandle, ioEventType);
|
||||
}
|
||||
|
||||
|
@ -56,8 +56,9 @@ void CoroutineWaitList::notifyAllAndClear() {
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_lock);
|
||||
auto curr = m_list.first;
|
||||
while(curr != nullptr) {
|
||||
auto next = curr->_ref;
|
||||
curr->_PP->pushOneTask(curr);
|
||||
curr = curr->_ref;
|
||||
curr = next;
|
||||
}
|
||||
std::memset(&m_list, 0, sizeof(m_list));
|
||||
}
|
||||
|
@ -39,9 +39,20 @@ namespace oatpp { namespace async {
|
||||
class CoroutineWaitList {
|
||||
friend Processor;
|
||||
public:
|
||||
/**
|
||||
* Listener for new items in the wait-list.
|
||||
*/
|
||||
class Listener {
|
||||
public:
|
||||
/**
|
||||
* Default virtual destructor.
|
||||
*/
|
||||
virtual ~Listener() = default;
|
||||
|
||||
/**
|
||||
* Called when new item is pushed to the list.
|
||||
* @param list - list where new item was pushed to.
|
||||
*/
|
||||
virtual void onNewItem(CoroutineWaitList& list) = 0;
|
||||
};
|
||||
private:
|
||||
|
@ -59,24 +59,30 @@ Executor::Executor(v_int32 processorThreads, v_int32 ioThreads, v_int32 timerThr
|
||||
: m_processorThreads(processorThreads)
|
||||
, m_ioThreads(ioThreads)
|
||||
, m_timerThreads(timerThreads)
|
||||
, m_threads(new std::thread[m_processorThreads])
|
||||
, m_threadsCount(m_processorThreads + ioThreads + timerThreads)
|
||||
, m_threads(new std::thread[m_threadsCount])
|
||||
, m_processors(new SubmissionProcessor[m_processorThreads])
|
||||
{
|
||||
|
||||
v_int32 threadCnt = 0;
|
||||
for(v_int32 i = 0; i < m_processorThreads; i ++) {
|
||||
m_threads[i] = std::thread(&SubmissionProcessor::run, &m_processors[i]);
|
||||
m_threads[threadCnt ++] = std::thread(&SubmissionProcessor::run, &m_processors[i]);
|
||||
}
|
||||
|
||||
std::vector<std::shared_ptr<worker::Worker>> ioWorkers;
|
||||
for(v_int32 i = 0; i < m_ioThreads; i++) {
|
||||
ioWorkers.push_back(std::make_shared<worker::IOEventWorker>());
|
||||
std::shared_ptr<worker::Worker> worker = std::make_shared<worker::IOEventWorker>();
|
||||
ioWorkers.push_back(worker);
|
||||
m_threads[threadCnt ++] = std::thread(&worker::Worker::run, worker.get());
|
||||
}
|
||||
|
||||
linkWorkers(ioWorkers);
|
||||
|
||||
std::vector<std::shared_ptr<worker::Worker>> timerWorkers;
|
||||
for(v_int32 i = 0; i < m_timerThreads; i++) {
|
||||
timerWorkers.push_back(std::make_shared<worker::TimerWorker>());
|
||||
std::shared_ptr<worker::Worker> worker = std::make_shared<worker::TimerWorker>();
|
||||
timerWorkers.push_back(worker);
|
||||
m_threads[threadCnt ++] = std::thread(&worker::Worker::run, worker.get());
|
||||
}
|
||||
|
||||
linkWorkers(timerWorkers);
|
||||
@ -130,13 +136,13 @@ void Executor::linkWorkers(const std::vector<std::shared_ptr<worker::Worker>>& w
|
||||
}
|
||||
|
||||
void Executor::join() {
|
||||
for(v_int32 i = 0; i < m_processorThreads; i ++) {
|
||||
for(v_int32 i = 0; i < m_threadsCount; i ++) {
|
||||
m_threads[i].join();
|
||||
}
|
||||
}
|
||||
|
||||
void Executor::detach() {
|
||||
for(v_int32 i = 0; i < m_processorThreads; i ++) {
|
||||
for(v_int32 i = 0; i < m_threadsCount; i ++) {
|
||||
m_threads[i].detach();
|
||||
}
|
||||
}
|
||||
|
@ -79,6 +79,7 @@ private:
|
||||
v_int32 m_processorThreads;
|
||||
v_int32 m_ioThreads;
|
||||
v_int32 m_timerThreads;
|
||||
v_int32 m_threadsCount;
|
||||
std::thread* m_threads;
|
||||
SubmissionProcessor* m_processors;
|
||||
std::atomic<v_word32> m_balancer;
|
||||
|
@ -92,11 +92,10 @@ void Processor::addCoroutine(AbstractCoroutine* coroutine) {
|
||||
popTimerTask(coroutine);
|
||||
break;
|
||||
|
||||
// case Action::TYPE_WAIT_LIST:
|
||||
// coroutine->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
|
||||
// m_queue.popFront();
|
||||
// action.m_data.waitList->put(coroutine);
|
||||
// break;
|
||||
case Action::TYPE_WAIT_LIST:
|
||||
coroutine->_SCH_A = Action::createActionByType(Action::TYPE_NONE);
|
||||
action.m_data.waitList->put(coroutine);
|
||||
break;
|
||||
|
||||
default:
|
||||
m_queue.pushBack(coroutine);
|
||||
|
@ -38,6 +38,7 @@ namespace oatpp { namespace async {
|
||||
/**
|
||||
* Asynchronous Processor.<br>
|
||||
* Responsible for processing and managing multiple Coroutines.
|
||||
* Do not use bare processor to run coroutines. Use &id:oatpp::async::Executor; instead;.
|
||||
*/
|
||||
class Processor {
|
||||
private:
|
||||
@ -121,9 +122,22 @@ private:
|
||||
|
||||
public:
|
||||
|
||||
/**
|
||||
* Add dedicated co-worker to processor.
|
||||
* @param worker - &id:oatpp::async::worker::Worker;.
|
||||
*/
|
||||
void addWorker(const std::shared_ptr<worker::Worker>& worker);
|
||||
|
||||
/**
|
||||
* Push one Coroutine back to processor.
|
||||
* @param coroutine - &id:oatpp::async::AbstractCoroutine; previously popped-out(rescheduled to coworker) from this processor.
|
||||
*/
|
||||
void pushOneTask(AbstractCoroutine* coroutine);
|
||||
|
||||
/**
|
||||
* Push list of Coroutines back to processor.
|
||||
* @param tasks - &id:oatpp::collection::FastQueue; of &id:oatpp::async::AbstractCoroutine; previously popped-out(rescheduled to coworker) from this processor.
|
||||
*/
|
||||
void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks);
|
||||
|
||||
/**
|
||||
|
@ -54,6 +54,13 @@
|
||||
|
||||
namespace oatpp { namespace async { namespace worker {
|
||||
|
||||
/**
|
||||
* Event-based implementation of I/O worker.
|
||||
* <ul>
|
||||
* <li>`kqueue` based implementation - for Mac/BSD systems</li>
|
||||
* <li>`epoll` based implementation - for Linux systems</li>
|
||||
* </ul>
|
||||
*/
|
||||
class IOEventWorker : public Worker {
|
||||
private:
|
||||
static constexpr const v_int32 MAX_EVENTS = 10000;
|
||||
@ -77,16 +84,36 @@ private:
|
||||
void setCoroutineEvent(AbstractCoroutine* coroutine, int operation, p_char8 eventPtr);
|
||||
public:
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
IOEventWorker();
|
||||
|
||||
/**
|
||||
* Virtual destructor.
|
||||
*/
|
||||
~IOEventWorker();
|
||||
|
||||
/**
|
||||
* Push list of tasks to worker.
|
||||
* @param tasks - &id:oatpp::collection::FastQueue; of &id:oatpp::async::AbstractCoroutine;.
|
||||
*/
|
||||
void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) override;
|
||||
|
||||
/**
|
||||
* Push one task to worker.
|
||||
* @param task - &id:AbstractCoroutine;.
|
||||
*/
|
||||
void pushOneTask(AbstractCoroutine* task) override;
|
||||
|
||||
void work();
|
||||
/**
|
||||
* Run worker.
|
||||
*/
|
||||
void run() override;
|
||||
|
||||
/**
|
||||
* Break run loop.
|
||||
*/
|
||||
void stop() override;
|
||||
|
||||
};
|
||||
|
@ -36,12 +36,7 @@ IOEventWorker::IOEventWorker()
|
||||
, m_inEvents(nullptr)
|
||||
, m_inEventsCount(0)
|
||||
, m_outEvents(nullptr)
|
||||
{
|
||||
|
||||
std::thread thread(&IOEventWorker::work, this);
|
||||
thread.detach();
|
||||
|
||||
}
|
||||
{}
|
||||
|
||||
IOEventWorker::~IOEventWorker() {
|
||||
|
||||
@ -81,7 +76,7 @@ void IOEventWorker::pushOneTask(AbstractCoroutine *task) {
|
||||
triggerWakeup();
|
||||
}
|
||||
|
||||
void IOEventWorker::work() {
|
||||
void IOEventWorker::run() {
|
||||
|
||||
initEventQueue();
|
||||
|
||||
|
@ -99,11 +99,11 @@ void IOEventWorker::setCoroutineEvent(AbstractCoroutine* coroutine, int operatio
|
||||
|
||||
switch(action.getIOEventType()) {
|
||||
|
||||
case Action::IO_EVENT_READ:
|
||||
case Action::IOEventType::IO_EVENT_READ:
|
||||
event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
|
||||
break;
|
||||
|
||||
case Action::IO_EVENT_WRITE:
|
||||
case Action::IOEventType::IO_EVENT_WRITE:
|
||||
event.events = EPOLLOUT | EPOLLET | EPOLLONESHOT;
|
||||
break;
|
||||
|
||||
|
@ -98,11 +98,11 @@ void IOEventWorker::setCoroutineEvent(AbstractCoroutine* coroutine, int operatio
|
||||
|
||||
switch(action.getIOEventType()) {
|
||||
|
||||
case Action::IO_EVENT_READ:
|
||||
case Action::IOEventType::IO_EVENT_READ:
|
||||
event->filter = EVFILT_READ;
|
||||
break;
|
||||
|
||||
case Action::IO_EVENT_WRITE:
|
||||
case Action::IOEventType::IO_EVENT_WRITE:
|
||||
event->filter = EVFILT_WRITE;
|
||||
break;
|
||||
|
||||
|
@ -33,10 +33,7 @@ namespace oatpp { namespace async { namespace worker {
|
||||
IOWorker::IOWorker()
|
||||
: Worker(Type::IO)
|
||||
, m_running(true)
|
||||
{
|
||||
std::thread thread(&IOWorker::work, this);
|
||||
thread.detach();
|
||||
}
|
||||
{}
|
||||
|
||||
void IOWorker::pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
|
||||
{
|
||||
@ -74,7 +71,7 @@ void IOWorker::consumeBacklog(bool blockToConsume) {
|
||||
|
||||
}
|
||||
|
||||
void IOWorker::work() {
|
||||
void IOWorker::run() {
|
||||
|
||||
v_int32 consumeIteration = 0;
|
||||
v_int32 roundIteration = 0;
|
||||
@ -118,7 +115,7 @@ void IOWorker::work() {
|
||||
case Action::TYPE_IO_WAIT:
|
||||
roundIteration = 0;
|
||||
if(schA.getType() == Action::TYPE_WAIT_REPEAT) {
|
||||
if(getCoroutineTimePoint(CP) < tick) {
|
||||
if(schA.getTimePointMicroseconds() < tick) {
|
||||
m_queue.popFront();
|
||||
setCoroutineScheduledAction(CP, oatpp::async::Action::createWaitRepeatAction(0));
|
||||
getCoroutineProcessor(CP)->pushOneTask(CP);
|
||||
|
@ -35,6 +35,10 @@
|
||||
|
||||
namespace oatpp { namespace async { namespace worker {
|
||||
|
||||
/**
|
||||
* Naive implementation of IOWorker.
|
||||
* Polls all I/O handles in a loop. Reschedules long-waiting handles to Timer.
|
||||
*/
|
||||
class IOWorker : public Worker {
|
||||
private:
|
||||
bool m_running;
|
||||
@ -46,14 +50,31 @@ private:
|
||||
void consumeBacklog(bool blockToConsume);
|
||||
public:
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
IOWorker();
|
||||
|
||||
/**
|
||||
* Push list of tasks to worker.
|
||||
* @param tasks - &id:oatpp::collection::FastQueue; of &id:oatpp::async::AbstractCoroutine;.
|
||||
*/
|
||||
void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) override;
|
||||
|
||||
/**
|
||||
* Push one task to worker.
|
||||
* @param task - &id:AbstractCoroutine;.
|
||||
*/
|
||||
void pushOneTask(AbstractCoroutine* task) override;
|
||||
|
||||
void work();
|
||||
/**
|
||||
* Run worker.
|
||||
*/
|
||||
void run() override;
|
||||
|
||||
/**
|
||||
* Break run loop.
|
||||
*/
|
||||
void stop() override;
|
||||
|
||||
};
|
||||
|
@ -34,10 +34,7 @@ TimerWorker::TimerWorker(const std::chrono::duration<v_int64, std::micro>& granu
|
||||
: Worker(Type::TIMER)
|
||||
, m_running(true)
|
||||
, m_granularity(granularity)
|
||||
{
|
||||
std::thread thread(&TimerWorker::work, this);
|
||||
thread.detach();
|
||||
}
|
||||
{}
|
||||
|
||||
void TimerWorker::pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) {
|
||||
{
|
||||
@ -65,7 +62,7 @@ void TimerWorker::pushOneTask(AbstractCoroutine* task) {
|
||||
m_backlogCondition.notify_one();
|
||||
}
|
||||
|
||||
void TimerWorker::work() {
|
||||
void TimerWorker::run() {
|
||||
|
||||
while(m_running) {
|
||||
|
||||
@ -81,7 +78,9 @@ void TimerWorker::work() {
|
||||
|
||||
auto next = nextCoroutine(curr);
|
||||
|
||||
if(getCoroutineTimePoint(curr) < tick) {
|
||||
const Action& schA = getCoroutineScheduledAction(curr);
|
||||
|
||||
if(schA.getTimePointMicroseconds() < tick) {
|
||||
|
||||
Action action = curr->iterate();
|
||||
|
||||
|
@ -35,6 +35,10 @@
|
||||
|
||||
namespace oatpp { namespace async { namespace worker {
|
||||
|
||||
/**
|
||||
* Timer worker.
|
||||
* Used to wait for timer-scheduled coroutines.
|
||||
*/
|
||||
class TimerWorker : public Worker {
|
||||
private:
|
||||
bool m_running;
|
||||
@ -48,14 +52,32 @@ private:
|
||||
void consumeBacklog();
|
||||
public:
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param granularity - minimum possible time to wait.
|
||||
*/
|
||||
TimerWorker(const std::chrono::duration<v_int64, std::micro>& granularity = std::chrono::milliseconds(100));
|
||||
|
||||
/**
|
||||
* Push list of tasks to worker.
|
||||
* @param tasks - &id:oatpp::collection::FastQueue; of &id:oatpp::async::AbstractCoroutine;.
|
||||
*/
|
||||
void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) override;
|
||||
|
||||
/**
|
||||
* Push one task to worker.
|
||||
* @param task - &id:AbstractCoroutine;.
|
||||
*/
|
||||
void pushOneTask(AbstractCoroutine* task) override;
|
||||
|
||||
void work();
|
||||
/**
|
||||
* Run worker.
|
||||
*/
|
||||
void run() override;
|
||||
|
||||
/**
|
||||
* Break run loop.
|
||||
*/
|
||||
void stop() override;
|
||||
|
||||
};
|
||||
|
@ -42,10 +42,6 @@ Processor* Worker::getCoroutineProcessor(AbstractCoroutine* CP) {
|
||||
return CP->_PP;
|
||||
}
|
||||
|
||||
v_int64 Worker::getCoroutineTimePoint(AbstractCoroutine* CP) {
|
||||
return CP->_SCH_A.m_data.timePointMicroseconds;
|
||||
}
|
||||
|
||||
void Worker::dismissAction(Action& action) {
|
||||
action.m_type = Action::TYPE_NONE;
|
||||
}
|
||||
|
@ -26,12 +26,20 @@
|
||||
#define oatpp_async_worker_Worker_hpp
|
||||
|
||||
#include "oatpp/core/async/Coroutine.hpp"
|
||||
#include <thread>
|
||||
|
||||
namespace oatpp { namespace async { namespace worker {
|
||||
|
||||
/**
|
||||
* Worker base class.
|
||||
* Workers are used by &id:oatpp::async::Executor; to reschedule worker-specific tasks from &id:oatpp::async::Processor;.
|
||||
*/
|
||||
class Worker {
|
||||
public:
|
||||
|
||||
/**
|
||||
* Worker type
|
||||
*/
|
||||
enum Type : v_int32 {
|
||||
|
||||
/**
|
||||
@ -62,20 +70,47 @@ protected:
|
||||
static void setCoroutineScheduledAction(AbstractCoroutine* CP, Action&& action);
|
||||
static Action& getCoroutineScheduledAction(AbstractCoroutine* CP);
|
||||
static Processor* getCoroutineProcessor(AbstractCoroutine* CP);
|
||||
static v_int64 getCoroutineTimePoint(AbstractCoroutine* CP);
|
||||
static void dismissAction(Action& action);
|
||||
static AbstractCoroutine* nextCoroutine(AbstractCoroutine* CP);
|
||||
public:
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param type - worker type - one of &l:Worker::Type; values.
|
||||
*/
|
||||
Worker(Type type);
|
||||
|
||||
/**
|
||||
* Default virtual destructor.
|
||||
*/
|
||||
virtual ~Worker() = default;
|
||||
|
||||
/**
|
||||
* Push list of tasks to worker.
|
||||
* @param tasks - &id:oatpp::collection::FastQueue; of &id:oatpp::async::AbstractCoroutine;.
|
||||
*/
|
||||
virtual void pushTasks(oatpp::collection::FastQueue<AbstractCoroutine>& tasks) = 0;
|
||||
|
||||
/**
|
||||
* Push one task to worker.
|
||||
* @param task - &id:AbstractCoroutine;.
|
||||
*/
|
||||
virtual void pushOneTask(AbstractCoroutine* task) = 0;
|
||||
|
||||
/**
|
||||
* Run worker.
|
||||
*/
|
||||
virtual void run() = 0;
|
||||
|
||||
/**
|
||||
* Break run loop.
|
||||
*/
|
||||
virtual void stop() = 0;
|
||||
|
||||
/**
|
||||
* Get worker type.
|
||||
* @return - one of &l:Worker::Type; values.
|
||||
*/
|
||||
Type getType();
|
||||
|
||||
};
|
||||
|
@ -128,14 +128,14 @@ oatpp::data::stream::IOMode Connection::getStreamIOMode() {
|
||||
oatpp::async::Action Connection::suggestOutputStreamAction(data::v_io_size ioResult) {
|
||||
|
||||
if(ioResult > 0) {
|
||||
return oatpp::async::Action::createIORepeatAction(m_handle, oatpp::async::Action::IO_EVENT_WRITE);
|
||||
return oatpp::async::Action::createIORepeatAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_WRITE);
|
||||
}
|
||||
|
||||
switch (ioResult) {
|
||||
case oatpp::data::IOError::WAIT_RETRY:
|
||||
return oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IO_EVENT_WRITE);
|
||||
return oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_WRITE);
|
||||
case oatpp::data::IOError::RETRY:
|
||||
return oatpp::async::Action::createIORepeatAction(m_handle, oatpp::async::Action::IO_EVENT_WRITE);
|
||||
return oatpp::async::Action::createIORepeatAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_WRITE);
|
||||
}
|
||||
|
||||
throw std::runtime_error("[oatpp::network::virtual_::Pipe::Reader::suggestInputStreamAction()]: Error. Unable to suggest async action for I/O result.");
|
||||
@ -145,14 +145,14 @@ oatpp::async::Action Connection::suggestOutputStreamAction(data::v_io_size ioRes
|
||||
oatpp::async::Action Connection::suggestInputStreamAction(data::v_io_size ioResult) {
|
||||
|
||||
if(ioResult > 0) {
|
||||
return oatpp::async::Action::createIORepeatAction(m_handle, oatpp::async::Action::IO_EVENT_READ);
|
||||
return oatpp::async::Action::createIORepeatAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_READ);
|
||||
}
|
||||
|
||||
switch (ioResult) {
|
||||
case oatpp::data::IOError::WAIT_RETRY:
|
||||
return oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IO_EVENT_READ);
|
||||
return oatpp::async::Action::createIOWaitAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_READ);
|
||||
case oatpp::data::IOError::RETRY:
|
||||
return oatpp::async::Action::createIORepeatAction(m_handle, oatpp::async::Action::IO_EVENT_READ);
|
||||
return oatpp::async::Action::createIORepeatAction(m_handle, oatpp::async::Action::IOEventType::IO_EVENT_READ);
|
||||
}
|
||||
|
||||
throw std::runtime_error("[oatpp::network::virtual_::Pipe::Reader::suggestInputStreamAction()]: Error. Unable to suggest async action for I/O result.");
|
||||
|
@ -197,9 +197,9 @@ oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::strea
|
||||
return _return(oatpp::network::Connection::createShared(m_clientHandle));
|
||||
}
|
||||
if(errno == EALREADY || errno == EINPROGRESS) {
|
||||
return ioWait(m_clientHandle, oatpp::async::Action::IO_EVENT_WRITE);
|
||||
return ioWait(m_clientHandle, oatpp::async::Action::IOEventType::IO_EVENT_WRITE);
|
||||
} else if(errno == EINTR) {
|
||||
return ioRepeat(m_clientHandle, oatpp::async::Action::IO_EVENT_WRITE);
|
||||
return ioRepeat(m_clientHandle, oatpp::async::Action::IOEventType::IO_EVENT_WRITE);
|
||||
}
|
||||
|
||||
::close(m_clientHandle);
|
||||
|
@ -63,7 +63,7 @@ public:
|
||||
{}
|
||||
|
||||
void onNewItem(oatpp::async::CoroutineWaitList& list) override {
|
||||
std::unique_lock<std::mutex> lock(m_pipe->m_mutex);
|
||||
std::lock_guard<std::mutex> lock(m_pipe->m_mutex);
|
||||
if (m_pipe->m_fifo.availableToRead() > 0) {
|
||||
list.notifyAllAndClear();
|
||||
}
|
||||
@ -155,7 +155,7 @@ public:
|
||||
{}
|
||||
|
||||
void onNewItem(oatpp::async::CoroutineWaitList& list) override {
|
||||
std::unique_lock<std::mutex> lock(m_pipe->m_mutex);
|
||||
std::lock_guard<std::mutex> lock(m_pipe->m_mutex);
|
||||
if (m_pipe->m_fifo.availableToWrite() > 0) {
|
||||
list.notifyAllAndClear();
|
||||
}
|
||||
|
@ -27,18 +27,15 @@
|
||||
#include "oatpp/core/base/CommandLineArgumentsTest.hpp"
|
||||
#include "oatpp/core/base/RegRuleTest.hpp"
|
||||
|
||||
#include "oatpp/core/concurrency/SpinLock.hpp"
|
||||
#include "oatpp/core/base/Environment.hpp"
|
||||
|
||||
|
||||
#include "oatpp/core/async/Coroutine.hpp"
|
||||
#include "oatpp/core/Types.hpp"
|
||||
|
||||
#include "oatpp/core/concurrency/SpinLock.hpp"
|
||||
#include "oatpp/core/base/Environment.hpp"
|
||||
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
|
||||
#include "oatpp/core/data/stream/Stream.hpp"
|
||||
|
||||
#ifdef OATPP_ENABLE_ALL_TESTS_MAIN
|
||||
namespace {
|
||||
|
||||
@ -60,10 +57,6 @@ void runTests() {
|
||||
|
||||
oatpp::base::Environment::printCompilationConfig();
|
||||
|
||||
OATPP_LOGD("test", "osSize=%d", sizeof(oatpp::data::stream::OutputStream));
|
||||
OATPP_LOGD("test", "isSize=%d", sizeof(oatpp::data::stream::InputStream));
|
||||
|
||||
/*
|
||||
OATPP_RUN_TEST(oatpp::test::base::RegRuleTest);
|
||||
OATPP_RUN_TEST(oatpp::test::base::CommandLineArgumentsTest);
|
||||
|
||||
@ -89,7 +82,7 @@ void runTests() {
|
||||
OATPP_RUN_TEST(oatpp::test::network::virtual_::InterfaceTest);
|
||||
|
||||
OATPP_RUN_TEST(oatpp::test::web::server::api::ApiControllerTest);
|
||||
*/
|
||||
|
||||
OATPP_RUN_TEST(oatpp::test::web::FullTest);
|
||||
|
||||
OATPP_RUN_TEST(oatpp::test::web::FullAsyncTest);
|
||||
|
@ -58,7 +58,7 @@ class TestComponent {
|
||||
public:
|
||||
|
||||
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::async::Executor>, executor)([] {
|
||||
return std::make_shared<oatpp::async::Executor>(8, 2, 1);
|
||||
return std::make_shared<oatpp::async::Executor>(4, 1, 1);
|
||||
}());
|
||||
|
||||
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::network::virtual_::Interface>, virtualInterface)([] {
|
||||
@ -224,6 +224,12 @@ void FullAsyncClientTest::onRun() {
|
||||
ClientCoroutine_getRootAsync::SUCCESS_COUNTER != -1 ||
|
||||
ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER != -1
|
||||
) {
|
||||
|
||||
OATPP_LOGD("Client", "Root=%d, Body=%d",
|
||||
ClientCoroutine_getRootAsync::SUCCESS_COUNTER.load(),
|
||||
ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER.load()
|
||||
);
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
if(ClientCoroutine_getRootAsync::SUCCESS_COUNTER == iterations){
|
||||
ClientCoroutine_getRootAsync::SUCCESS_COUNTER = -1;
|
||||
|
@ -50,11 +50,15 @@ namespace oatpp { namespace test { namespace web {
|
||||
|
||||
namespace {
|
||||
|
||||
//#define OATPP_TEST_USE_PORT 8123
|
||||
//#define OATPP_TEST_USE_PORT 8000
|
||||
|
||||
class TestComponent {
|
||||
public:
|
||||
|
||||
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::async::Executor>, executor)([] {
|
||||
return std::make_shared<oatpp::async::Executor>(1, 1, 1);
|
||||
}());
|
||||
|
||||
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::network::virtual_::Interface>, virtualInterface)([] {
|
||||
return oatpp::network::virtual_::Interface::createShared("virtualhost");
|
||||
}());
|
||||
@ -74,7 +78,8 @@ public:
|
||||
|
||||
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::network::server::ConnectionHandler>, serverConnectionHandler)([] {
|
||||
OATPP_COMPONENT(std::shared_ptr<oatpp::web::server::HttpRouter>, router);
|
||||
return oatpp::web::server::AsyncHttpConnectionHandler::createShared(router);
|
||||
OATPP_COMPONENT(std::shared_ptr<oatpp::async::Executor>, executor);
|
||||
return oatpp::web::server::AsyncHttpConnectionHandler::createShared(router, executor);
|
||||
}());
|
||||
|
||||
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::data::mapping::ObjectMapper>, objectMapper)([] {
|
||||
@ -115,7 +120,9 @@ void FullAsyncTest::onRun() {
|
||||
v_int32 iterationsStep = 1000;
|
||||
|
||||
for(v_int32 i = 0; i < iterationsStep * 10; i ++) {
|
||||
|
||||
|
||||
//OATPP_LOGD("i", "%d", i);
|
||||
|
||||
{ // test simple GET
|
||||
auto response = client->getRoot(connection);
|
||||
OATPP_ASSERT(response->getStatusCode() == 200);
|
||||
@ -170,6 +177,9 @@ void FullAsyncTest::onRun() {
|
||||
|
||||
}, std::chrono::minutes(10));
|
||||
|
||||
OATPP_COMPONENT(std::shared_ptr<oatpp::async::Executor>, executor);
|
||||
executor->join();
|
||||
|
||||
}
|
||||
|
||||
}}}
|
||||
|
Loading…
Reference in New Issue
Block a user