Coroutine. Add waitFor convenience method

This commit is contained in:
lganzzzo 2019-11-15 17:16:59 +02:00
parent e695e7c483
commit 237e9c0c90
4 changed files with 79 additions and 86 deletions

View File

@ -327,14 +327,56 @@ Action AbstractCoroutine::handleError(Error* error) {
return Action(error);
}
/**
* Get parent coroutine
* @return - pointer to a parent coroutine
*/
AbstractCoroutine* AbstractCoroutine::getParent() const {
return m_parent;
}
Action AbstractCoroutine::repeat() {
return Action::createActionByType(Action::TYPE_REPEAT);
}
Action AbstractCoroutine::waitRepeat(const std::chrono::duration<v_int64, std::micro>& timeout) {
auto startTime = std::chrono::system_clock::now();
auto end = startTime + timeout;
std::chrono::microseconds ms = std::chrono::duration_cast<std::chrono::microseconds>(end.time_since_epoch());
return Action::createWaitRepeatAction(ms.count());
}
CoroutineStarter AbstractCoroutine::waitFor(const std::chrono::duration<v_int64, std::micro>& timeout) {
class WaitingCoroutine : public Coroutine<WaitingCoroutine> {
private:
std::chrono::duration<v_int64, std::micro> m_duration;
bool m_wait;
public:
WaitingCoroutine(const std::chrono::duration<v_int64, std::micro>& duration)
: m_duration(duration)
, m_wait(true)
{}
Action act() override {
if(m_wait) {
m_wait = false;
return waitRepeat(m_duration);
}
return finish();
}
};
return WaitingCoroutine::start(timeout);
}
Action AbstractCoroutine::ioWait(data::v_io_handle ioHandle, Action::IOEventType ioEventType) {
return Action::createIOWaitAction(ioHandle, ioEventType);
}
Action AbstractCoroutine::ioRepeat(data::v_io_handle ioHandle, Action::IOEventType ioEventType) {
return Action::createIORepeatAction(ioHandle, ioEventType);
}
Action AbstractCoroutine::error(Error* error) {
return error;
}

View File

