ApiClient. Support retries. Introduce RetryPolicy.

This commit is contained in:
lganzzzo 2019-11-05 04:23:55 +02:00
parent 678e6a9419
commit 85a70499f9
10 changed files with 326 additions and 13 deletions

View File

@ -154,6 +154,8 @@ add_library(oatpp
oatpp/web/client/HttpRequestExecutor.hpp
oatpp/web/client/RequestExecutor.cpp
oatpp/web/client/RequestExecutor.hpp
oatpp/web/client/RetryPolicy.cpp
oatpp/web/client/RetryPolicy.hpp
oatpp/web/mime/multipart/FileStreamProvider.cpp
oatpp/web/mime/multipart/FileStreamProvider.hpp
oatpp/web/mime/multipart/InMemoryPartReader.cpp

View File

@ -110,7 +110,7 @@ bool ConnectionPool::ConnectionWrapper::isValid() {
ConnectionPool::ConnectionPool(const std::shared_ptr<ConnectionProvider>& connectionProvider,
v_int64 maxConnections,
const std::chrono::duration<v_int64, std::micro>& maxConnectionTTL)
: m_pool(std::make_shared<Pool>(maxConnections, std::chrono::duration_cast<std::chrono::microseconds>(maxConnectionTTL).count()))
: m_pool(std::make_shared<Pool>(maxConnections, maxConnectionTTL.count()))
, m_connectionProvider(connectionProvider)
{

View File

@ -146,6 +146,9 @@ oatpp::async::CoroutineStarterForResult<const std::shared_ptr<RequestExecutor::C
return m_requestExecutor->getConnectionAsync();
}
void ApiClient::invalidateConnection(const std::shared_ptr<RequestExecutor::ConnectionHandle>& connectionHandle) {
m_requestExecutor->invalidateConnection(connectionHandle);
}
std::shared_ptr<ApiClient::Response> ApiClient::executeRequest(const oatpp::String& method,
const PathPattern& pathPattern,

View File

@ -193,6 +193,11 @@ public:
*/
virtual oatpp::async::CoroutineStarterForResult<const std::shared_ptr<RequestExecutor::ConnectionHandle>&> getConnectionAsync();
/**
* Invalidate connection.
* @param connectionHandle
*/
void invalidateConnection(const std::shared_ptr<RequestExecutor::ConnectionHandle>& connectionHandle);
virtual std::shared_ptr<Response> executeRequest(const oatpp::String& method,
const PathPattern& pathPattern,

View File

@ -46,16 +46,19 @@
namespace oatpp { namespace web { namespace client {
HttpRequestExecutor::HttpRequestExecutor(const std::shared_ptr<ClientConnectionProvider>& connectionProvider,
const std::shared_ptr<RetryPolicy>& retryPolicy,
const std::shared_ptr<const BodyDecoder>& bodyDecoder)
: m_connectionProvider(connectionProvider)
: RequestExecutor(retryPolicy)
, m_connectionProvider(connectionProvider)
, m_bodyDecoder(bodyDecoder)
{}
std::shared_ptr<HttpRequestExecutor>
HttpRequestExecutor::createShared(const std::shared_ptr<ClientConnectionProvider>& connectionProvider,
const std::shared_ptr<RetryPolicy>& retryPolicy,
const std::shared_ptr<const BodyDecoder>& bodyDecoder)
{
return std::make_shared<HttpRequestExecutor>(connectionProvider, bodyDecoder);
return std::make_shared<HttpRequestExecutor>(connectionProvider, retryPolicy, bodyDecoder);
}
std::shared_ptr<HttpRequestExecutor::ConnectionHandle> HttpRequestExecutor::getConnection() {
@ -134,11 +137,13 @@ HttpRequestExecutor::executeOnce(const String& method,
const auto& result = headerReader.readHeaders(connection, error);
if(error.status.code != 0) {
invalidateConnection(connectionHandle);
throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_PARSE_STARTING_LINE,
"[oatpp::web::client::HttpRequestExecutor::executeOnce()]: Failed to parse response. Invalid response headers");
}
if(error.ioStatus < 0) {
invalidateConnection(connectionHandle);
throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_PARSE_STARTING_LINE,
"[oatpp::web::client::HttpRequestExecutor::executeOnce()]: Failed to read response.");
}
@ -168,7 +173,7 @@ HttpRequestExecutor::executeOnceAsync(const String& method,
private:
typedef oatpp::web::protocol::http::outgoing::Request OutgoingRequest;
private:
std::shared_ptr<ClientConnectionProvider> m_connectionProvider;
HttpRequestExecutor* m_this;
String m_method;
String m_path;
Headers m_headers;
@ -181,14 +186,14 @@ HttpRequestExecutor::executeOnceAsync(const String& method,
oatpp::data::share::MemoryLabel m_buffer;
public:
ExecutorCoroutine(const std::shared_ptr<ClientConnectionProvider>& connectionProvider,
ExecutorCoroutine(HttpRequestExecutor* _this,
const String& method,
const String& path,
const Headers& headers,
const std::shared_ptr<Body>& body,
const std::shared_ptr<const BodyDecoder>& bodyDecoder,
const std::shared_ptr<ConnectionHandle>& connectionHandle)
: m_connectionProvider(connectionProvider)
: m_this(_this)
, m_method(method)
, m_path(path)
, m_headers(headers)
@ -209,7 +214,7 @@ HttpRequestExecutor::executeOnceAsync(const String& method,
}
auto request = OutgoingRequest::createShared(m_method, m_path, m_headers, m_body);
request->putHeaderIfNotExists(Header::HOST, m_connectionProvider->getProperty("host"));
request->putHeaderIfNotExists(Header::HOST, m_this->m_connectionProvider->getProperty("host"));
request->putHeaderIfNotExists(Header::CONNECTION, Header::Value::CONNECTION_KEEP_ALIVE);
m_buffer = oatpp::data::share::MemoryLabel(oatpp::base::StrBuffer::createShared(oatpp::data::buffer::IOBuffer::BUFFER_SIZE));
m_upstream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(m_connection, m_buffer);
@ -235,10 +240,20 @@ HttpRequestExecutor::executeOnceAsync(const String& method,
result.headers, bodyStream, m_bodyDecoder));
}
Action handleError(oatpp::async::Error* error) override {
if(m_connectionHandle) {
m_this->invalidateConnection(m_connectionHandle);
}
return error;
}
};
return ExecutorCoroutine::startForResult(m_connectionProvider, method, path, headers, body, m_bodyDecoder, connectionHandle);
return ExecutorCoroutine::startForResult(this, method, path, headers, body, m_bodyDecoder, connectionHandle);
}

View File

@ -74,6 +74,7 @@ public:
* @param bodyDecoder - &id:oatpp::web::protocol::http::incoming::BodyDecoder;.
*/
HttpRequestExecutor(const std::shared_ptr<ClientConnectionProvider>& connectionProvider,
const std::shared_ptr<RetryPolicy>& retryPolicy = nullptr,
const std::shared_ptr<const BodyDecoder>& bodyDecoder =
std::make_shared<oatpp::web::protocol::http::incoming::SimpleBodyDecoder>());
public:
@ -86,6 +87,7 @@ public:
*/
static std::shared_ptr<HttpRequestExecutor>
createShared(const std::shared_ptr<ClientConnectionProvider>& connectionProvider,
const std::shared_ptr<RetryPolicy>& retryPolicy = nullptr,
const std::shared_ptr<const BodyDecoder>& bodyDecoder =
std::make_shared<oatpp::web::protocol::http::incoming::SimpleBodyDecoder>());

View File

@ -24,6 +24,9 @@
#include "RequestExecutor.hpp"
#include <thread>
#include <chrono>
namespace oatpp { namespace web { namespace client {
RequestExecutor::RequestExecutionError::RequestExecutionError(v_int32 errorCode, const char* message, v_int32 readErrorCode)
@ -45,6 +48,10 @@ v_int32 RequestExecutor::RequestExecutionError::getReadErrorCode() const {
return m_readErrorCode;
}
RequestExecutor::RequestExecutor(const std::shared_ptr<RetryPolicy>& retryPolicy)
: m_retryPolicy(retryPolicy)
{}
std::shared_ptr<RequestExecutor::Response> RequestExecutor::execute(
const String& method,
const String& path,
@ -53,12 +60,58 @@ std::shared_ptr<RequestExecutor::Response> RequestExecutor::execute(
const std::shared_ptr<ConnectionHandle>& connectionHandle
) {
auto ch = connectionHandle;
if(!ch) {
ch = getConnection();
if(!m_retryPolicy) {
auto ch = connectionHandle;
if (!ch) {
ch = getConnection();
}
return executeOnce(method, path, headers, body, ch);
} else {
RetryPolicy::Context context;
auto ch = connectionHandle;
while(true) {
context.attempt ++;
try {
if (!ch) {
ch = getConnection();
}
auto response = executeOnce(method, path, headers, body, ch);
if(!m_retryPolicy->retryOnResponse(response->getStatusCode(), context) || !m_retryPolicy->canRetry(context)) {
return response;
}
} catch (...) {
if(!m_retryPolicy->canRetry(context)) {
break;
}
}
invalidateConnection(ch);
ch.reset();
v_int64 waitMicro = m_retryPolicy->waitForMicroseconds(context);
v_int64 tick0 = oatpp::base::Environment::getMicroTickCount();
v_int64 tick = tick0;
while(tick < tick0 + waitMicro) {
std::this_thread::sleep_for(std::chrono::microseconds(tick0 + waitMicro - tick));
tick = oatpp::base::Environment::getMicroTickCount();
}
}
}
return executeOnce(method, path, headers, body, ch);
return nullptr;
}
@ -79,6 +132,8 @@ RequestExecutor::executeAsync(
Headers m_headers;
std::shared_ptr<Body> m_body;
std::shared_ptr<ConnectionHandle> m_connectionHandle;
bool m_slept;
RetryPolicy::Context m_context;
public:
ExecutorCoroutine(RequestExecutor* _this,
@ -93,13 +148,19 @@ RequestExecutor::executeAsync(
, m_headers(headers)
, m_body(body)
, m_connectionHandle(connectionHandle)
, m_slept(false)
{}
Action act() override {
m_slept = false;
if(!m_connectionHandle) {
return m_this->getConnectionAsync().callbackTo(&ExecutorCoroutine::onConnection);
}
return yieldTo(&ExecutorCoroutine::execute);
}
Action onConnection(const std::shared_ptr<ConnectionHandle>& connectionHandle) {
@ -108,13 +169,52 @@ RequestExecutor::executeAsync(
}
Action execute() {
m_context.attempt ++;
return m_this->executeOnceAsync(m_method, m_path, m_headers, m_body, m_connectionHandle).callbackTo(&ExecutorCoroutine::onResponse);
}
Action onResponse(const std::shared_ptr<RequestExecutor::Response>& response) {
if( m_this->m_retryPolicy &&
m_this->m_retryPolicy->retryOnResponse(response->getStatusCode(), m_context) &&
m_this->m_retryPolicy->canRetry(m_context)
) {
return yieldTo(&ExecutorCoroutine::retry);
}
return _return(response);
}
Action retry() {
if(m_connectionHandle) {
m_this->invalidateConnection(m_connectionHandle);
m_connectionHandle.reset();
}
if(!m_slept) {
m_slept = true;
return waitRepeat(std::chrono::microseconds(m_this->m_retryPolicy->waitForMicroseconds(m_context)));
}
return yieldTo(&ExecutorCoroutine::act);
}
Action handleError(Error* error) override {
if(m_this->m_retryPolicy && m_this->m_retryPolicy->canRetry(m_context)) {
return yieldTo(&ExecutorCoroutine::retry);
}
if(m_connectionHandle) {
m_this->invalidateConnection(m_connectionHandle);
m_connectionHandle.reset();
}
return error;
}
};
return ExecutorCoroutine::startForResult(this, method, path, headers, body, connectionHandle);

View File

@ -25,6 +25,8 @@
#ifndef oatpp_web_client_RequestExecutor_hpp
#define oatpp_web_client_RequestExecutor_hpp
#include "RetryPolicy.hpp"
#include "oatpp/web/protocol/http/incoming/Response.hpp"
#include "oatpp/web/protocol/http/outgoing/Body.hpp"
#include "oatpp/web/protocol/http/Http.hpp"
@ -144,9 +146,17 @@ public:
v_int32 getReadErrorCode() const;
};
private:
std::shared_ptr<RetryPolicy> m_retryPolicy;
public:
/**
* Constructor.
* @param retryPolicy - &id:oatpp::web::client::RetryPolicy;.
*/
RequestExecutor(const std::shared_ptr<RetryPolicy>& retryPolicy);
/**
* Virtual destructor.
*/

View File

@ -0,0 +1,49 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "RetryPolicy.hpp"
namespace oatpp { namespace web { namespace client {
SimpleRetryPolicy::SimpleRetryPolicy(v_int64 maxAttempts,
const std::chrono::duration<v_int64, std::micro>& delay,
const std::unordered_set<v_int32>& httpCodes)
: m_maxAttempts(maxAttempts)
, m_delay(delay.count())
, m_httpCodes(httpCodes)
{}
bool SimpleRetryPolicy::canRetry(const Context& context) {
return context.attempt <= m_maxAttempts || m_maxAttempts == -1;
}
bool SimpleRetryPolicy::retryOnResponse(v_int32 responseStatusCode, const Context& context) {
return m_httpCodes.find(responseStatusCode) != m_httpCodes.end();
}
v_int64 SimpleRetryPolicy::waitForMicroseconds(const Context& context) {
return m_delay;
}
}}}

View File

@ -0,0 +1,127 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_web_client_RetryPolicy_hpp
#define oatpp_web_client_RetryPolicy_hpp
#include "oatpp/core/Types.hpp"
#include <unordered_set>
namespace oatpp { namespace web { namespace client {
/**
* Class to control retries in RequestExecutor.
*/
class RetryPolicy {
public:
/**
* This structure holds information about request attempts.
*/
struct Context {
/**
* Attempt number.
*/
v_int64 attempt = 0;
};
public:
/**
* Virtual destructor.
*/
virtual ~RetryPolicy() = default;
/**
* Check if the context is eligible to retry.
* @param context - &l:RetryPolicy::Context ;.
* @return - `true` - to retry. `false` - do NOT retry.
*/
virtual bool canRetry(const Context& context) = 0;
/**
* Check whether the client should retry for a given response from the server.
* @param responseStatusCode - HTTP status code of the response.
* @param context - &l:RetryPolicy::Context ;.
* @return - `true` - to retry. `false` - do NOT retry.
*/
virtual bool retryOnResponse(v_int32 responseStatusCode, const Context& context) = 0;
/**
* How much client should wait before the next attempt?
* @param context - &l:RetryPolicy::Context ;.
* @return - delay in microseconds.
*/
virtual v_int64 waitForMicroseconds(const Context& context) = 0;
};
class SimpleRetryPolicy : public RetryPolicy {
private:
v_int64 m_maxAttempts;
v_int64 m_delay;
std::unordered_set<v_int32> m_httpCodes;
public:
/**
* Constructor.
* @param maxAttempts - max number of attempts to retry. `-1` - retry infinitely.
* @param delay - delay between attempts.
* @param httpCodes - set of HTTP codes to retry for.
*/
SimpleRetryPolicy(v_int64 maxAttempts,
const std::chrono::duration<v_int64, std::micro>& delay,
const std::unordered_set<v_int32>& httpCodes = {503});
/**
* Check if the context is eligible to retry.
* @param context - &l:RetryPolicy::Context ;.
* @return - `true` - to retry. `false` - do NOT retry.
*/
bool canRetry(const Context& context) override;
/**
* Check whether the client should retry for a given response from the server. <br>
* *This particular implementation returns `true` for codes from the set provided in the constructor*.
* @param responseStatusCode - HTTP status code of the response.
* @param context - &l:RetryPolicy::Context ;.
* @return - `true` - to retry. `false` - do NOT retry.
*/
bool retryOnResponse(v_int32 responseStatusCode, const Context& context) override;
/**
* How much client should wait before the next attempt? <br>
* *This particular implementation returns the delay passed to the constructor*.
* @param context - &l:RetryPolicy::Context ;.
* @return - delay in microseconds.
*/
v_int64 waitForMicroseconds(const Context& context) override;
};
}}}
#endif // oatpp_web_client_RetryPolicy_hpp