async::Action refactored. Async Error handling refactored.

This commit is contained in:
lganzzzo 2019-03-30 18:02:46 +02:00
parent 2086a362d1
commit da5162c106
40 changed files with 569 additions and 426 deletions

View File

@ -13,6 +13,8 @@ add_library(oatpp
oatpp/codegen/codegen_undef_DTO_.hpp
oatpp/core/Types.cpp
oatpp/core/Types.hpp
oatpp/core/async/Error.cpp
oatpp/core/async/Error.hpp
oatpp/core/async/Coroutine.cpp
oatpp/core/async/Coroutine.hpp
oatpp/core/async/Executor.cpp

View File

@ -26,41 +26,57 @@
namespace oatpp { namespace async {
const Action Action::_WAIT_RETRY(TYPE_WAIT_RETRY, nullptr, nullptr);
const Action Action::_REPEAT(TYPE_REPEAT, nullptr, nullptr);
const Action Action::_FINISH(TYPE_FINISH, nullptr, nullptr);
const Action Action::_ABORT(TYPE_ABORT, nullptr, nullptr);
Action Action::clone(const Action& action) {
Action result(action.m_type);
result.m_data = action.m_data;
return result;
}
Error::Error(const char* pMessage, bool pIsExceptionThrown)
: message(pMessage)
, isExceptionThrown(pIsExceptionThrown)
{}
Action::Action(AbstractCoroutine* coroutine)
: m_type(TYPE_COROUTINE)
{
m_data.coroutine = coroutine;
}
Action::Action(v_int32 type,
AbstractCoroutine* coroutine,
FunctionPtr functionPtr)
Action::Action(FunctionPtr functionPtr)
: m_type(TYPE_YIELD_TO)
{
m_data.fptr = functionPtr;
}
Action::Action(v_int32 type)
: m_type(type)
, m_coroutine(coroutine)
, m_functionPtr(functionPtr)
, m_error(Error(nullptr))
, m_data()
{}
Action::Action(const Error& error)
: m_type(TYPE_ERROR)
, m_coroutine(nullptr)
, m_functionPtr(nullptr)
, m_error(error)
{}
Action::Action(Action&& other)
: m_type(other.m_type)
, m_data(other.m_data)
{
other.m_data.fptr = nullptr;
}
bool Action::isError(){
Action::~Action() {
if(m_type == TYPE_COROUTINE && m_data.coroutine != nullptr) {
m_data.coroutine->free();
}
}
Action& Action::operator=(Action&& other) {
m_type = other.m_type;
m_data = other.m_data;
other.m_data.fptr = nullptr;
return *this;
}
bool Action::isError() {
return m_type == TYPE_ERROR;
}
void Action::free() {
if(m_coroutine != nullptr) {
m_coroutine->free();
m_coroutine = nullptr;
}
v_int32 Action::getType() {
return m_type;
}
std::shared_ptr<const Error> AbstractCoroutine::ERROR_UNKNOWN = std::make_shared<Error>("Unknown Error");
}}

View File

@ -25,6 +25,7 @@
#ifndef oatpp_async_Coroutine_hpp
#define oatpp_async_Coroutine_hpp
#include "./Error.hpp"
#include "oatpp/core/collection/FastQueue.hpp"
#include "oatpp/core/base/memory/MemoryPool.hpp"
#include "oatpp/core/base/Environment.hpp"
@ -34,31 +35,6 @@ namespace oatpp { namespace async {
class AbstractCoroutine; // FWD
class Processor; // FWD
/**
* Class to hold and communicate errors between Coroutines
*/
class Error {
public:
/**
* Constructor.
* @param pMessage - Error message.
* @param pIsExceptionThrown - Indicate that this error is a result of thrown exception.
*/
Error(const char* pMessage, bool pIsExceptionThrown = false);
/**
* Error message
*/
const char* message;
/**
* Indicates that this error is a result of thrown exception. Re-throw if true to catch original exception.
*/
bool isExceptionThrown;
};
/**
* Class Action represents an asynchronous action.
*/
@ -68,89 +44,102 @@ class Action {
public:
typedef Action (AbstractCoroutine::*FunctionPtr)();
public:
/**
* Indicate that Action is to start coroutine. Value = 0.
*/
static constexpr const v_int32 TYPE_COROUTINE = 0;
static constexpr const v_int32 TYPE_NONE = 0;
/**
* Indicate that Action is to YIELD control to other method of Coroutine. Value = 1.
* Indicate that Action is to start coroutine.
*/
static constexpr const v_int32 TYPE_YIELD_TO = 1;
static constexpr const v_int32 TYPE_COROUTINE = 1;
/**
* Indicate that Action is to WAIT and then RETRY call to current method of Coroutine. Value = 2.
* Indicate that Action is to YIELD control to other method of Coroutine.
*/
static constexpr const v_int32 TYPE_WAIT_RETRY = 2;
static constexpr const v_int32 TYPE_YIELD_TO = 2;
/**
* Indicate that Action is to REPEAT call to current method of Coroutine. Value = 3.
* Indicate that Action is to WAIT and then RETRY call to current method of Coroutine.
*/
static constexpr const v_int32 TYPE_REPEAT = 3;
static constexpr const v_int32 TYPE_WAIT_RETRY = 3;
/**
* Indicate that Action is to FINISH current Coroutine and return control to a caller-Coroutine. Value = 4.
* Indicate that Action is to REPEAT call to current method of Coroutine.
*/
static constexpr const v_int32 TYPE_FINISH = 4;
static constexpr const v_int32 TYPE_REPEAT = 4;
/**
* Deprecated
* Indicate that Action is to FINISH current Coroutine and return control to a caller-Coroutine.
*/
static constexpr const v_int32 TYPE_ABORT = 5;
static constexpr const v_int32 TYPE_FINISH = 5;
/**
* Indicate that Error occurred
* Indicate that Error occurred.
*/
static constexpr const v_int32 TYPE_ERROR = 6;
public:
/**
* Predefined WAIT_RETRY action
*/
static const Action _WAIT_RETRY;
/**
* Predefined REPEAT action
*/
static const Action _REPEAT;
/**
* Predefined FINISH action
*/
static const Action _FINISH;
/**
* Deprecated
*/
static const Action _ABORT;
private:
v_int32 m_type;
AbstractCoroutine* m_coroutine;
FunctionPtr m_functionPtr;
Error m_error;
protected:
void free();
union Data {
FunctionPtr fptr;
AbstractCoroutine* coroutine;
};
private:
mutable v_int32 m_type;
Data m_data;
public:
/**
* Constructor.
* @param type - type of the Action.
* @param coroutine - pointer to a Coroutine to start if type == TYPE_COROUTINE. nullptr otherwise.
* @param functionPtr - pointer to a function to YIELD control to if type == TYPE_YIELD_TO. nullptr otherwise.
*/
Action(v_int32 type, AbstractCoroutine* coroutine, FunctionPtr functionPtr);
static Action clone(const Action& action);
/**
* Constructor. Construct error reporting action.
* @param error - Error message.
* Constructor. Create start-coroutine Action.
* @param coroutine - pointer to &l:AbstractCoroutine;.
*/
Action(const Error& error);
Action(AbstractCoroutine* coroutine);
/**
* Constructor. Create yield_to Action.
* @param functionPtr - pointer to function.
*/
Action(FunctionPtr functionPtr);
/**
* Create Action by type.
* @param type - Action type.
*/
Action(v_int32 type);
/**
* Deleted copy-constructor.
*/
Action(const Action&) = delete;
Action(Action&& other);
/**
* Non-virtual destructor.
*/
~Action();
/*
* Deleted copy-assignment operator.
*/
Action& operator=(const Action&) = delete;
/*
* Move assignment operator.
*/
Action& operator=(Action&& other);
/**
* Check if action is an error reporting action.
* @return `true` if action is an error reporting action.
*/
bool isError();
/**
* Get Action type.
* @return - action type.
*/
v_int32 getType();
};
@ -165,6 +154,7 @@ public:
* Convenience typedef for Action
*/
typedef oatpp::async::Action Action;
typedef oatpp::async::Error Error;
typedef Action (AbstractCoroutine::*FunctionPtr)();
public:
@ -184,81 +174,77 @@ public:
}
};
private:
static std::shared_ptr<const Error> ERROR_UNKNOWN;
private:
AbstractCoroutine* _CP = this;
FunctionPtr _FP = &AbstractCoroutine::act;
std::shared_ptr<const Error> _ERR = nullptr;
AbstractCoroutine* _ref = nullptr;
private:
AbstractCoroutine* m_parent = nullptr;
std::shared_ptr<const Error>& m_propagatedError = _ERR;
protected:
Action m_parentReturnAction = Action(Action::TYPE_FINISH);
private:
Action takeAction(const Action& action){
Action takeAction(Action&& action){
AbstractCoroutine* savedCP;
switch (action.m_type) {
case Action::TYPE_COROUTINE:
action.m_coroutine->m_parent = _CP;
_CP = action.m_coroutine;
_FP = action.m_coroutine->_FP;
break;
action.m_data.coroutine->m_parent = _CP;
_CP = action.m_data.coroutine;
_FP = action.m_data.coroutine->_FP;
return std::forward<oatpp::async::Action>(action);
case Action::TYPE_FINISH:
if(_CP == this) {
_CP = nullptr;
return action;
return std::forward<oatpp::async::Action>(action);
}
{
AbstractCoroutine* savedCP = _CP;
_CP = _CP->m_parent;
_FP = nullptr;
/* Please note that savedCP->m_parentReturnAction should not be "REPEAT nor WAIT_RETRY" */
/* as funtion pointer (FP) is invalidated */
Action a = takeAction(savedCP->m_parentReturnAction);
savedCP->m_parentReturnAction.m_coroutine = nullptr;
savedCP->free();
return a;
}
break;
savedCP = _CP;
_CP = _CP->m_parent;
_FP = nullptr;
/* Please note that savedCP->m_parentReturnAction should not be "REPEAT nor WAIT_RETRY" */
/* as funtion pointer (FP) is invalidated */
action = takeAction(std::move(savedCP->m_parentReturnAction));
savedCP->free();
return std::forward<oatpp::async::Action>(action);
case Action::TYPE_YIELD_TO:
_FP = action.m_functionPtr;
break;
case Action::TYPE_ABORT:
while (_CP != this) {
_CP->free();
_CP = _CP->m_parent;
}
_CP = nullptr;
break;
_FP = action.m_data.fptr;
return std::forward<oatpp::async::Action>(action);
case Action::TYPE_ERROR:
Action a = action;
do {
a = _CP->handleError(a.m_error);
if(a.m_type == Action::TYPE_ERROR) {
if(_CP == this) {
_CP = nullptr;
return a;
do {
action = _CP->handleError(m_propagatedError);
if(action.m_type == Action::TYPE_ERROR) {
if(_CP == this) {
_CP = nullptr;
return std::forward<oatpp::async::Action>(action);
} else {
_CP->free();
_CP = _CP->m_parent;
}
} else {
_CP->free();
_CP = _CP->m_parent;
action = takeAction(std::forward<oatpp::async::Action>(action));
}
} else {
a = takeAction(a);
}
} while (a.m_type == Action::TYPE_ERROR && _CP != nullptr);
return a;
break;
} while (action.m_type == Action::TYPE_ERROR && _CP != nullptr);
return std::forward<oatpp::async::Action>(action);
};
return action;
throw std::runtime_error("[oatpp::async::AbstractCoroutine::takeAction()]: Error. Unknown Action.");
}
private:
AbstractCoroutine* m_parent = nullptr;
protected:
Action m_parentReturnAction = Action::_FINISH;
public:
/**
@ -269,7 +255,8 @@ public:
try {
return takeAction(_CP->call(_FP));
} catch (...) {
return takeAction(Action(Error("Exception", true)));
m_propagatedError = ERROR_UNKNOWN;
return takeAction(Action(Action::TYPE_ERROR));
}
};
@ -287,9 +274,7 @@ public:
/**
* Virtual Destructor
*/
virtual ~AbstractCoroutine(){
m_parentReturnAction.free();
}
virtual ~AbstractCoroutine() = default;
/**
* Entrypoint of Coroutine.
@ -323,8 +308,8 @@ public:
* @return - Action. If handleError function returns Error,
* current coroutine will finish, return control to caller coroutine and handleError is called for caller coroutine.
*/
virtual Action handleError(const Error& error) {
return error;
virtual Action handleError(const std::shared_ptr<const Error>& error) {
return Action(Action::TYPE_ERROR);
}
template<typename ...Args>
@ -341,10 +326,10 @@ public:
* @return - start Coroutine Action.
*/
template<typename C, typename ... Args>
Action startCoroutine(const Action& actionOnReturn, Args... args) {
Action startCoroutine(Action&& actionOnReturn, Args... args) {
C* coroutine = C::getBench().obtain(args...);
coroutine->m_parentReturnAction = actionOnReturn;
return Action(Action::TYPE_COROUTINE, coroutine, nullptr);
coroutine->m_parentReturnAction = std::forward<oatpp::async::Action>(std::forward<oatpp::async::Action>(actionOnReturn));
return Action(coroutine);
}
/**
@ -364,7 +349,7 @@ public:
Action startCoroutineForResult(Action (ParentCoroutineType::*function)(CallbackArgs...), Args... args) {
CoroutineType* coroutine = CoroutineType::getBench().obtain(args...);
coroutine->m_callback = reinterpret_cast<FunctionPtr>(function);
return Action(Action::TYPE_COROUTINE, coroutine, nullptr);
return Action(coroutine);
}
/**
@ -382,6 +367,22 @@ public:
AbstractCoroutine* getParent() const {
return m_parent;
}
/**
* Convenience method to generate error reporting Action.
* @param message - error message.
* @return - error reporting Action.
*/
Action error(const std::shared_ptr<const Error>& error) {
m_propagatedError = error;
return Action(Action::TYPE_ERROR);
}
template<class E, typename ... Args>
Action error(Args... args) {
m_propagatedError = std::make_shared<E>(args...);
return Action(Action::TYPE_ERROR);
}
};
@ -442,48 +443,31 @@ public:
* @return - yield Action.
*/
Action yieldTo(Function function) const {
return Action(Action::TYPE_YIELD_TO, nullptr, static_cast<FunctionPtr>(function));
return Action(static_cast<FunctionPtr>(function));
}
/**
* Convenience method to generate Action of `type == Action::TYPE_WAIT_RETRY`.
* @return - WAIT_RETRY Action.
*/
const Action& waitRetry() const {
return Action::_WAIT_RETRY;
Action waitRetry() const {
return Action::TYPE_WAIT_RETRY;
}
/**
* Convenience method to generate Action of `type == Action::TYPE_REPEAT`.
* @return - repeat Action.
*/
const Action& repeat() const {
return Action::_REPEAT;
Action repeat() const {
return Action::TYPE_REPEAT;
}
/**
* Convenience method to generate Action of `type == Action::TYPE_FINISH`.
* @return - finish Action.
*/
const Action& finish() const {
return Action::_FINISH;
}
/**
* Deprecated.
* @return - abort Action.
*/
const Action& abort() const {
return Action::_ABORT;
}
/**
* Convenience method to generate error reporting Action.
* @param message - error message.
* @return - error reporting Action.
*/
Action error(const char* message) {
return Action(Error(message));
Action finish() const {
return Action::TYPE_FINISH;
}
};
@ -539,23 +523,23 @@ public:
* @return - yield Action.
*/
Action yieldTo(Function function) const {
return Action(Action::TYPE_YIELD_TO, nullptr, static_cast<FunctionPtr>(function));
return Action(static_cast<FunctionPtr>(function));
}
/**
* Convenience method to generate Action of `type == Action::TYPE_WAIT_RETRY`.
* @return - WAIT_RETRY Action.
*/
const Action& waitRetry() const {
return Action::_WAIT_RETRY;
Action waitRetry() const {
return Action::TYPE_WAIT_RETRY;
}
/**
* Convenience method to generate Action of `type == Action::TYPE_REPEAT`.
* @return - repeat Action.
*/
const Action& repeat() const {
return Action::_REPEAT;
Action repeat() const {
return Action::TYPE_REPEAT;
}
/**
@ -564,26 +548,9 @@ public:
* @param args - argumets to be passed to callback.
* @return - finish Action.
*/
const Action& _return(Args... args) {
Action _return(Args... args) {
m_parentReturnAction = getParent()->callWithParams(m_callback, args...);
return Action::_FINISH;
}
/**
* Deprecated.
* @return - abort Action.
*/
const Action& abort() const {
return Action::_ABORT;
}
/**
* Convenience method to generate error reporting Action.
* @param message - error message.
* @return - error reporting Action.
*/
Action error(const char* message) {
return Action(Error(message));
return Action::TYPE_FINISH;
}
};

View File

@ -0,0 +1,37 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "Error.hpp"
namespace oatpp { namespace async {
Error::Error(const char* what)
: m_what(what)
{}
const char* Error::what() const {
return m_what;
}
}}

View File

@ -0,0 +1,65 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_async_Error_hpp
#define oatpp_async_Error_hpp
namespace oatpp { namespace async {
/**
* Class to hold and communicate errors between Coroutines
*/
class Error {
private:
const char* m_what;
public:
/**
* Constructor.
* @param what - error explanation.
*/
Error(const char* what);
/**
* Error explanation.
* @return
*/
const char* what() const;
/**
* Check if error belongs to specified class.
* @tparam ErrorClass
* @return - `true` if error is of specified class
*/
template<class ErrorClass>
bool is() const {
return dynamic_cast<ErrorClass*>(this) != nullptr;
}
};
}}
#endif //oatpp_async_Error_hpp

View File

@ -32,7 +32,7 @@ bool Processor::checkWaitingQueue() {
AbstractCoroutine* prev = nullptr;
while (curr != nullptr) {
const Action& action = curr->iterate();
if(action.m_type == Action::TYPE_ABORT) {
if(action.m_type == Action::TYPE_FINISH) {
m_waitingQueue.removeEntry(curr, prev);
if(prev != nullptr) {
curr = prev;
@ -48,6 +48,8 @@ bool Processor::checkWaitingQueue() {
curr = m_waitingQueue.first;
}
}
action.m_type = Action::TYPE_NONE;
prev = curr;
if(curr != nullptr) {
@ -96,6 +98,7 @@ bool Processor::iterate(v_int32 numIterations) {
} else {
m_activeQueue.round();
}
action.m_type = Action::TYPE_NONE;
} else {
m_activeQueue.popFrontNoData();
}

View File

@ -25,6 +25,7 @@
#ifndef oatpp_data_IODefinitions_hpp
#define oatpp_data_IODefinitions_hpp
#include "oatpp/core/async/Error.hpp"
#include "oatpp/core/Types.hpp"
namespace oatpp { namespace data {
@ -85,6 +86,27 @@ enum IOError : v_io_size {
};
class AsyncIOError : public oatpp::async::Error {
private:
v_io_size m_code;
public:
AsyncIOError(const char* what, v_io_size code)
: oatpp::async::Error(what)
, m_code(code)
{}
AsyncIOError(v_io_size code)
: oatpp::async::Error("AsyncIOError")
, m_code(code)
{}
v_io_size getCode() {
return m_code;
}
};
}}
#endif //oatpp_data_IODefinitions_hpp

View File

@ -287,7 +287,7 @@ data::v_io_size FIFOBuffer::flushToStream(data::stream::OutputStream& stream) {
}
oatpp::async::Action FIFOBuffer::flushToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish,
oatpp::async::Action&& actionOnFinish,
const std::shared_ptr<data::stream::OutputStream>& stream)
{
@ -336,15 +336,15 @@ oatpp::async::Action FIFOBuffer::flushToStreamAsync(oatpp::async::AbstractCorout
}
Action fullFlush() {
return data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_data1, m_size1, yieldTo(&FlushCoroutine::beforeFinish));
return data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_data1, m_size1, yieldTo(&FlushCoroutine::beforeFinish));
}
Action partialFlush1() {
return data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_data1, m_size1, yieldTo(&FlushCoroutine::partialFlush2));
return data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_data1, m_size1, yieldTo(&FlushCoroutine::partialFlush2));
}
Action partialFlush2() {
return data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_data2, m_size2, yieldTo(&FlushCoroutine::beforeFinish));
return data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_data2, m_size2, yieldTo(&FlushCoroutine::beforeFinish));
}
Action beforeFinish() {
@ -354,7 +354,7 @@ oatpp::async::Action FIFOBuffer::flushToStreamAsync(oatpp::async::AbstractCorout
};
return parentCoroutine->startCoroutine<FlushCoroutine>(actionOnFinish, shared_from_this(), stream);
return parentCoroutine->startCoroutine<FlushCoroutine>(std::forward<oatpp::async::Action>(actionOnFinish), shared_from_this(), stream);
}

View File

@ -131,7 +131,7 @@ public:
* @return
*/
oatpp::async::Action flushToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish,
oatpp::async::Action&& actionOnFinish,
const std::shared_ptr<data::stream::OutputStream>& stream);

View File

@ -225,7 +225,7 @@ bool ChunkedBuffer::flushToStream(const std::shared_ptr<OutputStream>& stream){
}
oatpp::async::Action ChunkedBuffer::flushToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish,
oatpp::async::Action&& actionOnFinish,
const std::shared_ptr<OutputStream>& stream) {
class FlushCoroutine : public oatpp::async::Coroutine<FlushCoroutine> {
@ -247,7 +247,7 @@ oatpp::async::Action ChunkedBuffer::flushToStreamAsync(oatpp::async::AbstractCor
, m_bytesLeft(chunkedBuffer->m_size)
, m_currData(nullptr)
, m_currDataSize(0)
, m_nextAction(Action(Action::TYPE_FINISH, nullptr, nullptr))
, m_nextAction(Action::TYPE_FINISH)
{}
Action act() override {
@ -275,12 +275,12 @@ oatpp::async::Action ChunkedBuffer::flushToStreamAsync(oatpp::async::AbstractCor
}
Action writeCurrData() {
return oatpp::data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction);
return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_currData, m_currDataSize, Action::clone(m_nextAction));
}
};
return parentCoroutine->startCoroutine<FlushCoroutine>(actionOnFinish,
return parentCoroutine->startCoroutine<FlushCoroutine>(std::forward<oatpp::async::Action>(actionOnFinish),
shared_from_this(),
stream);

View File

@ -211,7 +211,7 @@ public:
* @return - &id:oatpp::async::Action;.
*/
oatpp::async::Action flushToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish,
oatpp::async::Action&& actionOnFinish,
const std::shared_ptr<OutputStream>& stream);
std::shared_ptr<Chunks> getChunks();

View File

@ -27,10 +27,6 @@
namespace oatpp { namespace data{ namespace stream {
const char* const Errors::ERROR_ASYNC_BROKEN_PIPE = "[oatpp::data::stream{}]: Error. AsyncIO. Broken pipe.";
const char* const Errors::ERROR_ASYNC_BAD_RESULT = "[oatpp::data::stream{}]: Error. AsyncIO. Bad result code. Operation returned 0.";
const char* const Errors::ERROR_ASYNC_UNKNOWN_CODE = "[oatpp::data::stream{}]: Error. AsyncIO. Unknown error code returned";
data::v_io_size OutputStream::writeAsString(v_int32 value){
v_char8 a[100];
v_int32 size = utils::conversion::int32ToCharSequence(value, &a[0]);
@ -213,7 +209,7 @@ oatpp::data::v_io_size transfer(const std::shared_ptr<InputStream>& fromStream,
}
oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
oatpp::async::Action&& actionOnReturn,
const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream,
oatpp::data::v_io_size transferSize,
@ -251,10 +247,10 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
if(m_progress == m_transferSize) {
return finish();
} else if(m_progress > m_transferSize) {
throw std::runtime_error("[oatpp::data::stream::transferAsync{TransferCoroutine::act()}]: Invalid state. m_progress > m_transferSize");
return error<AsyncTransferError>("[oatpp::data::stream::transferAsync{TransferCoroutine::act()}]: Invalid state. m_progress > m_transferSize");
}
} else if(m_transferSize < 0) {
throw std::runtime_error("[oatpp::data::stream::transferAsync{TransferCoroutine::act()}]: Invalid state. m_transferSize < 0");
return error<AsyncTransferError>("[oatpp::data::stream::transferAsync{TransferCoroutine::act()}]: Invalid state. m_transferSize < 0");
}
m_desiredReadCount = m_transferSize - m_progress;
@ -270,7 +266,8 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
}
Action doRead() {
return oatpp::data::stream::readSomeDataAsyncInline(m_fromStream.get(),
return oatpp::data::stream::readSomeDataAsyncInline(this,
m_fromStream.get(),
m_readBufferPtr,
m_bytesLeft,
yieldTo(&TransferCoroutine::prepareWrite));
@ -283,69 +280,73 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
}
Action doWrite() {
return oatpp::data::stream::writeExactSizeDataAsyncInline(m_toStream.get(),
return oatpp::data::stream::writeExactSizeDataAsyncInline(this,
m_toStream.get(),
m_writeBufferPtr,
m_bytesLeft,
yieldTo(&TransferCoroutine::act));
}
Action handleError(const oatpp::async::Error& error) override {
if(!error.isExceptionThrown) {
if(m_transferSize == 0) {
return finish();
}
Action handleError(const std::shared_ptr<const Error>& error) override {
if(m_transferSize == 0) {
return finish();
}
return error;
return Action(Action::TYPE_ERROR);
}
};
return parentCoroutine->startCoroutine<TransferCoroutine>(actionOnReturn, fromStream, toStream, transferSize, buffer);
return parentCoroutine->startCoroutine<TransferCoroutine>(std::forward<oatpp::async::Action>(actionOnReturn), fromStream, toStream, transferSize, buffer);
}
namespace {
oatpp::async::Action asyncActionOnIOError(data::v_io_size res) {
std::shared_ptr<const AsyncIOError> ERROR_ASYNC_BROKEN_PIPE = std::make_shared<AsyncIOError>(IOError::BROKEN_PIPE);
std::shared_ptr<const AsyncIOError> ERROR_ASYNC_ZERO_VALUE = std::make_shared<AsyncIOError>(IOError::ZERO_VALUE);
oatpp::async::Action asyncActionOnIOError(oatpp::async::AbstractCoroutine* coroutine, data::v_io_size res) {
switch (res) {
case IOError::WAIT_RETRY:
return oatpp::async::Action::_WAIT_RETRY;
return oatpp::async::Action::TYPE_WAIT_RETRY;
case IOError::RETRY:
return oatpp::async::Action::_REPEAT;
return oatpp::async::Action::TYPE_REPEAT;
case IOError::BROKEN_PIPE:
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_BROKEN_PIPE));
return coroutine->error(ERROR_ASYNC_BROKEN_PIPE);
case IOError::ZERO_VALUE:
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_BAD_RESULT));
return coroutine->error(ERROR_ASYNC_ZERO_VALUE);
}
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_UNKNOWN_CODE));
return coroutine->error<AsyncIOError>("Unknown IO Error result", res);
}
}
oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
oatpp::data::stream::OutputStream* stream,
const void*& data,
data::v_io_size& size,
const oatpp::async::Action& nextAction) {
oatpp::async::Action&& nextAction) {
if(size > 0) {
auto res = stream->write(data, size);
if(res > 0) {
data = &((p_char8) data)[res];
size -= res;
if (size > 0) {
return oatpp::async::Action::_REPEAT;
return oatpp::async::Action::TYPE_REPEAT;
}
} else {
return asyncActionOnIOError(res);
return asyncActionOnIOError(coroutine, res);
}
}
return nextAction;
return std::forward<oatpp::async::Action>(nextAction);
}
oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream,
oatpp::async::Action readSomeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
oatpp::data::stream::InputStream* stream,
void*& data,
data::v_io_size& size,
const oatpp::async::Action& nextAction) {
oatpp::async::Action&& nextAction) {
if(size > 0) {
auto res = stream->read(data, size);
@ -353,30 +354,31 @@ oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* s
data = &((p_char8) data)[res];
size -= res;
} else {
return asyncActionOnIOError(res);
return asyncActionOnIOError(coroutine, res);
}
}
return nextAction;
return std::forward<oatpp::async::Action>(nextAction);
}
oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStream* stream,
oatpp::async::Action readExactSizeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
oatpp::data::stream::InputStream* stream,
void*& data,
data::v_io_size& size,
const oatpp::async::Action& nextAction) {
oatpp::async::Action&& nextAction) {
if(size > 0) {
auto res = stream->read(data, size);
if(res > 0) {
data = &((p_char8) data)[res];
size -= res;
if (size > 0) {
return oatpp::async::Action::_REPEAT;
return oatpp::async::Action::TYPE_REPEAT;
}
} else {
return asyncActionOnIOError(res);
return asyncActionOnIOError(coroutine, res);
}
}
return nextAction;
return std::forward<oatpp::async::Action>(nextAction);
}
oatpp::data::v_io_size readExactSizeData(oatpp::data::stream::InputStream* stream, void* data, data::v_io_size size) {

View File

@ -32,26 +32,6 @@
namespace oatpp { namespace data{ namespace stream {
class Errors {
public:
/**
* IOError::BROKEN_PIPE returned
*/
static const char* const ERROR_ASYNC_BROKEN_PIPE;
/**
* IOError::ZERO_VALUE value returned
*/
static const char* const ERROR_ASYNC_BAD_RESULT;
/**
* Error code returned is not from IOError enum
*/
static const char* const ERROR_ASYNC_UNKNOWN_CODE;
};
/**
* Output Stream.
*/
@ -206,6 +186,10 @@ OutputStream& operator << (OutputStream& s, v_float32 value);
OutputStream& operator << (OutputStream& s, v_float64 value);
OutputStream& operator << (OutputStream& s, bool value);
class AsyncTransferError : public oatpp::async::Error {
public:
AsyncTransferError(const char* what) : oatpp::async::Error(what) {}
};
/**
* Read bytes from @fromStream" and write to @toStream" using @buffer of size @bufferSize
@ -223,27 +207,30 @@ oatpp::data::v_io_size transfer(const std::shared_ptr<InputStream>& fromStream,
* Same as transfer but asynchronous
*/
oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
oatpp::async::Action&& actionOnReturn,
const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream,
oatpp::data::v_io_size transferSize,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer);
oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
oatpp::data::stream::OutputStream* stream,
const void*& data,
data::v_io_size& size,
const oatpp::async::Action& nextAction);
oatpp::async::Action&& nextAction);
oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream,
oatpp::async::Action readSomeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
oatpp::data::stream::InputStream* stream,
void*& data,
data::v_io_size& bytesLeftToRead,
const oatpp::async::Action& nextAction);
oatpp::async::Action&& nextAction);
oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStream* stream,
oatpp::async::Action readExactSizeDataAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
oatpp::data::stream::InputStream* stream,
void*& data,
data::v_io_size& bytesLeftToRead,
const oatpp::async::Action& nextAction);
oatpp::async::Action&& nextAction);
/**
* Read exact amount of bytes to stream

View File

@ -43,8 +43,8 @@ data::v_io_size OutputStreamBufferedProxy::flush() {
}
oatpp::async::Action OutputStreamBufferedProxy::flushAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish) {
return m_buffer->flushToStreamAsync(parentCoroutine, actionOnFinish, m_outputStream);
oatpp::async::Action&& actionOnFinish) {
return m_buffer->flushToStreamAsync(parentCoroutine, std::forward<oatpp::async::Action>(actionOnFinish), m_outputStream);
}
data::v_io_size InputStreamBufferedProxy::read(void *data, data::v_io_size count) {

View File

@ -75,7 +75,7 @@ public:
data::v_io_size write(const void *data, data::v_io_size count) override;
data::v_io_size flush();
oatpp::async::Action flushAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish);
oatpp::async::Action&& actionOnFinish);
void setBufferPosition(data::v_io_size readPosition, data::v_io_size writePosition, bool canRead) {
m_buffer->setBufferPosition(readPosition, writePosition, canRead);

View File

@ -105,7 +105,7 @@ oatpp::async::Action SimpleTCPConnectionProvider::getConnectionAsync(oatpp::asyn
struct hostent* host = gethostbyname((const char*) m_host->getData());
if ((host == NULL) || (host->h_addr == NULL)) {
return error("[oatpp::network::client::SimpleTCPConnectionProvider::getConnectionAsync()]: Error. Can't retrieve DNS information.");
return error<Error>("[oatpp::network::client::SimpleTCPConnectionProvider::getConnectionAsync()]: Error. Can't retrieve DNS information.");
}
bzero(&m_client, sizeof(m_client));
@ -116,7 +116,7 @@ oatpp::async::Action SimpleTCPConnectionProvider::getConnectionAsync(oatpp::asyn
m_clientHandle = socket(AF_INET, SOCK_STREAM, 0);
if (m_clientHandle < 0) {
return error("[oatpp::network::client::SimpleTCPConnectionProvider::getConnectionAsync()]: Error. Can't create socket.");
return error<Error>("[oatpp::network::client::SimpleTCPConnectionProvider::getConnectionAsync()]: Error. Can't create socket.");
}
fcntl(m_clientHandle, F_SETFL, O_NONBLOCK);
@ -145,7 +145,7 @@ oatpp::async::Action SimpleTCPConnectionProvider::getConnectionAsync(oatpp::asyn
return repeat();
}
::close(m_clientHandle);
return error("[oatpp::network::client::SimpleTCPConnectionProvider::getConnectionAsync()]: Error. Can't connect.");
return error<Error>("[oatpp::network::client::SimpleTCPConnectionProvider::getConnectionAsync()]: Error. Can't connect.");
}
};

View File

@ -59,6 +59,41 @@ public:
};
/**
* Protocol Error Info.
*/
template<class Status>
struct ProtocolErrorInfo {
/**
* Default Constructor.
*/
ProtocolErrorInfo()
: ioStatus(0)
{}
/**
* Constructor.
* @param pIOStatus - I/O level error. See &id:oatpp::data::v_io_size;.
* @param pStatus - configurable arbitrary data type.
*/
ProtocolErrorInfo(oatpp::data::v_io_size pIOStatus, const Status& pStatus)
: ioStatus(pIOStatus)
, status(pStatus)
{}
/**
* Get I/O level error.
*/
oatpp::data::v_io_size ioStatus;
/**
* Configurable arbitrary data type.
*/
Status status;
};
/**
* Protocol Error template.
* @tparam Status - arbitrary data type.
@ -66,41 +101,10 @@ public:
template<class Status>
class ProtocolError : public CommunicationError {
public:
/**
* Protocol Error Info.
* Cenvenience typedef for ProtocolErrorInfo
*/
struct Info {
/**
* Default Constructor.
*/
Info()
: ioStatus(0)
{}
/**
* Constructor.
* @param pIOStatus - I/O level error. See &id:oatpp::data::v_io_size;.
* @param pStatus - configurable arbitrary data type.
*/
Info(oatpp::data::v_io_size pIOStatus, const Status& pStatus)
: ioStatus(pIOStatus)
, status(pStatus)
{}
/**
* Get I/O level error.
*/
oatpp::data::v_io_size ioStatus;
/**
* Configurable arbitrary data type.
*/
Status status;
};
typedef ProtocolErrorInfo<Status> Info;
private:
Info m_info;
public:
@ -124,6 +128,52 @@ public:
}
};
/**
* Protocol Error template.
* @tparam Status - arbitrary data type.
*/
template<class Status>
class AsyncProtocolError : public oatpp::data::AsyncIOError {
public:
/**
* Cenvenience typedef for ProtocolErrorInfo
*/
typedef ProtocolErrorInfo<Status> Info;
private:
Info m_info;
oatpp::String m_message;
public:
/**
* Constructor.
* @param info - &l:ProtocolError::Info ;.
* @param message - error message.
*/
AsyncProtocolError(const Info& info, const oatpp::String& message)
: oatpp::data::AsyncIOError("AsyncProtocolError", info.ioStatus)
, m_info(info)
, m_message(message)
{}
/**
* Error message.
* @return - error message.
*/
oatpp::String getMessage() {
return m_message;
}
/**
* Get error info.
* @return - error info.
*/
Info getInfo() {
return m_info;
}
};
}}}

