mirror of
https://github.com/oatpp/oatpp.git
synced 2025-01-18 16:43:57 +08:00
Merge branch 'master' into v1.3.0
This commit is contained in:
commit
e774517623
@ -37,6 +37,85 @@
|
||||
|
||||
namespace oatpp { namespace web { namespace client {
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// HttpRequestExecutor::ConnectionProxy
|
||||
|
||||
HttpRequestExecutor::ConnectionProxy::ConnectionProxy(const std::shared_ptr<ClientConnectionProvider>& connectionProvider,
|
||||
const std::shared_ptr<data::stream::IOStream>& connection)
|
||||
: m_connectionProvider(connectionProvider)
|
||||
, m_connection(connection)
|
||||
, m_valid(true)
|
||||
, m_invalidateOnDestroy(false)
|
||||
{}
|
||||
|
||||
HttpRequestExecutor::ConnectionProxy::~ConnectionProxy() {
|
||||
if(m_invalidateOnDestroy) {
|
||||
invalidate();
|
||||
}
|
||||
}
|
||||
|
||||
v_io_size HttpRequestExecutor::ConnectionProxy::read(void *buffer, v_buff_size count, async::Action& action) {
|
||||
return m_connection->read(buffer, count, action);
|
||||
}
|
||||
|
||||
v_io_size HttpRequestExecutor::ConnectionProxy::write(const void *data, v_buff_size count, async::Action& action) {
|
||||
return m_connection->write(data,count, action);
|
||||
}
|
||||
|
||||
void HttpRequestExecutor::ConnectionProxy::setInputStreamIOMode(data::stream::IOMode ioMode) {
|
||||
m_connection->setInputStreamIOMode(ioMode);
|
||||
}
|
||||
|
||||
data::stream::IOMode HttpRequestExecutor::ConnectionProxy::getInputStreamIOMode() {
|
||||
return m_connection->getInputStreamIOMode();
|
||||
}
|
||||
|
||||
data::stream::Context& HttpRequestExecutor::ConnectionProxy::getInputStreamContext() {
|
||||
return m_connection->getInputStreamContext();
|
||||
}
|
||||
|
||||
void HttpRequestExecutor::ConnectionProxy::setOutputStreamIOMode(data::stream::IOMode ioMode) {
|
||||
return m_connection->setOutputStreamIOMode(ioMode);
|
||||
}
|
||||
|
||||
data::stream::IOMode HttpRequestExecutor::ConnectionProxy::getOutputStreamIOMode() {
|
||||
return m_connection->getOutputStreamIOMode();
|
||||
}
|
||||
|
||||
data::stream::Context& HttpRequestExecutor::ConnectionProxy::getOutputStreamContext() {
|
||||
return m_connection->getOutputStreamContext();
|
||||
}
|
||||
|
||||
void HttpRequestExecutor::ConnectionProxy::invalidate() {
|
||||
if(m_valid) {
|
||||
m_connectionProvider->invalidate(m_connection);
|
||||
m_valid = false;
|
||||
}
|
||||
}
|
||||
|
||||
void HttpRequestExecutor::ConnectionProxy::setInvalidateOnDestroy(bool invalidateOnDestroy) {
|
||||
m_invalidateOnDestroy = invalidateOnDestroy;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// HttpRequestExecutor::HttpConnectionHandle
|
||||
|
||||
HttpRequestExecutor::HttpConnectionHandle::HttpConnectionHandle(const std::shared_ptr<ConnectionProxy>& connectionProxy)
|
||||
: m_connectionProxy(connectionProxy)
|
||||
{}
|
||||
|
||||
|
||||
std::shared_ptr<HttpRequestExecutor::ConnectionProxy> HttpRequestExecutor::HttpConnectionHandle::getConnection() {
|
||||
return m_connectionProxy;
|
||||
}
|
||||
|
||||
void HttpRequestExecutor::HttpConnectionHandle::invalidate() {
|
||||
m_connectionProxy->invalidate();
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// HttpRequestExecutor
|
||||
|
||||
HttpRequestExecutor::HttpRequestExecutor(const std::shared_ptr<ClientConnectionProvider>& connectionProvider,
|
||||
const std::shared_ptr<RetryPolicy>& retryPolicy,
|
||||
const std::shared_ptr<const BodyDecoder>& bodyDecoder)
|
||||
@ -59,7 +138,8 @@ std::shared_ptr<HttpRequestExecutor::ConnectionHandle> HttpRequestExecutor::getC
|
||||
throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_CONNECT,
|
||||
"[oatpp::web::client::HttpRequestExecutor::getConnection()]: ConnectionProvider failed to provide Connection");
|
||||
}
|
||||
return std::make_shared<HttpConnectionHandle>(connection);
|
||||
auto connectionProxy = std::make_shared<ConnectionProxy>(m_connectionProvider, connection);
|
||||
return std::make_shared<HttpConnectionHandle>(connectionProxy);
|
||||
}
|
||||
|
||||
oatpp::async::CoroutineStarterForResult<const std::shared_ptr<HttpRequestExecutor::ConnectionHandle>&>
|
||||
@ -79,7 +159,8 @@ HttpRequestExecutor::getConnectionAsync() {
|
||||
}
|
||||
|
||||
Action onConnectionReady(const std::shared_ptr<oatpp::data::stream::IOStream>& connection) {
|
||||
return _return(std::make_shared<HttpConnectionHandle>(connection));
|
||||
auto connectionProxy = std::make_shared<ConnectionProxy>(m_connectionProvider, connection);
|
||||
return _return(std::make_shared<HttpConnectionHandle>(connectionProxy));
|
||||
}
|
||||
|
||||
};
|
||||
@ -91,8 +172,8 @@ HttpRequestExecutor::getConnectionAsync() {
|
||||
void HttpRequestExecutor::invalidateConnection(const std::shared_ptr<ConnectionHandle>& connectionHandle) {
|
||||
|
||||
if(connectionHandle) {
|
||||
auto connection = static_cast<HttpConnectionHandle*>(connectionHandle.get())->connection;
|
||||
m_connectionProvider->invalidate(connection);
|
||||
auto handle = static_cast<HttpConnectionHandle*>(connectionHandle.get());
|
||||
handle->invalidate();
|
||||
}
|
||||
|
||||
}
|
||||
@ -104,9 +185,10 @@ HttpRequestExecutor::executeOnce(const String& method,
|
||||
const std::shared_ptr<Body>& body,
|
||||
const std::shared_ptr<ConnectionHandle>& connectionHandle) {
|
||||
|
||||
std::shared_ptr<oatpp::data::stream::IOStream> connection;
|
||||
if(connectionHandle) {
|
||||
connection = static_cast<HttpConnectionHandle*>(connectionHandle.get())->connection;
|
||||
std::shared_ptr<ConnectionProxy> connection;
|
||||
std::shared_ptr<HttpConnectionHandle> httpCH = std::static_pointer_cast<HttpConnectionHandle>(connectionHandle);
|
||||
if(httpCH) {
|
||||
connection = httpCH->getConnection();
|
||||
}
|
||||
|
||||
if(!connection){
|
||||
@ -132,21 +214,20 @@ HttpRequestExecutor::executeOnce(const String& method,
|
||||
const auto& result = headerReader.readHeaders(connection, error);
|
||||
|
||||
if(error.status.code != 0) {
|
||||
invalidateConnection(connectionHandle);
|
||||
connection->invalidate();
|
||||
throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_PARSE_STARTING_LINE,
|
||||
"[oatpp::web::client::HttpRequestExecutor::executeOnce()]: Failed to parse response. Invalid response headers");
|
||||
}
|
||||
|
||||
if(error.ioStatus < 0) {
|
||||
invalidateConnection(connectionHandle);
|
||||
connection->invalidate();
|
||||
throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_PARSE_STARTING_LINE,
|
||||
"[oatpp::web::client::HttpRequestExecutor::executeOnce()]: Failed to read response.");
|
||||
}
|
||||
|
||||
auto con_hdr = result.headers.getAsMemoryLabel<oatpp::data::share::StringKeyLabelCI>("Connection");
|
||||
if (con_hdr == "close")
|
||||
{
|
||||
invalidateConnection(connectionHandle);
|
||||
auto connectionHeader = result.headers.getAsMemoryLabel<oatpp::data::share::StringKeyLabelCI>(Header::CONNECTION);
|
||||
if (connectionHeader == "close") {
|
||||
connection->setInvalidateOnDestroy(true);
|
||||
}
|
||||
|
||||
auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection,
|
||||
@ -180,12 +261,12 @@ HttpRequestExecutor::executeOnceAsync(const String& method,
|
||||
Headers m_headers;
|
||||
std::shared_ptr<Body> m_body;
|
||||
std::shared_ptr<const BodyDecoder> m_bodyDecoder;
|
||||
std::shared_ptr<ConnectionHandle> m_connectionHandle;
|
||||
std::shared_ptr<HttpConnectionHandle> m_connectionHandle;
|
||||
oatpp::data::share::MemoryLabel m_buffer;
|
||||
ResponseHeadersReader m_headersReader;
|
||||
std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy> m_upstream;
|
||||
private:
|
||||
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
|
||||
std::shared_ptr<ConnectionProxy> m_connection;
|
||||
public:
|
||||
|
||||
ExecutorCoroutine(HttpRequestExecutor* _this,
|
||||
@ -194,7 +275,7 @@ HttpRequestExecutor::executeOnceAsync(const String& method,
|
||||
const Headers& headers,
|
||||
const std::shared_ptr<Body>& body,
|
||||
const std::shared_ptr<const BodyDecoder>& bodyDecoder,
|
||||
const std::shared_ptr<ConnectionHandle>& connectionHandle)
|
||||
const std::shared_ptr<HttpConnectionHandle>& connectionHandle)
|
||||
: m_this(_this)
|
||||
, m_method(method)
|
||||
, m_path(path)
|
||||
@ -209,7 +290,7 @@ HttpRequestExecutor::executeOnceAsync(const String& method,
|
||||
Action act() override {
|
||||
|
||||
if(m_connectionHandle) {
|
||||
m_connection = static_cast<HttpConnectionHandle*>(m_connectionHandle.get())->connection;
|
||||
m_connection = m_connectionHandle->getConnection();
|
||||
}
|
||||
|
||||
if(!m_connection) {
|
||||
@ -234,6 +315,11 @@ HttpRequestExecutor::executeOnceAsync(const String& method,
|
||||
|
||||
Action onHeadersParsed(const ResponseHeadersReader::Result& result) {
|
||||
|
||||
auto connectionHeader = result.headers.getAsMemoryLabel<oatpp::data::share::StringKeyLabelCI>(Header::CONNECTION);
|
||||
if (connectionHeader == "close") {
|
||||
m_connection->setInvalidateOnDestroy(true);
|
||||
}
|
||||
|
||||
auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection,
|
||||
m_buffer,
|
||||
result.bufferPosStart,
|
||||
@ -248,8 +334,8 @@ HttpRequestExecutor::executeOnceAsync(const String& method,
|
||||
|
||||
Action handleError(oatpp::async::Error* error) override {
|
||||
|
||||
if(m_connectionHandle) {
|
||||
m_this->invalidateConnection(m_connectionHandle);
|
||||
if(m_connection) {
|
||||
m_connection->invalidate();
|
||||
}
|
||||
|
||||
return error;
|
||||
@ -258,7 +344,8 @@ HttpRequestExecutor::executeOnceAsync(const String& method,
|
||||
|
||||
};
|
||||
|
||||
return ExecutorCoroutine::startForResult(this, method, path, headers, body, m_bodyDecoder, connectionHandle);
|
||||
auto httpCH = std::static_pointer_cast<HttpConnectionHandle>(connectionHandle);
|
||||
return ExecutorCoroutine::startForResult(this, method, path, headers, body, m_bodyDecoder, httpCH);
|
||||
|
||||
}
|
||||
|
||||
|
@ -44,6 +44,38 @@ private:
|
||||
protected:
|
||||
std::shared_ptr<ClientConnectionProvider> m_connectionProvider;
|
||||
std::shared_ptr<const BodyDecoder> m_bodyDecoder;
|
||||
public:
|
||||
|
||||
class ConnectionProxy : public data::stream::IOStream {
|
||||
private:
|
||||
/* provider which created this connection */
|
||||
std::shared_ptr<ClientConnectionProvider> m_connectionProvider;
|
||||
std::shared_ptr<data::stream::IOStream> m_connection;
|
||||
bool m_valid;
|
||||
bool m_invalidateOnDestroy;
|
||||
public:
|
||||
|
||||
ConnectionProxy(const std::shared_ptr<ClientConnectionProvider>& connectionProvider,
|
||||
const std::shared_ptr<data::stream::IOStream>& connection);
|
||||
|
||||
~ConnectionProxy() override;
|
||||
|
||||
v_io_size read(void *buffer, v_buff_size count, async::Action& action) override;
|
||||
v_io_size write(const void *data, v_buff_size count, async::Action& action) override;
|
||||
|
||||
void setInputStreamIOMode(data::stream::IOMode ioMode) override;
|
||||
data::stream::IOMode getInputStreamIOMode() override;
|
||||
data::stream::Context& getInputStreamContext() override;
|
||||
|
||||
void setOutputStreamIOMode(data::stream::IOMode ioMode) override;
|
||||
data::stream::IOMode getOutputStreamIOMode() override;
|
||||
data::stream::Context& getOutputStreamContext() override;
|
||||
|
||||
void invalidate();
|
||||
void setInvalidateOnDestroy(bool invalidateOnDestroy);
|
||||
|
||||
};
|
||||
|
||||
public:
|
||||
|
||||
/**
|
||||
@ -51,20 +83,16 @@ public:
|
||||
* For more details see &id:oatpp::web::client::RequestExecutor::ConnectionHandle;.
|
||||
*/
|
||||
class HttpConnectionHandle : public ConnectionHandle {
|
||||
private:
|
||||
std::shared_ptr<ConnectionProxy> m_connectionProxy;
|
||||
public:
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param stream - &id:oatpp::data::stream::IOStream;.
|
||||
*/
|
||||
HttpConnectionHandle(const std::shared_ptr<oatpp::data::stream::IOStream>& stream)
|
||||
: connection(stream)
|
||||
{}
|
||||
HttpConnectionHandle(const std::shared_ptr<ConnectionProxy>& connectionProxy);
|
||||
|
||||
std::shared_ptr<ConnectionProxy> getConnection();
|
||||
|
||||
void invalidate();
|
||||
|
||||
/**
|
||||
* Connection.
|
||||
*/
|
||||
std::shared_ptr<oatpp::data::stream::IOStream> connection;
|
||||
};
|
||||
public:
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user