@ -504,12 +504,42 @@ public:
*/
AbstractCoroutine* getParent() const;
/**
* Convenience method to generate Action of `type == Action::TYPE_REPEAT`.
* @return - repeat Action.
*/
static Action repeat();
/**
* Convenience method to generate Action of `type == Action::TYPE_WAIT_REPEAT`.
* @return - TYPE_WAIT_REPEAT Action.
*/
static Action waitRepeat(const std::chrono::duration<v_int64, std::micro>& timeout);
/**
* Wait asynchronously for the specified time.
* @return - repeat Action.
*/
CoroutineStarter waitFor(const std::chrono::duration<v_int64, std::micro>& timeout);
/**
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_WAIT_FOR_IO Action.
*/
static Action ioWait(data::v_io_handle ioHandle, Action::IOEventType ioEventType);
/**
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_IO_REPEAT Action.
*/
static Action ioRepeat(data::v_io_handle ioHandle, Action::IOEventType ioEventType);
/**
* Convenience method to generate error reporting Action.
* @param error - &id:oatpp:async::Error;.
* @return - error reporting Action.
*/
Action error(Error* error);
static Action error(Error* error);
/**
* Convenience method to generate error reporting Action.
@ -579,41 +609,6 @@ public:
return Action(static_cast<FunctionPtr>(function));
}
/**
* Convenience method to generate Action of `type == Action::TYPE_REPEAT`.
* @return - repeat Action.
*/
Action repeat() const {
return Action::createActionByType(Action::TYPE_REPEAT);
}
/**
* Convenience method to generate Action of `type == Action::TYPE_WAIT_REPEAT`.
* @return - TYPE_WAIT_REPEAT Action.
*/
Action waitRepeat(const std::chrono::duration<v_int64, std::micro>& timeout) const {
auto startTime = std::chrono::system_clock::now();
auto end = startTime + timeout;
std::chrono::microseconds ms = std::chrono::duration_cast<std::chrono::microseconds>(end.time_since_epoch());
return Action::createWaitRepeatAction(ms.count());
}
/**
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_WAIT_FOR_IO Action.
*/
Action ioWait(data::v_io_handle ioHandle, Action::IOEventType ioEventType) const {
return Action::createIOWaitAction(ioHandle, ioEventType);
}
/**
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_IO_REPEAT Action.
*/
Action ioRepeat(data::v_io_handle ioHandle, Action::IOEventType ioEventType) const {
return Action::createIORepeatAction(ioHandle, ioEventType);
}
/**
* Convenience method to generate Action of `type == Action::TYPE_FINISH`.
* @return - finish Action.
@ -766,41 +761,6 @@ public:
return Action(static_cast<AbstractCoroutine::FunctionPtr>(function));
}
/**
* Convenience method to generate Action of `type == Action::TYPE_WAIT_REPEAT`.
* @return - TYPE_WAIT_REPEAT Action.
*/
Action waitRepeat(const std::chrono::duration<v_int64, std::micro>& timeout) const {
auto startTime = std::chrono::system_clock::now();
auto end = startTime + timeout;
std::chrono::microseconds ms = std::chrono::duration_cast<std::chrono::microseconds>(end.time_since_epoch());
return Action::createWaitRepeatAction(ms.count());
}
/**
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_WAIT_FOR_IO Action.
*/
Action ioWait(data::v_io_handle ioHandle, Action::IOEventType ioEventType) const {
return Action::createIOWaitAction(ioHandle, ioEventType);
}
/**
* Convenience method to generate Action of `type == Action::TYPE_IO_WAIT`.
* @return - TYPE_IO_REPEAT Action.
*/
Action ioRepeat(data::v_io_handle ioHandle, Action::IOEventType ioEventType) const {
return Action::createIORepeatAction(ioHandle, ioEventType);
}
/**
* Convenience method to generate Action of `type == Action::TYPE_REPEAT`.
* @return - repeat Action.
*/
Action repeat() const {
return Action::createActionByType(Action::TYPE_REPEAT);
}
/**
* Call caller's Callback passing returned value, and generate Action of `type == Action::TYPE_FINISH`.
* @param args - argumets to be passed to callback.

View File

@ -132,7 +132,6 @@ RequestExecutor::executeAsync(
Headers m_headers;
std::shared_ptr<Body> m_body;
std::shared_ptr<ConnectionHandle> m_connectionHandle;
bool m_slept;
RetryPolicy::Context m_context;
public:
@ -148,13 +147,10 @@ RequestExecutor::executeAsync(
, m_headers(headers)
, m_body(body)
, m_connectionHandle(connectionHandle)
, m_slept(false)
{}
Action act() override {
m_slept = false;
if(!m_connectionHandle) {
return m_this->getConnectionAsync().callbackTo(&ExecutorCoroutine::onConnection);
}
@ -192,12 +188,7 @@ RequestExecutor::executeAsync(
m_connectionHandle.reset();
}
if(!m_slept) {
m_slept = true;
return waitRepeat(std::chrono::microseconds(m_this->m_retryPolicy->waitForMicroseconds(m_context)));
}
return yieldTo(&ExecutorCoroutine::act);
return waitFor(std::chrono::microseconds(m_this->m_retryPolicy->waitForMicroseconds(m_context))).next(yieldTo(&ExecutorCoroutine::act));
}

View File

@ -137,7 +137,7 @@ public:
Action useConnection() {
if(m_repeats < 1) {
m_repeats ++;
return waitRepeat(std::chrono::milliseconds(100));
return waitFor(std::chrono::milliseconds(100)).next(yieldTo(&ClientCoroutine::useConnection));
}
if(m_invalidate) {
m_connection->invalidate();