View File

@ -115,7 +115,7 @@ public:
* @return - &id:oatpp::async::Action;.
*/
virtual oatpp::async::Action decodeAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
oatpp::async::Action&& actionOnReturn,
const Headers& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const = 0;

View File

@ -114,9 +114,9 @@ oatpp::String Request::readBodyToString() const {
}
oatpp::async::Action Request::streamBodyAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
oatpp::async::Action&& actionOnReturn,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const {
return m_bodyDecoder->decodeAsync(parentCoroutine, actionOnReturn, m_headers, m_bodyStream, toStream);
return m_bodyDecoder->decodeAsync(parentCoroutine, std::forward<oatpp::async::Action>(actionOnReturn), m_headers, m_bodyStream, toStream);
}
}}}}}

View File

@ -195,7 +195,7 @@ public:
* @return Start Coroutine Action
*/
oatpp::async::Action streamBodyAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
oatpp::async::Action&& actionOnReturn,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const;
/**

View File

@ -128,7 +128,7 @@ RequestHeadersReader::Action RequestHeadersReader::readHeadersAsync(oatpp::async
if(m_progress + desiredToRead > m_maxHeadersSize) {
desiredToRead = m_maxHeadersSize - m_progress;
if(desiredToRead <= 0) {
return error("Headers section is too large");
return error<Error>("[oatpp::web::protocol::http::incoming::RequestHeadersReader::readHeadersAsync()]: Error. Headers section is too large.");
}
}
@ -151,8 +151,10 @@ RequestHeadersReader::Action RequestHeadersReader::readHeadersAsync(oatpp::async
} else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
return waitRetry();
} else if(res == data::IOError::BROKEN_PIPE){
return error<Error>("Connection Closed.");
} else {
return abort();
return error<Error>("[oatpp::web::protocol::http::incoming::RequestHeadersReader::readHeadersAsync()]: Error. Error reading connection stream.");
}
}
@ -168,10 +170,10 @@ RequestHeadersReader::Action RequestHeadersReader::readHeadersAsync(oatpp::async
if(status.code == 0) {
return _return(m_result);
} else {
return error("[oatpp::web::protocol::http::incoming::RequestHeadersReader::readHeadersAsync()]: Error. Error occurred while parsing headers");
return error<Error>("[oatpp::web::protocol::http::incoming::RequestHeadersReader::readHeadersAsync()]: Error. Error occurred while parsing headers.");
}
} else {
return error("[oatpp::web::protocol::http::incoming::RequestHeadersReader::readHeadersAsync()]: Error. Can't parse starting line");
return error<Error>("[oatpp::web::protocol::http::incoming::RequestHeadersReader::readHeadersAsync()]: Error. Can't parse starting line.");
}
}

View File

@ -75,9 +75,9 @@ oatpp::String Response::readBodyToString() const {
}
oatpp::async::Action Response::streamBodyAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const {
return m_bodyDecoder->decodeAsync(parentCoroutine, actionOnReturn, m_headers, m_bodyStream, toStream);
oatpp::async::Action&& actionOnReturn,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const {
return m_bodyDecoder->decodeAsync(parentCoroutine, std::forward<oatpp::async::Action>(actionOnReturn), m_headers, m_bodyStream, toStream);
}
}}}}}

View File

@ -147,7 +147,7 @@ public:
* @return - &id:oatpp::async::Action;.
*/
oatpp::async::Action streamBodyAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
oatpp::async::Action&& actionOnReturn,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const;
/**

View File

@ -128,7 +128,7 @@ ResponseHeadersReader::Action ResponseHeadersReader::readHeadersAsync(oatpp::asy
if(m_progress + desiredToRead > m_maxHeadersSize) {
desiredToRead = m_maxHeadersSize - m_progress;
if(desiredToRead <= 0) {
return error("Headers section is too large");
return error<Error>("[oatpp::web::protocol::http::incoming::RequestHeadersReader::readHeadersAsync()]: Error. Headers section is too large.");
}
}
@ -151,8 +151,10 @@ ResponseHeadersReader::Action ResponseHeadersReader::readHeadersAsync(oatpp::asy
} else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
return waitRetry();
} else if(res == data::IOError::BROKEN_PIPE) {
return error<Error>("Connection closed");
} else {
return abort();
return error<Error>("[oatpp::web::protocol::http::incoming::ResponseHeadersReader::readHeadersAsync()]: Error. Error reading connection stream.");
}
}
@ -168,10 +170,10 @@ ResponseHeadersReader::Action ResponseHeadersReader::readHeadersAsync(oatpp::asy
if(status.code == 0) {
return _return(m_result);
} else {
return error("[oatpp::web::protocol::http::incoming::ResponseHeadersReader::readHeadersAsync()]: Error. Error occurred while parsing headers");
return error<Error>("[oatpp::web::protocol::http::incoming::ResponseHeadersReader::readHeadersAsync()]: Error. Error occurred while parsing headers.");
}
} else {
return error("[oatpp::web::protocol::http::incoming::ResponseHeadersReader::readHeadersAsync()]: Error. Can't parse starting line");
return error<Error>("[oatpp::web::protocol::http::incoming::ResponseHeadersReader::readHeadersAsync()]: Error. Can't parse starting line.");
}
}

View File

@ -111,7 +111,7 @@ void SimpleBodyDecoder::decode(const Headers& headers,
}
oatpp::async::Action SimpleBodyDecoder::doChunkedDecodingAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
oatpp::async::Action&& actionOnReturn,
const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) {
@ -153,11 +153,11 @@ oatpp::async::Action SimpleBodyDecoder::doChunkedDecodingAsync(oatpp::async::Abs
Action readLineChar() {
auto res = m_fromStream->read(&m_lineChar, 1);
if(res == data::IOError::WAIT_RETRY) {
return oatpp::async::Action::_WAIT_RETRY;
return oatpp::async::Action::TYPE_WAIT_RETRY;
} else if(res == data::IOError::RETRY) {
return oatpp::async::Action::_REPEAT;
return oatpp::async::Action::TYPE_REPEAT;
} else if( res < 0) {
return error("[BodyDecoder::ChunkedDecoder] Can't read line char");
return error<Error>("[BodyDecoder::ChunkedDecoder] Can't read line char");
}
return yieldTo(&ChunkedDecoder::onLineCharRead);
}
@ -166,7 +166,7 @@ oatpp::async::Action SimpleBodyDecoder::doChunkedDecodingAsync(oatpp::async::Abs
if(!m_lineEnding) {
if(m_lineChar != '\r') {
if(m_currLineLength + 1 > MAX_LINE_SIZE){
return error("[BodyDecoder::ChunkedDecoder] too long line");
return error<Error>("[BodyDecoder::ChunkedDecoder] too long line");
}
m_lineBuffer[m_currLineLength ++] = m_lineChar;
return yieldTo(&ChunkedDecoder::readLineChar);
@ -180,7 +180,7 @@ oatpp::async::Action SimpleBodyDecoder::doChunkedDecodingAsync(oatpp::async::Abs
}
}
if(m_currLineLength == 0) {
return error("Error reading stream. 0-length line");
return error<Error>("Error reading stream. 0-length line");
}
m_lineBuffer[m_currLineLength] = 0;
return yieldTo(&ChunkedDecoder::onLineRead);
@ -199,12 +199,14 @@ oatpp::async::Action SimpleBodyDecoder::doChunkedDecodingAsync(oatpp::async::Abs
Action skipRN() {
if(m_done) {
return oatpp::data::stream::readExactSizeDataAsyncInline(m_fromStream.get(),
return oatpp::data::stream::readExactSizeDataAsyncInline(this,
m_fromStream.get(),
m_skipData,
m_skipSize,
finish());
} else {
return oatpp::data::stream::readExactSizeDataAsyncInline(m_fromStream.get(),
return oatpp::data::stream::readExactSizeDataAsyncInline(this,
m_fromStream.get(),
m_skipData,
m_skipSize,
yieldTo(&ChunkedDecoder::readLineChar));
@ -213,38 +215,38 @@ oatpp::async::Action SimpleBodyDecoder::doChunkedDecodingAsync(oatpp::async::Abs
};
return parentCoroutine->startCoroutine<ChunkedDecoder>(actionOnReturn, fromStream, toStream);
return parentCoroutine->startCoroutine<ChunkedDecoder>(std::forward<oatpp::async::Action>(actionOnReturn), fromStream, toStream);
}
oatpp::async::Action SimpleBodyDecoder::decodeAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
oatpp::async::Action&& actionOnReturn,
const Headers& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const {
auto transferEncodingIt = headers.find(Header::TRANSFER_ENCODING);
if(transferEncodingIt != headers.end() && transferEncodingIt->second == Header::Value::TRANSFER_ENCODING_CHUNKED) {
return doChunkedDecodingAsync(parentCoroutine, actionOnReturn, bodyStream, toStream);
return doChunkedDecodingAsync(parentCoroutine, std::forward<oatpp::async::Action>(actionOnReturn), bodyStream, toStream);
} else {
oatpp::data::v_io_size contentLength = 0;
auto contentLengthStrIt = headers.find(Header::CONTENT_LENGTH);
if(contentLengthStrIt == headers.end()) {
return actionOnReturn; // DO NOTHING // it is an empty or invalid body
return std::forward<oatpp::async::Action>(actionOnReturn); // DO NOTHING // it is an empty or invalid body
} else {
bool success;
contentLength = oatpp::utils::conversion::strToInt64(contentLengthStrIt->second.toString(), success);
if(!success){
return oatpp::async::Action(oatpp::async::Error("Invalid 'Content-Length' Header"));
return parentCoroutine->error<oatpp::async::Error>("Invalid 'Content-Length' Header");
}
if(contentLength > 0) {
return oatpp::data::stream::transferAsync(parentCoroutine,
actionOnReturn,
std::forward<oatpp::async::Action>(actionOnReturn),
bodyStream,
toStream,
contentLength,
oatpp::data::buffer::IOBuffer::createShared());
} else {
return actionOnReturn; // DO NOTHING
return std::forward<oatpp::async::Action>(actionOnReturn); // DO NOTHING
}
}
}

View File

@ -41,7 +41,7 @@ private:
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream);
static oatpp::async::Action doChunkedDecodingAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
oatpp::async::Action&& actionOnReturn,
const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream);
public:
@ -66,7 +66,7 @@ public:
* @return - &id:oatpp::async::Action;.
*/
oatpp::async::Action decodeAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnReturn,
oatpp::async::Action&& actionOnReturn,
const Headers& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream) const override;

