Async Processor with slowQueue

This commit is contained in:
lganzzzo 2018-03-19 03:02:15 +02:00
parent cc1f7f4541
commit d34d11cafd
5 changed files with 285 additions and 115 deletions

View File

@ -37,12 +37,16 @@ private:
oatpp::concurrency::SpinLock::Atom m_atom;
Entry* m_first;
Entry* m_last;
v_int32 m_count;
v_int32 m_max;
public:
Queue()
: m_atom(false)
, m_first(nullptr)
, m_last(nullptr)
, m_count(0)
, m_max(0)
{}
Entry* peekFront() {
@ -57,6 +61,7 @@ private:
if(m_first == nullptr) {
m_last = nullptr;
}
m_count --;
}
return result;
}
@ -72,6 +77,7 @@ private:
if(m_last == nullptr) {
m_last = entry;
}
m_count ++;
}
void pushBack(Routine* routine) {
@ -88,6 +94,32 @@ private:
m_first = entry;
m_last = entry;
}
m_count ++;
if(m_count > m_max) {
m_max = m_count;
//OATPP_LOGD("queue", "size=%d", m_max);
}
}
v_int32 getCount(){
return m_count;
}
static void moveEntryToQueue(Queue& from, Queue& to, Queue::Entry* curr, Queue::Entry* prev){
//OATPP_LOGD("proc", "moved to fast");
if(prev == nullptr) {
to.pushFront(from.popFront());
} else if(curr->next == nullptr) {
to.pushBack(curr);
from.m_last = prev;
prev->next = nullptr;
from.m_count --;
} else {
prev->next = curr->next;
to.pushBack(curr);
from.m_count --;
}
}
};
@ -120,9 +152,12 @@ private:
}
void doAction(Action& a){
if(a.getType() == Action::TYPE_RETRY) {
if(a.getType() == Action::TYPE_REPEAT) {
m_queue.pushBack(m_queue.popFront());
return;
} else if(a.getType() == Action::TYPE_WAIT_RETRY) {
m_queueSlow.pushBack(m_queue.popFront());
return;
} else if(a.getType() == Action::TYPE_RETURN) {
auto entry = m_queue.popFront();
auto routine = entry->routine->m_parent;
@ -184,11 +219,71 @@ private:
}
}
bool auditQueueSlow(){
m_auditTimer = 0;
Queue::Entry* curr = m_queueSlow.peekFront();
Queue::Entry* prev = nullptr;
bool hasActions = false;
while (curr != nullptr) {
auto& block = curr->routine->blocks.peek();
try{
Action action = block.function();
if(action.getType() != Action::TYPE_WAIT_RETRY){
curr->routine->pendingAction = action;
action.null();
Queue::moveEntryToQueue(m_queueSlow, m_queue, curr, prev);
hasActions = true;
if(prev != nullptr) {
curr = prev;
} else {
curr = m_queueSlow.peekFront();
}
}
} catch(...) {
Error error {"Unknown", true };
curr->routine->pendingAction = error;
Queue::moveEntryToQueue(m_queueSlow, m_queue, curr, prev);
hasActions = true;
if(prev != nullptr) {
curr = prev;
} else {
curr = m_queueSlow.peekFront();
}
}
prev = curr;
if(curr != nullptr) {
curr = curr->next;
}
}
return hasActions;
}
void checkAudit(){
m_auditTimer ++;
if(m_auditTimer > 100) {
auditQueueSlow();
}
}
private:
Queue m_queue;
Queue m_queueSlow;
v_int32 m_auditTimer; // in cycles
public:
Processor()
: m_auditTimer(0)
{}
void addRoutine(const Routine::Builder& routine){
m_queue.pushBack(routine.m_routine);
routine.m_routine = nullptr;
@ -202,6 +297,19 @@ public:
auto r = entry->routine;
if(r->blocks.isEmpty()){
returnFromCurrentRoutine();
checkAudit();
return true;
}
if(!r->pendingAction.isNone()) {
Action action = r->pendingAction;
r->pendingAction.null();
if(action.isErrorAction()){
propagateError(action.getError());
} else {
doAction(action);
}
checkAudit();
return true;
}
@ -218,9 +326,13 @@ public:
Error error {"Unknown", true };
propagateError(error);
}
checkAudit();
return true;
}
return false;
return auditQueueSlow();
}

