reuse connection in ApiClient

This commit is contained in:
lganzzzo 2018-11-11 00:51:56 +02:00
parent 8bb6b2c265
commit a959ba2852
7 changed files with 163 additions and 35 deletions

View File

@ -135,14 +135,17 @@ static PathPattern Z_getPathPattern_##NAME(const oatpp::String& path) { \
return pattern; \
} \
\
std::shared_ptr<oatpp::web::protocol::http::incoming::Response> NAME() { \
std::shared_ptr<oatpp::web::protocol::http::incoming::Response> NAME( \
const std::shared_ptr<oatpp::web::client::RequestExecutor::ConnectionHandle>& __connectionHandle = nullptr \
) { \
std::shared_ptr<oatpp::web::protocol::http::outgoing::Body> body; \
return executeRequest(METHOD, \
Z_getPathPattern_##NAME(PATH), \
nullptr, \
nullptr, \
nullptr, \
body); \
body, \
__connectionHandle); \
}
#define OATPP_API_CALL_1(NAME, METHOD, PATH, LIST) \
@ -152,7 +155,8 @@ static PathPattern Z_getPathPattern_##NAME(const oatpp::String& path) { \
} \
\
std::shared_ptr<oatpp::web::protocol::http::incoming::Response> NAME(\
OATPP_MACRO_FOREACH(OATPP_MACRO_API_CLIENT_PARAM_DECL, LIST) void* __reserved = nullptr \
OATPP_MACRO_FOREACH(OATPP_MACRO_API_CLIENT_PARAM_DECL, LIST) \
const std::shared_ptr<oatpp::web::client::RequestExecutor::ConnectionHandle>& __connectionHandle = nullptr \
) { \
auto __headers = oatpp::web::client::ApiClient::StringToParamMap::createShared(); \
auto __pathParams = oatpp::web::client::ApiClient::StringToParamMap::createShared(); \
@ -164,7 +168,8 @@ OATPP_MACRO_FOREACH(OATPP_MACRO_API_CLIENT_PARAM_DECL, LIST) void* __reserved =
__headers, \
__pathParams, \
__queryParams, \
__body); \
__body, \
__connectionHandle); \
}
#define OATPP_API_CALL_(X, NAME, METHOD, PATH, LIST) OATPP_API_CALL_##X(NAME, METHOD, PATH, LIST)
@ -185,7 +190,8 @@ static PathPattern Z_getPathPattern_##NAME(const oatpp::String& path) { \
template<typename ParentCoroutineType>\
oatpp::async::Action NAME(\
oatpp::async::AbstractCoroutine* parentCoroutine, \
oatpp::async::Action (ParentCoroutineType::*callback)(const std::shared_ptr<oatpp::web::protocol::http::incoming::Response>&) \
oatpp::async::Action (ParentCoroutineType::*callback)(const std::shared_ptr<oatpp::web::protocol::http::incoming::Response>&), \
const std::shared_ptr<oatpp::web::client::RequestExecutor::ConnectionHandle>& __connectionHandle = nullptr \
) { \
std::shared_ptr<oatpp::web::protocol::http::outgoing::Body> body; \
return executeRequestAsync(parentCoroutine, \
@ -195,7 +201,8 @@ oatpp::async::Action NAME(\
nullptr, \
nullptr, \
nullptr, \
body); \
body, \
__connectionHandle); \
}
#define OATPP_API_CALL_ASYNC_1(NAME, METHOD, PATH, LIST) \
@ -208,7 +215,8 @@ template<typename ParentCoroutineType>\
oatpp::async::Action NAME(\
oatpp::async::AbstractCoroutine* parentCoroutine, \
oatpp::async::Action (ParentCoroutineType::*callback)(const std::shared_ptr<oatpp::web::protocol::http::incoming::Response>&), \
OATPP_MACRO_FOREACH(OATPP_MACRO_API_CLIENT_PARAM_DECL, LIST) void* __reserved = nullptr \
OATPP_MACRO_FOREACH(OATPP_MACRO_API_CLIENT_PARAM_DECL, LIST) \
const std::shared_ptr<oatpp::web::client::RequestExecutor::ConnectionHandle>& __connectionHandle = nullptr \
) { \
auto __callback = static_cast<oatpp::web::client::RequestExecutor::AsyncCallback>(callback); \
auto __headers = oatpp::web::client::ApiClient::StringToParamMap::createShared(); \
@ -223,7 +231,8 @@ oatpp::async::Action NAME(\
__headers, \
__pathParams, \
__queryParams, \
__body); \
__body, \
__connectionHandle); \
}
#define OATPP_API_CALL_ASYNC_(X, NAME, METHOD, PATH, LIST) OATPP_API_CALL_ASYNC_##X(NAME, METHOD, PATH, LIST)