View File

@ -69,7 +69,7 @@ public:
* @return - &id:oatpp::async::Action;.
*/
virtual Action writeToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnReturn,
Action&& actionOnReturn,
const std::shared_ptr<OutputStream>& stream) = 0;
};

View File

@ -35,7 +35,7 @@ BufferBody::WriteToStreamCoroutine::WriteToStreamCoroutine(const std::shared_ptr
{}
async::Action BufferBody::WriteToStreamCoroutine::act() {
return oatpp::data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, finish());
return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_currData, m_currDataSize, finish());
}
BufferBody::BufferBody(const oatpp::String& buffer)
@ -56,9 +56,9 @@ void BufferBody::writeToStream(const std::shared_ptr<OutputStream>& stream) noex
async::Action BufferBody::writeToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnReturn,
Action&& actionOnReturn,
const std::shared_ptr<OutputStream>& stream) {
return parentCoroutine->startCoroutine<WriteToStreamCoroutine>(actionOnReturn, shared_from_this(), stream);
return parentCoroutine->startCoroutine<WriteToStreamCoroutine>(std::forward<oatpp::async::Action>(actionOnReturn), shared_from_this(), stream);
}
}}}}}

View File

@ -99,7 +99,7 @@ public:
* @return - &id:oatpp::async::Action;
*/
Action writeToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnReturn,
Action&& actionOnReturn,
const std::shared_ptr<OutputStream>& stream) override;
};