View File

@ -10,10 +10,99 @@
namespace oatpp { namespace async {
const v_int32 Action::TYPE_RETRY = 0;
const v_int32 Action::TYPE_RETURN = 1;
const v_int32 Action::TYPE_ABORT = 2;
const v_int32 Action::TYPE_ERROR = 3;
const v_int32 Action::TYPE_ROUTINE = 4;
const v_int32 Action::TYPE_NONE = 0;
const v_int32 Action::TYPE_REPEAT = 1;
const v_int32 Action::TYPE_WAIT_RETRY = 2;
const v_int32 Action::TYPE_RETURN = 3;
const v_int32 Action::TYPE_ABORT = 4;
const v_int32 Action::TYPE_ERROR = 5;
const v_int32 Action::TYPE_ROUTINE = 6;
RoutineBuilder::RoutineBuilder()
: m_routine(new Routine())
{}
RoutineBuilder::RoutineBuilder(const RoutineBuilder& other)
: m_routine(other.m_routine)
{
other.m_routine = nullptr;
}
RoutineBuilder::~RoutineBuilder() {
if(m_routine != nullptr) {
delete m_routine;
}
}
RoutineBuilder& RoutineBuilder::_then(const Block& block){
m_routine->blocks.pushBack(block);
return *this;
}
Action& Action::_repeat() {
static Action a(TYPE_REPEAT);
return a;
}
Action& Action::_wait_retry() {
static Action a(TYPE_WAIT_RETRY);
return a;
}
Action& Action::_return(){
static Action a(TYPE_RETURN);
return a;
}
Action& Action::_abort(){
static Action a(TYPE_ABORT);
return a;
}
Action::Action(v_int32 type)
: m_type(type)
, m_routine(nullptr)
{}
Action::Action()
: m_type(TYPE_NONE)
, m_routine(nullptr)
{}
Action::Action(const Error& error)
: m_type (TYPE_ERROR)
, m_error(error)
, m_routine(nullptr)
{}
Action::Action(const Routine::Builder& routine)
: m_type(TYPE_ROUTINE)
, m_error({nullptr, false})
, m_routine(routine.m_routine)
{
routine.m_routine = nullptr;
}
Action::Action(std::nullptr_t nullp)
: m_type(TYPE_ROUTINE)
, m_error({nullptr, false})
, m_routine(new Routine())
{}
Action::~Action() {
if(m_routine != nullptr){
delete m_routine;
}
}
Error& Action::getError(){
return m_error;
}
bool Action::isErrorAction(){
return m_type == TYPE_ERROR;
}
v_int32 Action::getType(){
return m_type;
}
}}

View File