View File

@ -115,25 +115,27 @@ oatpp::async::Action OutputStreamBufferedProxy::flushAsync(oatpp::async::Abstrac
{}
Action act() override {
auto amount = m_stream->m_posEnd - m_stream->m_pos;
if(amount > 0){
os::io::Library::v_size result = m_stream->m_outputStream->write(&m_stream->m_buffer[m_stream->m_pos], amount);
if(result == amount){
if(result == amount) {
m_stream->m_pos = 0;
m_stream->m_posEnd = 0;
return finish();
} else if(result == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
return waitRetry();
return oatpp::async::Action::_WAIT_RETRY;
} else if(result == oatpp::data::stream::Errors::ERROR_IO_RETRY) {
return repeat();
} else if(result > 0){
return oatpp::async::Action::_REPEAT;
} else if(result == oatpp::data::stream::Errors::ERROR_IO_PIPE) {
return error("[oatpp::data::stream::OutputStreamBufferedProxy::flushAsync()]: Error - oatpp::data::stream::Errors::ERROR_IO_PIPE");
} else if( result < 0) {
return error("[oatpp::data::stream::OutputStreamBufferedProxy::flushAsync()]: Error - Failed to flush all data");
} else if(result < amount) {
m_stream->m_pos += (v_bufferSize) result;
return oatpp::async::Action::_REPEAT;
}
return error("[oatpp::data::stream::OutputStreamBufferedProxy::flushAsync()]: Error - Failed to flush all data");
}
return finish();
}
};

View File

@ -75,6 +75,8 @@ std::shared_ptr<oatpp::data::stream::IOStream> SimpleTCPConnectionProvider::getC
return oatpp::network::Connection::createShared(clientHandle);
}
#include <netinet/tcp.h>
oatpp::async::Action SimpleTCPConnectionProvider::getConnectionAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
AsyncCallback callback) {
@ -97,7 +99,7 @@ oatpp::async::Action SimpleTCPConnectionProvider::getConnectionAsync(oatpp::asyn
struct hostent* host = gethostbyname((const char*) m_host->getData());
if ((host == NULL) || (host->h_addr == NULL)) {
return error("Error retrieving DNS information.");
return error("[oatpp::network::client::SimpleTCPConnectionProvider::getConnectionAsync()]: Error retrieving DNS information.");
}
bzero(&m_client, sizeof(m_client));
@ -108,7 +110,7 @@ oatpp::async::Action SimpleTCPConnectionProvider::getConnectionAsync(oatpp::asyn
m_clientHandle = socket(AF_INET, SOCK_STREAM, 0);
if (m_clientHandle < 0) {
return error("Error creating socket.");
return error("[oatpp::network::client::SimpleTCPConnectionProvider::getConnectionAsync()]: Error creating socket.");
}
fcntl(m_clientHandle, F_SETFL, O_NONBLOCK);
@ -129,6 +131,8 @@ oatpp::async::Action SimpleTCPConnectionProvider::getConnectionAsync(oatpp::asyn
errno = 0;
auto res = connect(m_clientHandle, (struct sockaddr *)&m_client, sizeof(m_client));
if(res == 0 || errno == EISCONN) {
int flags = 1;
setsockopt(m_clientHandle, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
return _return(oatpp::network::Connection::createShared(m_clientHandle));
}
if(errno == EALREADY || errno == EINPROGRESS) {
@ -137,7 +141,7 @@ oatpp::async::Action SimpleTCPConnectionProvider::getConnectionAsync(oatpp::asyn
return repeat();
}
oatpp::os::io::Library::handle_close(m_clientHandle);
return error("Can't connect");
return error("[oatpp::network::client::SimpleTCPConnectionProvider::getConnectionAsync()]: Can't connect");
}
};

View File

@ -144,17 +144,30 @@ protected:
return stream.toString();
}
public:
virtual std::shared_ptr<RequestExecutor::ConnectionHandle> getConnection() {
return m_requestExecutor->getConnection();
}
virtual oatpp::async::Action getConnectionAsync(oatpp::async::AbstractCoroutine* parentCoroutine, RequestExecutor::AsyncConnectionCallback callback) {
return m_requestExecutor->getConnectionAsync(parentCoroutine, callback);
}
virtual std::shared_ptr<Response> executeRequest(const oatpp::String& method,
const PathPattern& pathPattern,
const std::shared_ptr<StringToParamMap>& headers,
const std::shared_ptr<StringToParamMap>& pathParams,
const std::shared_ptr<StringToParamMap>& queryParams,
const std::shared_ptr<RequestExecutor::Body>& body) {
const std::shared_ptr<RequestExecutor::Body>& body,
const std::shared_ptr<RequestExecutor::ConnectionHandle>& connectionHandle = nullptr) {
return m_requestExecutor->execute(method,
formatPath(pathPattern, pathParams, queryParams),
convertParamsMap(headers),
body);
body,
connectionHandle);
}
@ -165,14 +178,16 @@ protected:
const std::shared_ptr<StringToParamMap>& headers,
const std::shared_ptr<StringToParamMap>& pathParams,
const std::shared_ptr<StringToParamMap>& queryParams,
const std::shared_ptr<RequestExecutor::Body>& body) {
const std::shared_ptr<RequestExecutor::Body>& body,
const std::shared_ptr<RequestExecutor::ConnectionHandle>& connectionHandle = nullptr) {
return m_requestExecutor->executeAsync(parentCoroutine,
callback,
method,
formatPath(pathPattern, pathParams, queryParams),
convertParamsMap(headers),
body);
body,
connectionHandle);
}