View File

@ -70,7 +70,7 @@ ChunkedBufferBody::WriteToStreamCoroutine::WriteToStreamCoroutine(const std::sha
, m_currChunk(m_chunks->getFirstNode())
, m_currData(nullptr)
, m_currDataSize(0)
, m_nextAction(Action(Action::TYPE_FINISH, nullptr, nullptr))
, m_nextAction(Action::TYPE_FINISH)
{}
async::Action ChunkedBufferBody::WriteToStreamCoroutine::act() {
@ -110,16 +110,16 @@ async::Action ChunkedBufferBody::WriteToStreamCoroutine::writeEndOfChunks() {
}
async::Action ChunkedBufferBody::WriteToStreamCoroutine::writeCurrData() {
return oatpp::data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction);
return oatpp::data::stream::writeExactSizeDataAsyncInline(this, m_stream.get(), m_currData, m_currDataSize, Action::clone(m_nextAction));
}
async::Action ChunkedBufferBody::writeToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnFinish,
Action&& actionOnFinish,
const std::shared_ptr<OutputStream>& stream) {
if(m_chunked) {
return parentCoroutine->startCoroutine<WriteToStreamCoroutine>(actionOnFinish, shared_from_this(), stream);
return parentCoroutine->startCoroutine<WriteToStreamCoroutine>(std::forward<oatpp::async::Action>(actionOnFinish), shared_from_this(), stream);
} else {
return m_buffer->flushToStreamAsync(parentCoroutine, actionOnFinish, stream);
return m_buffer->flushToStreamAsync(parentCoroutine, std::forward<oatpp::async::Action>(actionOnFinish), stream);
}
}

