diff --git a/codegen/codegen_define_ApiClient_.hpp b/codegen/codegen_define_ApiClient_.hpp index 271a20c0..9ee627c0 100644 --- a/codegen/codegen_define_ApiClient_.hpp +++ b/codegen/codegen_define_ApiClient_.hpp @@ -135,14 +135,17 @@ static PathPattern Z_getPathPattern_##NAME(const oatpp::String& path) { \ return pattern; \ } \ \ -std::shared_ptr NAME() { \ +std::shared_ptr NAME( \ + const std::shared_ptr& __connectionHandle = nullptr \ +) { \ std::shared_ptr body; \ return executeRequest(METHOD, \ Z_getPathPattern_##NAME(PATH), \ nullptr, \ nullptr, \ nullptr, \ - body); \ + body, \ + __connectionHandle); \ } #define OATPP_API_CALL_1(NAME, METHOD, PATH, LIST) \ @@ -152,7 +155,8 @@ static PathPattern Z_getPathPattern_##NAME(const oatpp::String& path) { \ } \ \ std::shared_ptr NAME(\ -OATPP_MACRO_FOREACH(OATPP_MACRO_API_CLIENT_PARAM_DECL, LIST) void* __reserved = nullptr \ +OATPP_MACRO_FOREACH(OATPP_MACRO_API_CLIENT_PARAM_DECL, LIST) \ + const std::shared_ptr& __connectionHandle = nullptr \ ) { \ auto __headers = oatpp::web::client::ApiClient::StringToParamMap::createShared(); \ auto __pathParams = oatpp::web::client::ApiClient::StringToParamMap::createShared(); \ @@ -164,7 +168,8 @@ OATPP_MACRO_FOREACH(OATPP_MACRO_API_CLIENT_PARAM_DECL, LIST) void* __reserved = __headers, \ __pathParams, \ __queryParams, \ - __body); \ + __body, \ + __connectionHandle); \ } #define OATPP_API_CALL_(X, NAME, METHOD, PATH, LIST) OATPP_API_CALL_##X(NAME, METHOD, PATH, LIST) @@ -185,7 +190,8 @@ static PathPattern Z_getPathPattern_##NAME(const oatpp::String& path) { \ template\ oatpp::async::Action NAME(\ oatpp::async::AbstractCoroutine* parentCoroutine, \ - oatpp::async::Action (ParentCoroutineType::*callback)(const std::shared_ptr&) \ + oatpp::async::Action (ParentCoroutineType::*callback)(const std::shared_ptr&), \ + const std::shared_ptr& __connectionHandle = nullptr \ ) { \ std::shared_ptr body; \ return executeRequestAsync(parentCoroutine, \ @@ -195,7 +201,8 @@ oatpp::async::Action NAME(\ nullptr, \ nullptr, \ nullptr, \ - body); \ + body, \ + __connectionHandle); \ } #define OATPP_API_CALL_ASYNC_1(NAME, METHOD, PATH, LIST) \ @@ -208,7 +215,8 @@ template\ oatpp::async::Action NAME(\ oatpp::async::AbstractCoroutine* parentCoroutine, \ oatpp::async::Action (ParentCoroutineType::*callback)(const std::shared_ptr&), \ - OATPP_MACRO_FOREACH(OATPP_MACRO_API_CLIENT_PARAM_DECL, LIST) void* __reserved = nullptr \ + OATPP_MACRO_FOREACH(OATPP_MACRO_API_CLIENT_PARAM_DECL, LIST) \ + const std::shared_ptr& __connectionHandle = nullptr \ ) { \ auto __callback = static_cast(callback); \ auto __headers = oatpp::web::client::ApiClient::StringToParamMap::createShared(); \ @@ -223,7 +231,8 @@ oatpp::async::Action NAME(\ __headers, \ __pathParams, \ __queryParams, \ - __body); \ + __body, \ + __connectionHandle); \ } #define OATPP_API_CALL_ASYNC_(X, NAME, METHOD, PATH, LIST) OATPP_API_CALL_ASYNC_##X(NAME, METHOD, PATH, LIST) diff --git a/core/data/stream/StreamBufferedProxy.cpp b/core/data/stream/StreamBufferedProxy.cpp index aeeb4ce6..6833768c 100644 --- a/core/data/stream/StreamBufferedProxy.cpp +++ b/core/data/stream/StreamBufferedProxy.cpp @@ -115,25 +115,27 @@ oatpp::async::Action OutputStreamBufferedProxy::flushAsync(oatpp::async::Abstrac {} Action act() override { - auto amount = m_stream->m_posEnd - m_stream->m_pos; if(amount > 0){ os::io::Library::v_size result = m_stream->m_outputStream->write(&m_stream->m_buffer[m_stream->m_pos], amount); - if(result == amount){ + if(result == amount) { m_stream->m_pos = 0; m_stream->m_posEnd = 0; return finish(); } else if(result == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { - return waitRetry(); + return oatpp::async::Action::_WAIT_RETRY; } else if(result == oatpp::data::stream::Errors::ERROR_IO_RETRY) { - return repeat(); - } else if(result > 0){ + return oatpp::async::Action::_REPEAT; + } else if(result == oatpp::data::stream::Errors::ERROR_IO_PIPE) { + return error("[oatpp::data::stream::OutputStreamBufferedProxy::flushAsync()]: Error - oatpp::data::stream::Errors::ERROR_IO_PIPE"); + } else if( result < 0) { + return error("[oatpp::data::stream::OutputStreamBufferedProxy::flushAsync()]: Error - Failed to flush all data"); + } else if(result < amount) { m_stream->m_pos += (v_bufferSize) result; + return oatpp::async::Action::_REPEAT; } - return error("[oatpp::data::stream::OutputStreamBufferedProxy::flushAsync()]: Error - Failed to flush all data"); } return finish(); - } }; diff --git a/network/client/SimpleTCPConnectionProvider.cpp b/network/client/SimpleTCPConnectionProvider.cpp index 795afcb5..c03d36c0 100644 --- a/network/client/SimpleTCPConnectionProvider.cpp +++ b/network/client/SimpleTCPConnectionProvider.cpp @@ -75,6 +75,8 @@ std::shared_ptr SimpleTCPConnectionProvider::getC return oatpp::network::Connection::createShared(clientHandle); } + +#include oatpp::async::Action SimpleTCPConnectionProvider::getConnectionAsync(oatpp::async::AbstractCoroutine* parentCoroutine, AsyncCallback callback) { @@ -97,7 +99,7 @@ oatpp::async::Action SimpleTCPConnectionProvider::getConnectionAsync(oatpp::asyn struct hostent* host = gethostbyname((const char*) m_host->getData()); if ((host == NULL) || (host->h_addr == NULL)) { - return error("Error retrieving DNS information."); + return error("[oatpp::network::client::SimpleTCPConnectionProvider::getConnectionAsync()]: Error retrieving DNS information."); } bzero(&m_client, sizeof(m_client)); @@ -108,7 +110,7 @@ oatpp::async::Action SimpleTCPConnectionProvider::getConnectionAsync(oatpp::asyn m_clientHandle = socket(AF_INET, SOCK_STREAM, 0); if (m_clientHandle < 0) { - return error("Error creating socket."); + return error("[oatpp::network::client::SimpleTCPConnectionProvider::getConnectionAsync()]: Error creating socket."); } fcntl(m_clientHandle, F_SETFL, O_NONBLOCK); @@ -129,6 +131,8 @@ oatpp::async::Action SimpleTCPConnectionProvider::getConnectionAsync(oatpp::asyn errno = 0; auto res = connect(m_clientHandle, (struct sockaddr *)&m_client, sizeof(m_client)); if(res == 0 || errno == EISCONN) { + int flags = 1; + setsockopt(m_clientHandle, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags)); return _return(oatpp::network::Connection::createShared(m_clientHandle)); } if(errno == EALREADY || errno == EINPROGRESS) { @@ -137,7 +141,7 @@ oatpp::async::Action SimpleTCPConnectionProvider::getConnectionAsync(oatpp::asyn return repeat(); } oatpp::os::io::Library::handle_close(m_clientHandle); - return error("Can't connect"); + return error("[oatpp::network::client::SimpleTCPConnectionProvider::getConnectionAsync()]: Can't connect"); } }; diff --git a/web/client/ApiClient.hpp b/web/client/ApiClient.hpp index 370a6b69..7103e8e3 100644 --- a/web/client/ApiClient.hpp +++ b/web/client/ApiClient.hpp @@ -144,17 +144,30 @@ protected: return stream.toString(); } +public: + + virtual std::shared_ptr getConnection() { + return m_requestExecutor->getConnection(); + } + + virtual oatpp::async::Action getConnectionAsync(oatpp::async::AbstractCoroutine* parentCoroutine, RequestExecutor::AsyncConnectionCallback callback) { + return m_requestExecutor->getConnectionAsync(parentCoroutine, callback); + } + + virtual std::shared_ptr executeRequest(const oatpp::String& method, const PathPattern& pathPattern, const std::shared_ptr& headers, const std::shared_ptr& pathParams, const std::shared_ptr& queryParams, - const std::shared_ptr& body) { + const std::shared_ptr& body, + const std::shared_ptr& connectionHandle = nullptr) { return m_requestExecutor->execute(method, formatPath(pathPattern, pathParams, queryParams), convertParamsMap(headers), - body); + body, + connectionHandle); } @@ -165,14 +178,16 @@ protected: const std::shared_ptr& headers, const std::shared_ptr& pathParams, const std::shared_ptr& queryParams, - const std::shared_ptr& body) { + const std::shared_ptr& body, + const std::shared_ptr& connectionHandle = nullptr) { return m_requestExecutor->executeAsync(parentCoroutine, callback, method, formatPath(pathPattern, pathParams, queryParams), convertParamsMap(headers), - body); + body, + connectionHandle); } diff --git a/web/client/HttpRequestExecutor.cpp b/web/client/HttpRequestExecutor.cpp index 2d731354..c6eef9e6 100644 --- a/web/client/HttpRequestExecutor.cpp +++ b/web/client/HttpRequestExecutor.cpp @@ -40,13 +40,55 @@ namespace oatpp { namespace web { namespace client { +std::shared_ptr HttpRequestExecutor::getConnection() { + auto connection = m_connectionProvider->getConnection(); + if(!connection){ + throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_CONNECT, + "[oatpp::web::client::HttpRequestExecutor::getConnection()]: ConnectionProvider failed to provide Connection"); + } + return std::make_shared(connection); +} + +HttpRequestExecutor::Action HttpRequestExecutor::getConnectionAsync(oatpp::async::AbstractCoroutine* parentCoroutine, AsyncConnectionCallback callback) { + + class GetConnectionCoroutine : public oatpp::async::CoroutineWithResult> { + private: + std::shared_ptr m_connectionProvider; + public: + + GetConnectionCoroutine(const std::shared_ptr& connectionProvider) + : m_connectionProvider(connectionProvider) + {} + + Action act() override { + oatpp::network::ClientConnectionProvider::AsyncCallback callback = + static_cast(&GetConnectionCoroutine::onConnectionReady); + return m_connectionProvider->getConnectionAsync(this, callback); + } + + Action onConnectionReady(const std::shared_ptr& connection) { + return _return(std::make_shared(connection)); + } + + }; + + return parentCoroutine->startCoroutineForResult(callback, m_connectionProvider); + +} + std::shared_ptr HttpRequestExecutor::execute(const String& method, const String& path, const std::shared_ptr& headers, - const std::shared_ptr& body) { + const std::shared_ptr& body, + const std::shared_ptr& connectionHandle) { - auto connection = m_connectionProvider->getConnection(); + std::shared_ptr connection; + if(connectionHandle) { + connection = static_cast(connectionHandle.get())->connection; + } else { + connection = m_connectionProvider->getConnection(); + } if(!connection){ throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_CONNECT, @@ -55,6 +97,8 @@ HttpRequestExecutor::execute(const String& method, auto request = oatpp::web::protocol::http::outgoing::Request::createShared(method, path, headers, body); request->headers->putIfNotExists(oatpp::web::protocol::http::Header::HOST, m_connectionProvider->getHost()); + request->headers->putIfNotExists(oatpp::web::protocol::http::Header::CONNECTION, + oatpp::web::protocol::http::Header::Value::CONNECTION_KEEP_ALIVE); auto ioBuffer = oatpp::data::buffer::IOBuffer::createShared(); @@ -101,7 +145,8 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor const String& method, const String& path, const std::shared_ptr& headers, - const std::shared_ptr& body) { + const std::shared_ptr& body, + const std::shared_ptr& connectionHandle) { class ExecutorCoroutine : public oatpp::async::CoroutineWithResult> { private: @@ -111,6 +156,7 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor std::shared_ptr m_headers; std::shared_ptr m_body; std::shared_ptr m_bodyDecoder; + std::shared_ptr m_connectionHandle; private: std::shared_ptr m_connection; std::shared_ptr m_ioBuffer; @@ -123,25 +169,36 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor const String& path, const std::shared_ptr& headers, const std::shared_ptr& body, - const std::shared_ptr& bodyDecoder) + const std::shared_ptr& bodyDecoder, + const std::shared_ptr& connectionHandle) : m_connectionProvider(connectionProvider) , m_method(method) , m_path(path) , m_headers(headers) , m_body(body) , m_bodyDecoder(bodyDecoder) + , m_connectionHandle(connectionHandle) {} Action act() override { - oatpp::network::ClientConnectionProvider::AsyncCallback callback = - static_cast(&ExecutorCoroutine::onConnectionReady); - return m_connectionProvider->getConnectionAsync(this, callback); + if(m_connectionHandle) { + /* Careful here onConnectionReady() should have only one possibe state */ + /* Because it is called here in synchronous manner */ + return onConnectionReady(static_cast(m_connectionHandle.get())->connection); + } else { + oatpp::network::ClientConnectionProvider::AsyncCallback callback = + static_cast(&ExecutorCoroutine::onConnectionReady); + return m_connectionProvider->getConnectionAsync(this, callback); + } } + /* Careful here onConnectionReady() should have only one possibe state */ + /* Because there is a call to it from act() in synchronous manner */ Action onConnectionReady(const std::shared_ptr& connection) { m_connection = connection; auto request = oatpp::web::protocol::http::outgoing::Request::createShared(m_method, m_path, m_headers, m_body); - request->headers->put(oatpp::web::protocol::http::Header::HOST, m_connectionProvider->getHost()); + request->headers->putIfNotExists(String(Header::HOST, false), m_connectionProvider->getHost()); + request->headers->putIfNotExists(String(Header::CONNECTION, false), String(Header::Value::CONNECTION_KEEP_ALIVE, false)); m_ioBuffer = oatpp::data::buffer::IOBuffer::createShared(); auto upStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, m_ioBuffer); m_bufferPointer = m_ioBuffer->getData(); @@ -188,7 +245,7 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor }; - return parentCoroutine->startCoroutineForResult(callback, m_connectionProvider, method, path, headers, body, m_bodyDecoder); + return parentCoroutine->startCoroutineForResult(callback, m_connectionProvider, method, path, headers, body, m_bodyDecoder, connectionHandle); } diff --git a/web/client/HttpRequestExecutor.hpp b/web/client/HttpRequestExecutor.hpp index 763ab0c3..7f7f2469 100644 --- a/web/client/HttpRequestExecutor.hpp +++ b/web/client/HttpRequestExecutor.hpp @@ -33,9 +33,19 @@ namespace oatpp { namespace web { namespace client { class HttpRequestExecutor : public oatpp::base::Controllable, public RequestExecutor { +private: + typedef oatpp::web::protocol::http::Header Header; protected: std::shared_ptr m_connectionProvider; std::shared_ptr m_bodyDecoder; +public: + class HttpConnectionHandle : public ConnectionHandle { + public: + HttpConnectionHandle(const std::shared_ptr& stream) + : connection(stream) + {} + std::shared_ptr connection; + }; public: HttpRequestExecutor(const std::shared_ptr& connectionProvider, const std::shared_ptr& bodyDecoder = @@ -52,20 +62,26 @@ public: return std::make_shared(connectionProvider, bodyDecoder); } + std::shared_ptr getConnection() override; + + Action getConnectionAsync(oatpp::async::AbstractCoroutine* parentCoroutine, AsyncConnectionCallback callback) override; + /** * throws RequestExecutionError */ std::shared_ptr execute(const String& method, const String& path, const std::shared_ptr& headers, - const std::shared_ptr& body) override; + const std::shared_ptr& body, + const std::shared_ptr& connectionHandle = nullptr) override; Action executeAsync(oatpp::async::AbstractCoroutine* parentCoroutine, AsyncCallback callback, const String& method, const String& path, const std::shared_ptr& headers, - const std::shared_ptr& body) override; + const std::shared_ptr& body, + const std::shared_ptr& connectionHandle = nullptr) override; }; diff --git a/web/client/RequestExecutor.hpp b/web/client/RequestExecutor.hpp index 590422e8..64ac8ee8 100644 --- a/web/client/RequestExecutor.hpp +++ b/web/client/RequestExecutor.hpp @@ -39,8 +39,21 @@ public: typedef oatpp::web::protocol::http::Protocol::Headers Headers; typedef oatpp::web::protocol::http::incoming::Response Response; typedef oatpp::web::protocol::http::outgoing::Body Body; +public: + + /** + * ConnectionHandle is always specific to a RequestExecutor + * You can't pass ConnectionHandle retrieved by one RequestExecutor implementation + * to another + */ + class ConnectionHandle { + public: + virtual ~ConnectionHandle() {} + }; + public: typedef Action (oatpp::async::AbstractCoroutine::*AsyncCallback)(const std::shared_ptr&); + typedef Action (oatpp::async::AbstractCoroutine::*AsyncConnectionCallback)(const std::shared_ptr&); public: class RequestExecutionError : public std::runtime_error { @@ -85,17 +98,29 @@ public: public: + /** + * Obtain ConnectionHandle which then can be passed to execute() + */ + virtual std::shared_ptr getConnection() = 0; + + /** + * Same as getConnection but Async + */ + virtual Action getConnectionAsync(oatpp::async::AbstractCoroutine* parentCoroutine, AsyncConnectionCallback callback) = 0; + virtual std::shared_ptr execute(const String& method, const String& path, const std::shared_ptr& headers, - const std::shared_ptr& body) = 0; + const std::shared_ptr& body, + const std::shared_ptr& connectionHandle = nullptr) = 0; virtual Action executeAsync(oatpp::async::AbstractCoroutine* parentCoroutine, AsyncCallback callback, const String& method, const String& path, const std::shared_ptr& headers, - const std::shared_ptr& body) = 0; + const std::shared_ptr& body, + const std::shared_ptr& connectionHandle = nullptr) = 0; };