View File

@ -40,13 +40,55 @@
namespace oatpp { namespace web { namespace client {
std::shared_ptr<HttpRequestExecutor::ConnectionHandle> HttpRequestExecutor::getConnection() {
auto connection = m_connectionProvider->getConnection();
if(!connection){
throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_CONNECT,
"[oatpp::web::client::HttpRequestExecutor::getConnection()]: ConnectionProvider failed to provide Connection");
}
return std::make_shared<HttpConnectionHandle>(connection);
}
HttpRequestExecutor::Action HttpRequestExecutor::getConnectionAsync(oatpp::async::AbstractCoroutine* parentCoroutine, AsyncConnectionCallback callback) {
class GetConnectionCoroutine : public oatpp::async::CoroutineWithResult<GetConnectionCoroutine, std::shared_ptr<ConnectionHandle>> {
private:
std::shared_ptr<oatpp::network::ClientConnectionProvider> m_connectionProvider;
public:
GetConnectionCoroutine(const std::shared_ptr<oatpp::network::ClientConnectionProvider>& connectionProvider)
: m_connectionProvider(connectionProvider)
{}
Action act() override {
oatpp::network::ClientConnectionProvider::AsyncCallback callback =
static_cast<oatpp::network::ClientConnectionProvider::AsyncCallback>(&GetConnectionCoroutine::onConnectionReady);
return m_connectionProvider->getConnectionAsync(this, callback);
}
Action onConnectionReady(const std::shared_ptr<oatpp::data::stream::IOStream>& connection) {
return _return(std::make_shared<HttpConnectionHandle>(connection));
}
};
return parentCoroutine->startCoroutineForResult<GetConnectionCoroutine>(callback, m_connectionProvider);
}
std::shared_ptr<HttpRequestExecutor::Response>
HttpRequestExecutor::execute(const String& method,
const String& path,
const std::shared_ptr<Headers>& headers,
const std::shared_ptr<Body>& body) {
const std::shared_ptr<Body>& body,
const std::shared_ptr<ConnectionHandle>& connectionHandle) {
auto connection = m_connectionProvider->getConnection();
std::shared_ptr<oatpp::network::ConnectionProvider::IOStream> connection;
if(connectionHandle) {
connection = static_cast<HttpConnectionHandle*>(connectionHandle.get())->connection;
} else {
connection = m_connectionProvider->getConnection();
}
if(!connection){
throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_CONNECT,
@ -55,6 +97,8 @@ HttpRequestExecutor::execute(const String& method,
auto request = oatpp::web::protocol::http::outgoing::Request::createShared(method, path, headers, body);
request->headers->putIfNotExists(oatpp::web::protocol::http::Header::HOST, m_connectionProvider->getHost());
request->headers->putIfNotExists(oatpp::web::protocol::http::Header::CONNECTION,
oatpp::web::protocol::http::Header::Value::CONNECTION_KEEP_ALIVE);
auto ioBuffer = oatpp::data::buffer::IOBuffer::createShared();
@ -101,7 +145,8 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor
const String& method,
const String& path,
const std::shared_ptr<Headers>& headers,
const std::shared_ptr<Body>& body) {
const std::shared_ptr<Body>& body,
const std::shared_ptr<ConnectionHandle>& connectionHandle) {
class ExecutorCoroutine : public oatpp::async::CoroutineWithResult<ExecutorCoroutine, std::shared_ptr<HttpRequestExecutor::Response>> {
private:
@ -111,6 +156,7 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor
std::shared_ptr<Headers> m_headers;
std::shared_ptr<Body> m_body;
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> m_bodyDecoder;
std::shared_ptr<ConnectionHandle> m_connectionHandle;
private:
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_ioBuffer;
@ -123,25 +169,36 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor
const String& path,
const std::shared_ptr<Headers>& headers,
const std::shared_ptr<Body>& body,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder)
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder,
const std::shared_ptr<ConnectionHandle>& connectionHandle)
: m_connectionProvider(connectionProvider)
, m_method(method)
, m_path(path)
, m_headers(headers)
, m_body(body)
, m_bodyDecoder(bodyDecoder)
, m_connectionHandle(connectionHandle)
{}
Action act() override {
oatpp::network::ClientConnectionProvider::AsyncCallback callback =
static_cast<oatpp::network::ClientConnectionProvider::AsyncCallback>(&ExecutorCoroutine::onConnectionReady);
return m_connectionProvider->getConnectionAsync(this, callback);
if(m_connectionHandle) {
/* Careful here onConnectionReady() should have only one possibe state */
/* Because it is called here in synchronous manner */
return onConnectionReady(static_cast<HttpConnectionHandle*>(m_connectionHandle.get())->connection);
} else {
oatpp::network::ClientConnectionProvider::AsyncCallback callback =
static_cast<oatpp::network::ClientConnectionProvider::AsyncCallback>(&ExecutorCoroutine::onConnectionReady);
return m_connectionProvider->getConnectionAsync(this, callback);
}
}
/* Careful here onConnectionReady() should have only one possibe state */
/* Because there is a call to it from act() in synchronous manner */
Action onConnectionReady(const std::shared_ptr<oatpp::data::stream::IOStream>& connection) {
m_connection = connection;
auto request = oatpp::web::protocol::http::outgoing::Request::createShared(m_method, m_path, m_headers, m_body);
request->headers->put(oatpp::web::protocol::http::Header::HOST, m_connectionProvider->getHost());
request->headers->putIfNotExists(String(Header::HOST, false), m_connectionProvider->getHost());
request->headers->putIfNotExists(String(Header::CONNECTION, false), String(Header::Value::CONNECTION_KEEP_ALIVE, false));
m_ioBuffer = oatpp::data::buffer::IOBuffer::createShared();
auto upStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, m_ioBuffer);
m_bufferPointer = m_ioBuffer->getData();
@ -188,7 +245,7 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor
};
return parentCoroutine->startCoroutineForResult<ExecutorCoroutine>(callback, m_connectionProvider, method, path, headers, body, m_bodyDecoder);
return parentCoroutine->startCoroutineForResult<ExecutorCoroutine>(callback, m_connectionProvider, method, path, headers, body, m_bodyDecoder, connectionHandle);
}

View File

@ -33,9 +33,19 @@
namespace oatpp { namespace web { namespace client {
class HttpRequestExecutor : public oatpp::base::Controllable, public RequestExecutor {
private:
typedef oatpp::web::protocol::http::Header Header;
protected:
std::shared_ptr<oatpp::network::ClientConnectionProvider> m_connectionProvider;
std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder> m_bodyDecoder;
public:
class HttpConnectionHandle : public ConnectionHandle {
public:
HttpConnectionHandle(const std::shared_ptr<oatpp::network::ConnectionProvider::IOStream>& stream)
: connection(stream)
{}
std::shared_ptr<oatpp::network::ConnectionProvider::IOStream> connection;
};
public:
HttpRequestExecutor(const std::shared_ptr<oatpp::network::ClientConnectionProvider>& connectionProvider,
const std::shared_ptr<const oatpp::web::protocol::http::incoming::BodyDecoder>& bodyDecoder =
@ -52,20 +62,26 @@ public:
return std::make_shared<HttpRequestExecutor>(connectionProvider, bodyDecoder);
}
std::shared_ptr<ConnectionHandle> getConnection() override;
Action getConnectionAsync(oatpp::async::AbstractCoroutine* parentCoroutine, AsyncConnectionCallback callback) override;
/**
* throws RequestExecutionError
*/
std::shared_ptr<Response> execute(const String& method,
const String& path,
const std::shared_ptr<Headers>& headers,
const std::shared_ptr<Body>& body) override;
const std::shared_ptr<Body>& body,
const std::shared_ptr<ConnectionHandle>& connectionHandle = nullptr) override;
Action executeAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
AsyncCallback callback,
const String& method,
const String& path,
const std::shared_ptr<Headers>& headers,
const std::shared_ptr<Body>& body) override;
const std::shared_ptr<Body>& body,
const std::shared_ptr<ConnectionHandle>& connectionHandle = nullptr) override;
};

View File

@ -39,8 +39,21 @@ public:
typedef oatpp::web::protocol::http::Protocol::Headers Headers;
typedef oatpp::web::protocol::http::incoming::Response Response;
typedef oatpp::web::protocol::http::outgoing::Body Body;
public:
/**
* ConnectionHandle is always specific to a RequestExecutor
* You can't pass ConnectionHandle retrieved by one RequestExecutor implementation
* to another
*/
class ConnectionHandle {
public:
virtual ~ConnectionHandle() {}
};
public:
typedef Action (oatpp::async::AbstractCoroutine::*AsyncCallback)(const std::shared_ptr<Response>&);
typedef Action (oatpp::async::AbstractCoroutine::*AsyncConnectionCallback)(const std::shared_ptr<ConnectionHandle>&);
public:
class RequestExecutionError : public std::runtime_error {
@ -85,17 +98,29 @@ public:
public:
/**
* Obtain ConnectionHandle which then can be passed to execute()
*/
virtual std::shared_ptr<ConnectionHandle> getConnection() = 0;
/**
* Same as getConnection but Async
*/
virtual Action getConnectionAsync(oatpp::async::AbstractCoroutine* parentCoroutine, AsyncConnectionCallback callback) = 0;
virtual std::shared_ptr<Response> execute(const String& method,
const String& path,
const std::shared_ptr<Headers>& headers,
const std::shared_ptr<Body>& body) = 0;
const std::shared_ptr<Body>& body,
const std::shared_ptr<ConnectionHandle>& connectionHandle = nullptr) = 0;
virtual Action executeAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
AsyncCallback callback,
const String& method,
const String& path,
const std::shared_ptr<Headers>& headers,
const std::shared_ptr<Body>& body) = 0;
const std::shared_ptr<Body>& body,
const std::shared_ptr<ConnectionHandle>& connectionHandle = nullptr) = 0;
};