View File

@ -117,7 +117,7 @@ public:
* @return - &id:oatpp::async::Action;
*/
Action writeToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnFinish,
Action&& actionOnFinish,
const std::shared_ptr<OutputStream>& stream) override;
};

View File

@ -105,7 +105,7 @@ void Request::send(const std::shared_ptr<data::stream::OutputStream>& stream){
}
oatpp::async::Action Request::sendAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish,
oatpp::async::Action&& actionOnFinish,
const std::shared_ptr<data::stream::OutputStream>& stream){
class SendAsyncCoroutine : public oatpp::async::Coroutine<SendAsyncCoroutine> {
@ -165,7 +165,7 @@ oatpp::async::Action Request::sendAsync(oatpp::async::AbstractCoroutine* parentC
};
return parentCoroutine->startCoroutine<SendAsyncCoroutine>(actionOnFinish, shared_from_this(), stream);
return parentCoroutine->startCoroutine<SendAsyncCoroutine>(std::forward<oatpp::async::Action>(actionOnFinish), shared_from_this(), stream);
}

View File

@ -130,7 +130,7 @@ public:
* @return - &id:oatpp::async::Action;.
*/
oatpp::async::Action sendAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish,
oatpp::async::Action&& actionOnFinish,
const std::shared_ptr<data::stream::OutputStream>& stream);
};

