async IOWorker draft

This commit is contained in:
lganzzzo 2019-04-07 02:46:54 +03:00
parent 08ff2a566c
commit a76b37778b
11 changed files with 460 additions and 20 deletions

View File

@ -17,10 +17,16 @@ add_library(oatpp
oatpp/core/async/Error.hpp
oatpp/core/async/Coroutine.cpp
oatpp/core/async/Coroutine.hpp
oatpp/core/async/Executor.cpp
oatpp/core/async/Executor.hpp
oatpp/core/async/Processor.cpp
oatpp/core/async/Processor.hpp
oatpp/core/async/Worker.cpp
oatpp/core/async/Worker.hpp
oatpp/core/async/IOWorker.cpp
oatpp/core/async/IOWorker.hpp
oatpp/core/async/Scheduler.cpp
oatpp/core/async/Scheduler.hpp
oatpp/core/async/Executor.cpp
oatpp/core/async/Executor.hpp
oatpp/core/base/CommandLineArguments.cpp
oatpp/core/base/CommandLineArguments.hpp
oatpp/core/base/Config.hpp

View File

@ -35,6 +35,12 @@ Action Action::clone(const Action& action) {
return result;
}
Action Action::createWaitIOAction(data::v_io_handle ioHandle) {
Action result(TYPE_WAIT_FOR_IO);
result.m_data.ioHandle = ioHandle;
return result;
}
Action::Action(AbstractCoroutine* coroutine)
: m_type(TYPE_COROUTINE)
{
@ -72,11 +78,11 @@ Action& Action::operator=(Action&& other) {
return *this;
}
bool Action::isError() {
bool Action::isError() const {
return m_type == TYPE_ERROR;
}
v_int32 Action::getType() {
v_int32 Action::getType() const {
return m_type;
}

View File

@ -26,6 +26,9 @@
#define oatpp_async_Coroutine_hpp
#include "./Error.hpp"
#include "oatpp/core/data/IODefinitions.hpp"
#include "oatpp/core/collection/FastQueue.hpp"
#include "oatpp/core/base/memory/MemoryPool.hpp"
#include "oatpp/core/base/Environment.hpp"
@ -63,30 +66,36 @@ public:
*/
static constexpr const v_int32 TYPE_YIELD_TO = 2;
/**
* Indicate that Action is to WAIT and then RETRY call to current method of Coroutine.
*/
static constexpr const v_int32 TYPE_WAIT_RETRY = 3;
/**
* Indicate that Action is to REPEAT call to current method of Coroutine.
*/
static constexpr const v_int32 TYPE_REPEAT = 4;
static constexpr const v_int32 TYPE_REPEAT = 3;
/**
* Indicate that Action is to WAIT and then RETRY call to current method of Coroutine.
*/
static constexpr const v_int32 TYPE_WAIT_RETRY = 4;
/**
* Indicate that Action is waiting for IO and should be assigned to corresponding worker.
*/
static constexpr const v_int32 TYPE_WAIT_FOR_IO = 5;
/**
* Indicate that Action is to FINISH current Coroutine and return control to a caller-Coroutine.
*/
static constexpr const v_int32 TYPE_FINISH = 5;
static constexpr const v_int32 TYPE_FINISH = 6;
/**
* Indicate that Error occurred.
*/
static constexpr const v_int32 TYPE_ERROR = 6;
static constexpr const v_int32 TYPE_ERROR = 7;
private:
union Data {
FunctionPtr fptr;
AbstractCoroutine* coroutine;
oatpp::data::v_io_handle ioHandle;
};
private:
mutable v_int32 m_type;
@ -95,6 +104,13 @@ public:
static Action clone(const Action& action);
/**
* Create WAIT_FOR_IO Action
* @param ioHandle - &id:oatpp::data::v_io_handle;.
* @return - Action.
*/
static Action createWaitIOAction(data::v_io_handle ioHandle);
/**
* Constructor. Create start-coroutine Action.
* @param coroutine - pointer to &l:AbstractCoroutine;.
@ -118,6 +134,10 @@ public:
*/
Action(const Action&) = delete;
/**
* Move-constructor.
* @param other
*/
Action(Action&& other);
/**
@ -139,13 +159,13 @@ public:
* Check if action is an error reporting action.
* @return `true` if action is an error reporting action.
*/
bool isError();
bool isError() const;
/**
* Get Action type.
* @return - action type.
*/
v_int32 getType();
v_int32 getType() const;
};
@ -394,6 +414,14 @@ public:
return Action(static_cast<FunctionPtr>(function));
}
/**
* Convenience method to generate Action of `type == Action::TYPE_REPEAT`.
* @return - repeat Action.
*/
Action repeat() const {
return Action::TYPE_REPEAT;
}
/**
* Convenience method to generate Action of `type == Action::TYPE_WAIT_RETRY`.
* @return - WAIT_RETRY Action.
@ -403,11 +431,11 @@ public:
}
/**
* Convenience method to generate Action of `type == Action::TYPE_REPEAT`.
* @return - repeat Action.
* Convenience method to generate Action of `type == Action::TYPE_WAIT_FOR_IO`.
* @return - TYPE_WAIT_FOR_IO Action.
*/
Action repeat() const {
return Action::TYPE_REPEAT;
Action waitForIO(data::v_io_handle ioHandle) const {
return Action(this, ioHandle);
}
/**

View File

@ -26,6 +26,7 @@
#define oatpp_async_Executor_hpp
#include "./Processor.hpp"
#include "./Worker.hpp"
#include "oatpp/core/concurrency/SpinLock.hpp"
#include "oatpp/core/concurrency/Thread.hpp"
@ -87,7 +88,7 @@ private:
};
class SubmissionProcessor {
class SubmissionProcessor/* : public Worker */{
private:
typedef oatpp::collection::LinkedList<std::shared_ptr<TaskSubmission>> Tasks;
private:
@ -105,7 +106,7 @@ private:
public:
void run();
void stop();
void stop() ;
void addTaskSubmission(const std::shared_ptr<TaskSubmission>& task);
};

View File

@ -0,0 +1,109 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "IOWorker.hpp"
#include <chrono>
namespace oatpp { namespace async {
void IOWorker::addTask(const Task& task) {
{
std::lock_guard<std::mutex> guard(m_backlogMutex);
m_backlog.pushBack(task);
}
m_backlogCondition.notify_one();
}
void IOWorker::queueBacklog() {
auto curr = m_backlog.getFirstNode();
while(curr != nullptr) {
m_queue.pushBack(new TaskSlot({curr->getData()}));
curr = curr->getNext();
}
m_backlog.clear();
}
void IOWorker::consumeBacklog(bool blockToConsume) {
if(blockToConsume) {
std::unique_lock<std::mutex> lock(m_backlogMutex);
while (m_backlog.count() == 0) {
m_backlogCondition.wait(lock);
}
} else {
std::unique_lock<std::mutex> lock(m_backlogMutex, std::try_to_lock);
if (lock.owns_lock()) {
queueBacklog();
}
}
}
void IOWorker::work() {
v_int32 sleepIteration = 0;
v_int32 consumeIteration = 0;
while(m_running) {
auto curr = m_queue.first;
if(curr != nullptr) {
const Action &action = curr->task.coroutine->iterate();
if (action.getType() == Action::TYPE_WAIT_FOR_IO) {
m_queue.round();
} else {
curr->task.sender->addTask(curr->task);
m_queue.popFrontNoData();
}
++ sleepIteration;
if(sleepIteration == 1000) {
sleepIteration = 0;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
++ consumeIteration;
if(consumeIteration == 100) {
consumeBacklog(false);
}
} else {
sleepIteration = 0;
consumeBacklog(true);
}
}
}
}}

View File

@ -0,0 +1,88 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_async_IOEventWorker_hpp
#define oatpp_async_IOEventWorker_hpp
#include "./Worker.hpp"
#include "oatpp/core/collection/LinkedList.hpp"
#include <thread>
#include <mutex>
#include <condition_variable>
namespace oatpp { namespace async {
class IOWorker : public Worker {
private:
struct TaskSlot {
static void* operator new(std::size_t sz) {
return ::operator new(sz);
}
static void operator delete(void* ptr, std::size_t sz) {
::operator delete(ptr);
}
Task task;
TaskSlot* _ref;
};
private:
typedef oatpp::collection::FastQueue<TaskSlot> Queue;
typedef oatpp::collection::LinkedList<Task> Backlog;
private:
bool m_running;
Backlog m_backlog;
Queue m_queue;
std::mutex m_backlogMutex;
std::condition_variable m_backlogCondition;
private:
void queueBacklog();
void consumeBacklog(bool blockToConsume);
public:
IOWorker()
: m_running(true)
{
std::thread thread(&IOWorker::work, this);
thread.detach();
}
void addTask(const Task& task) override;
void work();
void stop() override {
m_running = false;
}
};
}}
#endif //oatpp_async_IOEventWorker_hpp

View File

@ -0,0 +1,25 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "Scheduler.hpp"

View File

@ -0,0 +1,60 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_async_Scheduler_hpp
#define oatpp_async_Scheduler_hpp
#include "oatpp/core/Types.hpp"
class Scheduler {
public:
enum WorkerType : v_int32 {
/**
* Worker type - general processor.
*/
PROCESSOR = 0,
/**
* Worker type - timer processor.
*/
TIMER = 1,
/**
* Worker type - I/O processor.
*/
IO = 2
};
private:
};
#endif //oatpp_async_Scheduler_hpp

View File

@ -0,0 +1,25 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "Worker.hpp"

View File

@ -0,0 +1,68 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_async_Worker_hpp
#define oatpp_async_Worker_hpp
#include "./Coroutine.hpp"
namespace oatpp { namespace async {
class Worker {
public:
struct Task {
mutable Action action;
AbstractCoroutine* coroutine;
Worker* sender;
Task(const Task& other)
: action(std::move(other.action))
, coroutine(other.coroutine)
, sender(other.sender)
{}
Task& operator = (const Task& other) {
action = std::move(other.action);
coroutine = other.coroutine;
sender = other.sender;
return *this;
}
};
public:
virtual ~Worker() = default;
virtual void addTask(const Task& task) = 0;
virtual void stop() = 0;
};
}}
#endif //oatpp_async_Worker_hpp

View File

@ -86,24 +86,48 @@ enum IOError : v_io_size {
};
/**
* Asynchronous I/O error. <br>
* Extends &id:oatpp::async::Error;.
*/
class AsyncIOError : public oatpp::async::Error {
public:
/**
* Pre Created error. BROKEN_PIPE.
*/
static const std::shared_ptr<const Error> ERROR_BROKEN_PIPE;
/**
* Pre Created error. ZERO_VALUE.
*/
static const std::shared_ptr<const Error> ERROR_ZERO_VALUE;
private:
v_io_size m_code;
public:
/**
* Constructor.
* @param what - description of error type.
* @param code - I/O opersation error code. &l:IOError;.
*/
AsyncIOError(const char* what, v_io_size code)
: oatpp::async::Error(what)
, m_code(code)
{}
/**
* Constructor.
* @param code - I/O opersation error code. &l:IOError;.
*/
AsyncIOError(v_io_size code)
: oatpp::async::Error("AsyncIOError")
, m_code(code)
{}
/**
* Get I/O opersation error code.
* @return - I/O opersation error code. &l:IOError;.
*/
v_io_size getCode() const {
return m_code;
}