@ -18,6 +18,7 @@
namespace oatpp { namespace async {
class Action; // FWD
class Routine; // FWD
struct Error {
public:
@ -34,8 +35,69 @@ public:
ErrorHandler errorHandler;
};
class RoutineBuilder {
friend Action;
friend Processor;
private:
mutable Routine* m_routine;
public:
RoutineBuilder();
RoutineBuilder(const RoutineBuilder& other);
~RoutineBuilder();
RoutineBuilder& _then(const Block& block);
};
class Action {
friend Processor;
public:
static const v_int32 TYPE_NONE;
static const v_int32 TYPE_REPEAT;
static const v_int32 TYPE_WAIT_RETRY;
static const v_int32 TYPE_RETURN;
static const v_int32 TYPE_ABORT;
static const v_int32 TYPE_ERROR;
static const v_int32 TYPE_ROUTINE;
public:
static Action& _repeat();
static Action& _wait_retry();
static Action& _return();
static Action& _abort();
private:
void null(){
m_type = TYPE_NONE;
m_routine = nullptr;
}
private:
v_int32 m_type;
Error m_error;
Routine* m_routine;
private:
Action(v_int32 type);
public:
Action();
Action(const Error& error);
Action(const RoutineBuilder& routine);
Action(std::nullptr_t nullp);
~Action();
Error& getError();
bool isErrorAction();
v_int32 getType();
bool isNone() {
return m_type == TYPE_NONE;
}
};
class Routine {
friend Processor;
public:
typedef RoutineBuilder Builder;
private:
Routine* m_parent;
public:
@ -45,39 +107,8 @@ public:
{}
Stack<Block> blocks;
public:
class Builder {
friend Action;
friend Processor;
private:
mutable Routine* m_routine;
public:
Builder()
: m_routine(new Routine())
{}
Builder(const Builder& other)
: m_routine(other.m_routine)
{
other.m_routine = nullptr;
}
~Builder() {
if(m_routine != nullptr) {
delete m_routine;
}
}
Builder& _then(const Block& block){
m_routine->blocks.pushBack(block);
return *this;
}
};
Action pendingAction;
public:
static Builder _do(const Block& block){
@ -88,77 +119,6 @@ public:
};
class Action {
friend Processor;
public:
static const v_int32 TYPE_RETRY;
static const v_int32 TYPE_RETURN;
static const v_int32 TYPE_ABORT;
static const v_int32 TYPE_ERROR;
static const v_int32 TYPE_ROUTINE;
public:
static Action& _retry() {
static Action a(TYPE_RETRY);
return a;
}
static Action& _return(){
static Action a(TYPE_RETURN);
return a;
}
static Action& _abort(){
static Action a(TYPE_ABORT);
return a;
}
private:
v_int32 m_type;
Error m_error;
Routine* m_routine;
private:
Action(v_int32 type)
: m_type(type)
{}
public:
Action(const Error& error)
: m_type (TYPE_ERROR)
, m_error(error)
, m_routine(nullptr)
{}
Action(const Routine::Builder& routine)
: m_type(TYPE_ROUTINE)
, m_error({nullptr, false})
, m_routine(routine.m_routine)
{
routine.m_routine = nullptr;
}
Action(std::nullptr_t nullp)
: m_type(TYPE_ROUTINE)
, m_error({nullptr, false})
, m_routine(new Routine())
{}
~Action() {
if(m_routine != nullptr){
delete m_routine;
}
}
Error& getError(){
return m_error;
}
bool isErrorAction(){
return m_type == TYPE_ERROR;
}
v_int32 getType(){
return m_type;
}
};
}}
#endif /* oatpp_async_Block_hpp */

View File

@ -97,7 +97,7 @@ void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::d
}, [] (const oatpp::async::Error& error) {
//OATPP_LOGD("AsyncHttpConnectionHandler", "received error");
if(error.error == HttpProcessor::RETURN_KEEP_ALIVE) {
return oatpp::async::Action::_retry();
return oatpp::async::Action::_repeat();
}
return oatpp::async::Action(nullptr);
}

View File

@ -159,11 +159,13 @@ HttpProcessor::processRequestAsync(HttpRouter* router,
: connectionState(pConnectionState)
, ioBuffer(pConnectionState->ioBuffer->getData())
, ioBufferSize(pConnectionState->ioBuffer->getSize())
, retries(0)
{}
const std::shared_ptr<ConnectionState>& connectionState;
void* ioBuffer;
v_int32 ioBufferSize;
oatpp::os::io::Library::v_size readCount;
v_int32 retries;
};
auto state = std::make_shared<LocaleState>(connectionState);
@ -172,12 +174,19 @@ HttpProcessor::processRequestAsync(HttpRouter* router,
[state] {
//static std::atomic<v_int32> maxRetries(0);
state->readCount = state->connectionState->connection->read(state->ioBuffer, state->ioBufferSize);
if(state->readCount > 0) {
//OATPP_LOGD("FR", "line=%s", (const char*) state->ioBuffer);
return oatpp::async::Action(nullptr);
} else if(state->readCount == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN){
return oatpp::async::Action::_retry();
/*
state->retries ++;
if(state->retries > maxRetries){
maxRetries = state->retries;
OATPP_LOGD("Retry", "max=%d", maxRetries.load());
}*/
return oatpp::async::Action::_wait_retry();
}
return oatpp::async::Action::_abort();
}, nullptr