View File

@ -99,7 +99,7 @@ void Response::send(const std::shared_ptr<data::stream::OutputStream>& stream) {
}
oatpp::async::Action Response::sendAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish,
oatpp::async::Action&& actionOnFinish,
const std::shared_ptr<data::stream::OutputStream>& stream){
class SendAsyncCoroutine : public oatpp::async::Coroutine<SendAsyncCoroutine> {
@ -158,7 +158,7 @@ oatpp::async::Action Response::sendAsync(oatpp::async::AbstractCoroutine* parent
};
return parentCoroutine->startCoroutine<SendAsyncCoroutine>(actionOnFinish, shared_from_this(), stream);
return parentCoroutine->startCoroutine<SendAsyncCoroutine>(std::forward<oatpp::async::Action>(actionOnFinish), shared_from_this(), stream);
}

View File

@ -122,7 +122,7 @@ public:
* @return - &id:oatpp::async::Action;.
*/
oatpp::async::Action sendAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish,
oatpp::async::Action&& actionOnFinish,
const std::shared_ptr<data::stream::OutputStream>& stream);
};

View File

@ -177,32 +177,25 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onRequestDone() {
}
}
return abort();
return finish();
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::handleError(const oatpp::async::Error& error) {
if(m_currentResponse) {
if(error.isExceptionThrown) {
OATPP_LOGE("Server", "Unhandled exception. Dropping connection");
} else {
OATPP_LOGE("Server", "Unhandled error. '%s'. Dropping connection", error.message);
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::handleError(const std::shared_ptr<const Error>& error) {
if(error) {
if(m_currentResponse) {
OATPP_LOGE("[oatpp::web::server::HttpProcessor::Coroutine::handleError()]", "Unhandled error. '%s'. Dropping connection", error->what());
return Action::TYPE_ERROR;
}
return abort();
m_currentResponse = m_errorHandler->handleError(protocol::http::Status::CODE_500, error->what());
return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
}
if (error.isExceptionThrown) {
try{
throw;
} catch (oatpp::web::protocol::http::HttpError& error) {
m_currentResponse = m_errorHandler->handleError(error.getInfo().status, error.getMessage());
} catch (std::exception& error) {
m_currentResponse = m_errorHandler->handleError(protocol::http::Status::CODE_500, error.what());
} catch (...) {
m_currentResponse = m_errorHandler->handleError(protocol::http::Status::CODE_500, "Unknown error");
}
} else {
m_currentResponse = m_errorHandler->handleError(protocol::http::Status::CODE_500, error.message);
}
return yieldTo(&HttpProcessor::Coroutine::onResponseFormed);
return Action::TYPE_ERROR;
}
}}}

View File

@ -110,7 +110,7 @@ public:
Action onResponseFormed();
Action onRequestDone();
Action handleError(const oatpp::async::Error& error) override;
Action handleError(const std::shared_ptr<const Error>& error) override;
};

View File

@ -30,6 +30,10 @@
#include "oatpp/core/concurrency/SpinLock.hpp"
#include "oatpp/core/base/Environment.hpp"
#include "oatpp/core/async/Coroutine.hpp"
#include "oatpp/core/Types.hpp"
#include <iostream>
#ifdef OATPP_ENABLE_ALL_TESTS_MAIN
@ -54,6 +58,7 @@ public:
void runTests() {
oatpp::base::Environment::printCompilationConfig();
/*
OATPP_RUN_TEST(oatpp::test::base::RegRuleTest);
OATPP_RUN_TEST(oatpp::test::base::CommandLineArgumentsTest);
@ -83,6 +88,10 @@ void runTests() {
OATPP_RUN_TEST(oatpp::test::web::FullTest);
OATPP_RUN_TEST(oatpp::test::web::FullAsyncTest);
*/
//OATPP_LOGD("aa", "fps=%d", sizeof(std::shared_ptr<oatpp::base::StrBuffer>));
OATPP_RUN_TEST(oatpp::test::web::FullAsyncClientTest);
}

View File

@ -58,7 +58,7 @@ class TestComponent {
public:
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::async::Executor>, executor)([] {
return std::make_shared<oatpp::async::Executor>(10);
return std::make_shared<oatpp::async::Executor>(1);
}());
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::network::virtual_::Interface>, virtualInterface)([] {
@ -132,19 +132,11 @@ public:
return finish();
}
Action handleError(const async::Error& error) override {
if(error.isExceptionThrown) {
try {
throw;
} catch (const std::runtime_error& e) {
OATPP_LOGD("[FullAsyncClientTest::ClientCoroutine_getRootAsync::handleError()]", "Exception. %s.", e.what());
} catch (...) {
OATPP_LOGD("[FullAsyncClientTest::ClientCoroutine_getRootAsync::handleError()]", "Exception. Unknown.");
}
} else {
OATPP_LOGD("[FullAsyncClientTest::ClientCoroutine_getRootAsync::handleError()]", "Error. %s", error.message);
Action handleError(const std::shared_ptr<const Error>& error) override {
if(error) {
OATPP_LOGD("[FullAsyncClientTest::ClientCoroutine_getRootAsync::handleError()]", "Error. %s", error->what());
}
return error;
return Action::TYPE_ERROR;
}
};
@ -183,19 +175,11 @@ public:
return finish();
}
Action handleError(const async::Error& error) override {
if(error.isExceptionThrown) {
try {
throw;
} catch (const std::runtime_error& e) {
OATPP_LOGD("[FullAsyncClientTest::ClientCoroutine_echoBodyAsync::handleError()]", "Exception. %s.", e.what());
} catch (...) {
OATPP_LOGD("[FullAsyncClientTest::ClientCoroutine_echoBodyAsync::handleError()]", "Exception. Unknown.");
}
} else {
OATPP_LOGD("[FullAsyncClientTest::ClientCoroutine_echoBodyAsync::handleError()]", "Error. %s", error.message);
Action handleError(const std::shared_ptr<const Error>& error) override {
if(error) {
OATPP_LOGD("[FullAsyncClientTest::ClientCoroutine_echoBodyAsync::handleError()]", "Error. %s", error->what());
}
return error;
return Action::TYPE_ERROR;
}
};
@ -222,11 +206,11 @@ void FullAsyncClientTest::onRun() {
ClientCoroutine_getRootAsync::SUCCESS_COUNTER = 0;
ClientCoroutine_echoBodyAsync::SUCCESS_COUNTER = 0;
v_int32 iterations = 10000;
v_int32 iterations = 1;
for(v_int32 i = 0; i < iterations; i++) {
executor->execute<ClientCoroutine_getRootAsync>();
executor->execute<ClientCoroutine_echoBodyAsync>();
//executor->execute<ClientCoroutine_echoBodyAsync>();
}
while(

View File

@ -54,7 +54,7 @@ public:
ENDPOINT_ASYNC_INIT(Root)
Action act() {
//OATPP_LOGD(TAG, "GET '/'");
OATPP_LOGD(TAG, "GET '/'");
return _return(controller->createResponse(Status::CODE_200, "Hello World Async!!!"));
}