From d74d6b2308bc9f7819d545481a2b1925277b00f6 Mon Sep 17 00:00:00 2001 From: lganzzzo Date: Tue, 19 Oct 2021 03:34:16 +0300 Subject: [PATCH] provider: Decouple Provider and Invalidator. --- src/CMakeLists.txt | 1 + src/oatpp/core/provider/Invalidator.hpp | 57 ++++++++ src/oatpp/core/provider/Pool.hpp | 85 +++++++----- src/oatpp/core/provider/Provider.hpp | 124 ++++++++++++++++-- src/oatpp/network/ConnectionHandler.hpp | 6 +- src/oatpp/network/ConnectionPool.cpp | 19 ++- src/oatpp/network/ConnectionPool.hpp | 2 +- src/oatpp/network/Server.cpp | 16 +-- .../network/monitor/ConnectionMonitor.cpp | 63 +++++---- .../network/monitor/ConnectionMonitor.hpp | 23 ++-- .../network/tcp/client/ConnectionProvider.cpp | 78 ++++++----- .../network/tcp/client/ConnectionProvider.hpp | 23 ++-- .../network/tcp/server/ConnectionProvider.cpp | 73 ++++++----- .../network/tcp/server/ConnectionProvider.hpp | 25 ++-- .../virtual_/client/ConnectionProvider.cpp | 27 ++-- .../virtual_/client/ConnectionProvider.hpp | 24 ++-- .../virtual_/server/ConnectionProvider.cpp | 11 +- .../virtual_/server/ConnectionProvider.hpp | 23 ++-- src/oatpp/web/client/HttpRequestExecutor.cpp | 30 ++--- src/oatpp/web/client/HttpRequestExecutor.hpp | 7 +- .../web/server/AsyncHttpConnectionHandler.cpp | 6 +- .../web/server/AsyncHttpConnectionHandler.hpp | 3 +- .../web/server/HttpConnectionHandler.cpp | 6 +- .../web/server/HttpConnectionHandler.hpp | 3 +- src/oatpp/web/server/HttpProcessor.cpp | 22 ++-- src/oatpp/web/server/HttpProcessor.hpp | 12 +- test/oatpp/core/provider/PoolTemplateTest.cpp | 68 ++++++---- test/oatpp/core/provider/PoolTest.cpp | 45 ++++--- test/oatpp/network/ConnectionPoolTest.cpp | 49 ++++--- test/oatpp/web/PipelineAsyncTest.cpp | 6 +- test/oatpp/web/PipelineTest.cpp | 6 +- 31 files changed, 609 insertions(+), 334 deletions(-) create mode 100644 src/oatpp/core/provider/Invalidator.hpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9d5154c0..ccba2cd5 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -126,6 +126,7 @@ add_library(oatpp oatpp/core/parser/Caret.hpp oatpp/core/parser/ParsingError.cpp oatpp/core/parser/ParsingError.hpp + oatpp/core/provider/Invalidator.hpp oatpp/core/provider/Pool.hpp oatpp/core/provider/Provider.hpp oatpp/core/utils/Binary.cpp diff --git a/src/oatpp/core/provider/Invalidator.hpp b/src/oatpp/core/provider/Invalidator.hpp new file mode 100644 index 00000000..95bb6390 --- /dev/null +++ b/src/oatpp/core/provider/Invalidator.hpp @@ -0,0 +1,57 @@ +/*************************************************************************** + * + * Project _____ __ ____ _ _ + * ( _ ) /__\ (_ _)_| |_ _| |_ + * )(_)( /(__)\ )( (_ _)(_ _) + * (_____)(__)(__)(__) |_| |_| + * + * + * Copyright 2018-present, Leonid Stryzhevskyi + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************************/ + +#ifndef oatpp_provider_Invalidator_hpp +#define oatpp_provider_Invalidator_hpp + +#include "oatpp/core/base/Countable.hpp" +#include + +namespace oatpp { namespace provider { + +/** + * Abstract resource invalidator. + * @tparam T - resource class. + */ +template +class Invalidator : public oatpp::base::Countable { +public: + + /** + * Default virtual destructor. + */ + virtual ~Invalidator() = default; + + /** + * Invalidate resource that was previously created by the correspondent provider.
+ * Use-case: if provider is pool based - you can signal that this resource should not be reused anymore. + * @param resource + */ + virtual void invalidate(const std::shared_ptr &resource) = 0; + +}; + +}} + +#endif //oatpp_provider_Invalidator_hpp diff --git a/src/oatpp/core/provider/Pool.hpp b/src/oatpp/core/provider/Pool.hpp index 05103e4a..fdf93529 100644 --- a/src/oatpp/core/provider/Pool.hpp +++ b/src/oatpp/core/provider/Pool.hpp @@ -56,12 +56,12 @@ private: m_valid = false; } - std::shared_ptr __pool__getUnderlyingResource() { - return _obj; + provider::ResourceHandle __pool__getUnderlyingResource() { + return _handle; } protected: - std::shared_ptr _obj; + provider::ResourceHandle _handle; private: std::shared_ptr m_pool; bool m_valid; @@ -72,8 +72,8 @@ public: * @param resource - base resource. * @param pool - &l:AcquisitionProxy::PoolInstance;. */ - AcquisitionProxy(const std::shared_ptr& resource, const std::shared_ptr& pool) - : _obj(resource) + AcquisitionProxy(const provider::ResourceHandle& resource, const std::shared_ptr& pool) + : _handle(resource) , m_pool(pool) , m_valid(true) {} @@ -82,7 +82,7 @@ public: * Virtual destructor. */ virtual ~AcquisitionProxy() { - m_pool->release(std::move(_obj), m_valid); + m_pool->release(std::move(_handle), m_valid); } }; @@ -93,10 +93,24 @@ class PoolTemplate : public oatpp::base::Countable, public async::CoroutineWaitL private: struct PoolRecord { - std::shared_ptr resource; + provider::ResourceHandle resource; v_int64 timestamp; }; +private: + + class ResourceInvalidator : public provider::Invalidator { + public: + + void invalidate(const std::shared_ptr& resource) override { + auto proxy = std::static_pointer_cast(resource); + proxy->__pool__invalidate(); + const auto& handle = proxy->__pool__getUnderlyingResource(); + handle.invalidator->invalidate(handle.object); + } + + }; + private: void onNewItem(async::CoroutineWaitList& list) override { @@ -116,7 +130,7 @@ private: } - void release(std::shared_ptr&& resource, bool canReuse) { + void release(provider::ResourceHandle&& resource, bool canReuse) { { @@ -155,7 +169,7 @@ private: auto elapsed = ticks - i->timestamp; if(elapsed > pool->m_maxResourceTTL) { - pool->m_provider->invalidate(i->resource); + i->resource.invalidator->invalidate(i->resource.object); i = pool->m_bench.erase(i); pool->m_counter --; } else { @@ -175,7 +189,7 @@ private: std::lock_guard guard(pool->m_lock); auto i = pool->m_bench.begin(); while (i != pool->m_bench.end()) { - pool->m_provider->invalidate(i->resource); + i->resource.invalidator->invalidate(i->resource.object); i = pool->m_bench.erase(i); pool->m_counter --; } @@ -189,6 +203,7 @@ private: } private: + std::shared_ptr m_invalidator; std::shared_ptr> m_provider; v_int64 m_counter{0}; v_int64 m_maxResources; @@ -204,7 +219,8 @@ private: protected: PoolTemplate(const std::shared_ptr>& provider, v_int64 maxResources, v_int64 maxResourceTTL, const std::chrono::duration& timeout) - : m_provider(provider) + : m_invalidator(std::make_shared()) + , m_provider(provider) , m_maxResources(maxResources) , m_maxResourceTTL(maxResourceTTL) , m_timeout(timeout) @@ -215,7 +231,7 @@ protected: poolCleanupTask.detach(); } - static std::shared_ptr get(const std::shared_ptr& _this) { + static provider::ResourceHandle get(const std::shared_ptr& _this) { auto readyPredicate = [&_this]() { return !_this->m_running || !_this->m_bench.empty() || _this->m_counter < _this->m_maxResources; }; std::unique_lock guard{_this->m_lock}; @@ -235,7 +251,10 @@ protected: if (_this->m_bench.size() > 0) { auto record = _this->m_bench.front(); _this->m_bench.pop_front(); - return std::make_shared(record.resource, _this); + return provider::ResourceHandle( + std::make_shared(record.resource, _this), + _this->m_invalidator + ); } else { ++ _this->m_counter; } @@ -243,7 +262,10 @@ protected: guard.unlock(); try { - return std::make_shared(_this->m_provider->get(), _this); + return provider::ResourceHandle( + std::make_shared(_this->m_provider->get(), _this), + _this->m_invalidator + ); } catch (...) { guard.lock(); --_this->m_counter; @@ -251,12 +273,11 @@ protected: } } - static async::CoroutineStarterForResult&> getAsync(const std::shared_ptr& _this) { + static async::CoroutineStarterForResult&> getAsync(const std::shared_ptr& _this) { - class GetCoroutine : public oatpp::async::CoroutineWithResult&> { + class GetCoroutine : public oatpp::async::CoroutineWithResult&> { private: std::shared_ptr m_pool; - std::chrono::steady_clock::time_point m_startTime{std::chrono::steady_clock::now()}; public: @@ -292,7 +313,10 @@ protected: auto record = m_pool->m_bench.front(); m_pool->m_bench.pop_front(); guard.unlock(); - return this->_return(std::make_shared(record.resource, m_pool)); + return this->_return(provider::ResourceHandle( + std::make_shared(record.resource, m_pool), + m_pool->m_invalidator + )); } else { ++ m_pool->m_counter; } @@ -303,8 +327,11 @@ protected: } - async::Action onGet(const std::shared_ptr& resource) { - return this->_return(std::make_shared(resource, m_pool)); + async::Action onGet(const provider::ResourceHandle& resource) { + return this->_return(provider::ResourceHandle( + std::make_shared(resource, m_pool), + m_pool->m_invalidator + )); } async::Action handleError(oatpp::async::Error* error) override { @@ -338,12 +365,6 @@ public: stop(); } - void invalidate(const std::shared_ptr& resource) { - auto proxy = std::static_pointer_cast(resource); - proxy->__pool__invalidate(); - m_provider->invalidate(proxy->__pool__getUnderlyingResource()); - } - void stop() { { @@ -437,7 +458,7 @@ public: * Get resource. * @return */ - std::shared_ptr get() override { + provider::ResourceHandle get() override { return TPool::get(this->shared_from_this()); } @@ -445,18 +466,10 @@ public: * Get resource asynchronously. * @return */ - async::CoroutineStarterForResult&> getAsync() override { + async::CoroutineStarterForResult&> getAsync() override { return TPool::getAsync(this->shared_from_this()); } - /** - * Invalidate resource. - * @param resource - */ - void invalidate(const std::shared_ptr& resource) override { - TPool::invalidate(resource); - } - /** * Stop pool.
* *Note: call to stop() may block.* diff --git a/src/oatpp/core/provider/Provider.hpp b/src/oatpp/core/provider/Provider.hpp index f101b89a..c98379bb 100644 --- a/src/oatpp/core/provider/Provider.hpp +++ b/src/oatpp/core/provider/Provider.hpp @@ -25,11 +25,122 @@ #ifndef oatpp_provider_Provider_hpp #define oatpp_provider_Provider_hpp +#include "Invalidator.hpp" + #include "oatpp/core/async/Coroutine.hpp" #include "oatpp/core/data/share/MemoryLabel.hpp" namespace oatpp { namespace provider { +/** + * Resource handle template. + * @tparam T + */ +template +struct ResourceHandleTemplate { + + /** + * Default constructor. + */ + ResourceHandleTemplate() = default; + + /** + * Nullptr constructor. + */ + ResourceHandleTemplate(std::nullptr_t) {} + + /** + * Constructor. + * @param resourceObject + * @param resourceInvalidator + */ + ResourceHandleTemplate(const PTR& resourceObject, + const std::shared_ptr> &resourceInvalidator) + : object(resourceObject), invalidator(resourceInvalidator) + {} + + /** + * Pointer to the resource. + */ + PTR object; + + /** + * Invalidator that can be used to invalidate the resource. + */ + std::shared_ptr> invalidator; + + inline bool operator == (std::nullptr_t) const { + return object.get() == nullptr; + } + + inline bool operator != (std::nullptr_t) const { + return object.get() != nullptr; + } + + explicit inline operator bool() const { + return object.operator bool(); + } + +}; + +/** + * Resource handle. + * @tparam T + */ +template +struct ResourceHandle : public ResourceHandleTemplate> { + + /** + * Default constructor. + */ + ResourceHandle() = default; + + /** + * Nullptr constructor. + */ + ResourceHandle(std::nullptr_t) {} + + /** + * Constructor. + * @param resourceObject + * @param resourceInvalidator + */ + ResourceHandle(const std::shared_ptr& resourceObject, + const std::shared_ptr>& resourceInvalidator) + : ResourceHandleTemplate>(resourceObject, resourceInvalidator) + {} + +}; + +/** + * Weak Resource handle. + * @tparam T + */ +template +struct WeakResourceHandle : public ResourceHandleTemplate> { + + /** + * Default constructor. + */ + WeakResourceHandle() = default; + + /** + * Nullptr constructor. + */ + WeakResourceHandle(std::nullptr_t) {} + + /** + * Constructor. + * @param resourceObject + * @param resourceInvalidator + */ + WeakResourceHandle(const std::weak_ptr& resourceObject, + const std::shared_ptr>& resourceInvalidator) + : ResourceHandleTemplate>(resourceObject, resourceInvalidator) + {} + +}; + /** * Abstract resource provider. * @tparam T - resource class. @@ -68,7 +179,7 @@ public: * Some optional properties that user might want to know.
* Note: All properties are optional and user should not rely on this. */ - const std::unordered_map& getProperties() const { + const std::unordered_map& getProperties() const { return m_properties; } @@ -87,20 +198,13 @@ public: * Get resource. * @return - resource. */ - virtual std::shared_ptr get() = 0; + virtual ResourceHandle get() = 0; /** * Get resource in Async manner. * @return - &id:oatpp::async::CoroutineStarterForResult; of `T`. */ - virtual async::CoroutineStarterForResult&> getAsync() = 0; - - /** - * Invalidate resource that was previously created by this provider.
- * Use-case: if provider is pool based - you can signal that this resource should not be reused anymore. - * @param resource - */ - virtual void invalidate(const std::shared_ptr& resource) = 0; + virtual async::CoroutineStarterForResult&> getAsync() = 0; /** * Stop provider and free associated resources. diff --git a/src/oatpp/network/ConnectionHandler.hpp b/src/oatpp/network/ConnectionHandler.hpp index 36b5d2d2..1e5468e7 100644 --- a/src/oatpp/network/ConnectionHandler.hpp +++ b/src/oatpp/network/ConnectionHandler.hpp @@ -25,6 +25,7 @@ #ifndef oatpp_network_ConnectionHandler_hpp #define oatpp_network_ConnectionHandler_hpp +#include "oatpp/core/provider/Provider.hpp" #include "oatpp/core/data/stream/Stream.hpp" #include @@ -53,10 +54,11 @@ public: /** * Handle provided connection. - * @param connection - see &id:oatpp::data::stream::IOStream;. + * @param connectionData - see &id:oatpp::data::stream::IOStream;. * @param params - accompanying parameters. */ - virtual void handleConnection(const std::shared_ptr& connection, const std::shared_ptr& params) = 0; + virtual void handleConnection(const provider::ResourceHandle& connectionData, + const std::shared_ptr& params) = 0; /** * Stop all threads here diff --git a/src/oatpp/network/ConnectionPool.cpp b/src/oatpp/network/ConnectionPool.cpp index c17fff75..a67328a3 100644 --- a/src/oatpp/network/ConnectionPool.cpp +++ b/src/oatpp/network/ConnectionPool.cpp @@ -24,44 +24,41 @@ #include "ConnectionPool.hpp" -#include -#include - namespace oatpp { namespace network { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // ConnectionAcquisitionProxy v_io_size ConnectionAcquisitionProxy::write(const void *buff, v_buff_size count, async::Action& action) { - return _obj->write(buff, count, action); + return _handle.object->write(buff, count, action); } v_io_size ConnectionAcquisitionProxy::read(void *buff, v_buff_size count, async::Action& action) { - return _obj->read(buff, count, action); + return _handle.object->read(buff, count, action); } void ConnectionAcquisitionProxy::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) { - return _obj->setOutputStreamIOMode(ioMode); + return _handle.object->setOutputStreamIOMode(ioMode); } oatpp::data::stream::IOMode ConnectionAcquisitionProxy::getOutputStreamIOMode() { - return _obj->getOutputStreamIOMode(); + return _handle.object->getOutputStreamIOMode(); } void ConnectionAcquisitionProxy::setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) { - return _obj->setInputStreamIOMode(ioMode); + return _handle.object->setInputStreamIOMode(ioMode); } oatpp::data::stream::IOMode ConnectionAcquisitionProxy::getInputStreamIOMode() { - return _obj->getInputStreamIOMode(); + return _handle.object->getInputStreamIOMode(); } oatpp::data::stream::Context& ConnectionAcquisitionProxy::getOutputStreamContext() { - return _obj->getOutputStreamContext(); + return _handle.object->getOutputStreamContext(); } oatpp::data::stream::Context& ConnectionAcquisitionProxy::getInputStreamContext() { - return _obj->getInputStreamContext(); + return _handle.object->getInputStreamContext(); } }} diff --git a/src/oatpp/network/ConnectionPool.hpp b/src/oatpp/network/ConnectionPool.hpp index 0fa3d181..14e495ed 100644 --- a/src/oatpp/network/ConnectionPool.hpp +++ b/src/oatpp/network/ConnectionPool.hpp @@ -36,7 +36,7 @@ namespace oatpp { namespace network { */ struct ConnectionAcquisitionProxy : public provider::AcquisitionProxy { - ConnectionAcquisitionProxy(const std::shared_ptr& resource, + ConnectionAcquisitionProxy(const provider::ResourceHandle& resource, const std::shared_ptr& pool) : provider::AcquisitionProxy(resource, pool) {} diff --git a/src/oatpp/network/Server.cpp b/src/oatpp/network/Server.cpp index b532cb66..7b062819 100644 --- a/src/oatpp/network/Server.cpp +++ b/src/oatpp/network/Server.cpp @@ -50,16 +50,16 @@ void Server::conditionalMainLoop() { while (getStatus() == STATUS_RUNNING) { if (m_condition()) { - std::shared_ptr connection; + provider::ResourceHandle connectionHandle; { std::lock_guard lg(m_spinlock); - connection = m_connectionProvider->get(); + connectionHandle = m_connectionProvider->get(); } - if (connection) { + if (connectionHandle.object) { if (getStatus() == STATUS_RUNNING) { if (m_condition()) { std::lock_guard lg(m_spinlock); - m_connectionHandler->handleConnection(connection, params /* null params */); + m_connectionHandler->handleConnection(connectionHandle, params /* null params */); } else { setStatus(STATUS_STOPPING); } @@ -79,16 +79,16 @@ void Server::mainLoop(Server *instance) { std::shared_ptr> params; while (instance->getStatus() == STATUS_RUNNING) { - std::shared_ptr connection; + provider::ResourceHandle connectionHandle; { std::lock_guard lg(instance->m_spinlock); - connection = instance->m_connectionProvider->get(); + connectionHandle = instance->m_connectionProvider->get(); } - if (connection) { + if (connectionHandle) { if (instance->getStatus() == STATUS_RUNNING) { std::lock_guard lg(instance->m_spinlock); - instance->m_connectionHandler->handleConnection(connection, params /* null params */); + instance->m_connectionHandler->handleConnection(connectionHandle, params /* null params */); } else { OATPP_LOGD("[oatpp::network::server::mainLoop()]", "Error. Server already stopped - closing connection..."); } diff --git a/src/oatpp/network/monitor/ConnectionMonitor.cpp b/src/oatpp/network/monitor/ConnectionMonitor.cpp index b4c6d75a..52fe1209 100644 --- a/src/oatpp/network/monitor/ConnectionMonitor.cpp +++ b/src/oatpp/network/monitor/ConnectionMonitor.cpp @@ -29,15 +29,21 @@ namespace oatpp { namespace network { namespace monitor { +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// ConnectionMonitor::ConnectionInvalidator + +void ConnectionMonitor::ConnectionInvalidator::invalidate(const std::shared_ptr &connection) { + auto proxy = std::static_pointer_cast(connection); + proxy->invalidate(); +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // ConnectionMonitor::ConnectionProxy ConnectionMonitor::ConnectionProxy::ConnectionProxy(const std::shared_ptr& monitor, - const std::shared_ptr& connectionProvider, - const std::shared_ptr& connection) + const provider::ResourceHandle& connectionHandle) : m_monitor(monitor) - , m_connectionProvider(connectionProvider) - , m_connection(connection) + , m_connectionHandle(connectionHandle) { m_stats.timestampCreated = base::Environment::getMicroTickCount(); } @@ -62,43 +68,43 @@ ConnectionMonitor::ConnectionProxy::~ConnectionProxy() { } v_io_size ConnectionMonitor::ConnectionProxy::read(void *buffer, v_buff_size count, async::Action& action) { - auto res = m_connection->read(buffer, count, action); + auto res = m_connectionHandle.object->read(buffer, count, action); std::lock_guard lock(m_statsMutex); m_monitor->onConnectionRead(m_stats, res); return res; } v_io_size ConnectionMonitor::ConnectionProxy::write(const void *data, v_buff_size count, async::Action& action) { - auto res = m_connection->write(data, count, action); + auto res = m_connectionHandle.object->write(data, count, action); std::lock_guard lock(m_statsMutex); m_monitor->onConnectionWrite(m_stats, res); return res; } void ConnectionMonitor::ConnectionProxy::setInputStreamIOMode(data::stream::IOMode ioMode) { - m_connection->setInputStreamIOMode(ioMode); + m_connectionHandle.object->setInputStreamIOMode(ioMode); } data::stream::IOMode ConnectionMonitor::ConnectionProxy::getInputStreamIOMode() { - return m_connection->getInputStreamIOMode(); + return m_connectionHandle.object->getInputStreamIOMode(); } data::stream::Context& ConnectionMonitor::ConnectionProxy::getInputStreamContext() { - return m_connection->getInputStreamContext(); + return m_connectionHandle.object->getInputStreamContext(); } void ConnectionMonitor::ConnectionProxy::setOutputStreamIOMode(data::stream::IOMode ioMode) { - m_connection->setOutputStreamIOMode(ioMode); + m_connectionHandle.object->setOutputStreamIOMode(ioMode); } data::stream::IOMode ConnectionMonitor::ConnectionProxy::getOutputStreamIOMode() { - return m_connection->getOutputStreamIOMode(); + return m_connectionHandle.object->getOutputStreamIOMode(); } data::stream::Context& ConnectionMonitor::ConnectionProxy::getOutputStreamContext() { - return m_connection->getOutputStreamContext(); + return m_connectionHandle.object->getOutputStreamContext(); } void ConnectionMonitor::ConnectionProxy::invalidate() { - m_connectionProvider->invalidate(m_connection); + m_connectionHandle.invalidator->invalidate(m_connectionHandle.object); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -264,52 +270,56 @@ void ConnectionMonitor::Monitor::stop() { // ConnectionMonitor ConnectionMonitor::ConnectionMonitor(const std::shared_ptr& connectionProvider) - : m_monitor(Monitor::createShared()) + : m_invalidator(std::make_shared()) + , m_monitor(Monitor::createShared()) , m_connectionProvider(connectionProvider) { } -std::shared_ptr ConnectionMonitor::get() { +provider::ResourceHandle ConnectionMonitor::get() { auto connection = m_connectionProvider->get(); if(!connection) { return nullptr; } - auto proxy = std::make_shared(m_monitor, m_connectionProvider, connection); + auto proxy = std::make_shared(m_monitor, connection); m_monitor->addConnection((v_uint64) proxy.get(), proxy); - return proxy; + return provider::ResourceHandle(proxy, m_invalidator); } -async::CoroutineStarterForResult&> +async::CoroutineStarterForResult&> ConnectionMonitor::getAsync() { - class GetConnectionCoroutine : public async::CoroutineWithResult&> { + class GetConnectionCoroutine : public async::CoroutineWithResult&> { private: std::shared_ptr m_monitor; std::shared_ptr m_connectionProvider; + std::shared_ptr m_invalidator; public: GetConnectionCoroutine(const std::shared_ptr& monitor, - const std::shared_ptr& connectionProvider) + const std::shared_ptr& connectionProvider, + const std::shared_ptr& invalidator) : m_monitor(monitor) , m_connectionProvider(connectionProvider) + , m_invalidator(invalidator) {} Action act() override { return m_connectionProvider->getAsync().callbackTo(&GetConnectionCoroutine::onConnection); } - Action onConnection(const std::shared_ptr& connection) { + Action onConnection(const provider::ResourceHandle& connection) { if(!connection) { return _return(nullptr); } - auto proxy = std::make_shared(m_monitor, m_connectionProvider, connection); + auto proxy = std::make_shared(m_monitor, connection); m_monitor->addConnection((v_uint64) proxy.get(), proxy); - return _return(proxy); + return _return(provider::ResourceHandle(proxy, m_invalidator)); } }; - return GetConnectionCoroutine::startForResult(m_monitor, m_connectionProvider); + return GetConnectionCoroutine::startForResult(m_monitor, m_connectionProvider, m_invalidator); } @@ -321,11 +331,6 @@ void ConnectionMonitor::addMetricsChecker(const std::shared_ptr& m_monitor->addMetricsChecker(checker); } -void ConnectionMonitor::invalidate(const std::shared_ptr& connection) { - auto proxy = std::static_pointer_cast(connection); - proxy->invalidate(); -} - void ConnectionMonitor::stop() { m_monitor->stop(); } diff --git a/src/oatpp/network/monitor/ConnectionMonitor.hpp b/src/oatpp/network/monitor/ConnectionMonitor.hpp index 766d57e0..970485d3 100644 --- a/src/oatpp/network/monitor/ConnectionMonitor.hpp +++ b/src/oatpp/network/monitor/ConnectionMonitor.hpp @@ -40,6 +40,15 @@ namespace oatpp { namespace network { namespace monitor { * and close those ones that are not satisfy selected rules. */ class ConnectionMonitor : public ClientConnectionProvider, public ServerConnectionProvider { +private: + + class ConnectionInvalidator : public provider::Invalidator { + public: + + void invalidate(const std::shared_ptr& connection) override; + + }; + private: class Monitor; // FWD @@ -48,16 +57,13 @@ private: friend Monitor; private: std::shared_ptr m_monitor; - /* provider which created this connection */ - std::shared_ptr m_connectionProvider; - std::shared_ptr m_connection; + provider::ResourceHandle m_connectionHandle; std::mutex m_statsMutex; ConnectionStats m_stats; public: ConnectionProxy(const std::shared_ptr& monitor, - const std::shared_ptr& connectionProvider, - const std::shared_ptr& connection); + const provider::ResourceHandle& connectionHandle); ~ConnectionProxy() override; @@ -118,6 +124,7 @@ private: }; private: + std::shared_ptr m_invalidator; std::shared_ptr m_monitor; std::shared_ptr m_connectionProvider; public: @@ -128,9 +135,9 @@ public: */ ConnectionMonitor(const std::shared_ptr& connectionProvider); - std::shared_ptr get() override; + provider::ResourceHandle get() override; - async::CoroutineStarterForResult&> getAsync() override; + async::CoroutineStarterForResult&> getAsync() override; void addStatCollector(const std::shared_ptr& collector); @@ -140,8 +147,6 @@ public: */ void addMetricsChecker(const std::shared_ptr& checker); - void invalidate(const std::shared_ptr& connection) override; - void stop() override; }; diff --git a/src/oatpp/network/tcp/client/ConnectionProvider.cpp b/src/oatpp/network/tcp/client/ConnectionProvider.cpp index 03446213..1896e20b 100644 --- a/src/oatpp/network/tcp/client/ConnectionProvider.cpp +++ b/src/oatpp/network/tcp/client/ConnectionProvider.cpp @@ -42,14 +42,40 @@ namespace oatpp { namespace network { namespace tcp { namespace client { +void ConnectionProvider::ConnectionInvalidator::invalidate(const std::shared_ptr& connection) { + + /************************************************ + * WARNING!!! + * + * shutdown(handle, SHUT_RDWR) <--- DO! + * close(handle); <--- DO NOT! + * + * DO NOT CLOSE file handle here - + * USE shutdown instead. + * Using close prevent FDs popping out of epoll, + * and they'll be stuck there forever. + ************************************************/ + + auto c = std::static_pointer_cast(connection); + v_io_handle handle = c->getHandle(); + +#if defined(WIN32) || defined(_WIN32) + shutdown(handle, SD_BOTH); +#else + shutdown(handle, SHUT_RDWR); +#endif + +} + ConnectionProvider::ConnectionProvider(const network::Address& address) - : m_address(address) + : m_invalidator(std::make_shared()) + , m_address(address) { setProperty(PROPERTY_HOST, address.host); setProperty(PROPERTY_PORT, oatpp::utils::conversion::int32ToStr(address.port)); } -std::shared_ptr ConnectionProvider::get() { +provider::ResourceHandle ConnectionProvider::get() { auto portStr = oatpp::utils::conversion::int32ToStr(m_address.port); @@ -123,14 +149,18 @@ std::shared_ptr ConnectionProvider::get() { } #endif - return std::make_shared(clientHandle); + return provider::ResourceHandle( + std::make_shared(clientHandle), + m_invalidator + ); } -oatpp::async::CoroutineStarterForResult&> ConnectionProvider::getAsync() { +oatpp::async::CoroutineStarterForResult&> ConnectionProvider::getAsync() { - class ConnectCoroutine : public oatpp::async::CoroutineWithResult&> { + class ConnectCoroutine : public oatpp::async::CoroutineWithResult&> { private: + std::shared_ptr m_connectionInvalidator; network::Address m_address; oatpp::v_io_handle m_clientHandle; private: @@ -139,8 +169,10 @@ oatpp::async::CoroutineStarterForResult& connectionInvalidator, + const network::Address& address) + : m_connectionInvalidator(connectionInvalidator) + , m_address(address) , m_result(nullptr) , m_currentResult(nullptr) , m_isHandleOpened(false) @@ -261,7 +293,10 @@ oatpp::async::CoroutineStarterForResult(m_clientHandle)); + return _return(provider::ResourceHandle( + std::make_shared(m_clientHandle), + m_connectionInvalidator + )); } if(errno == EALREADY || errno == EINPROGRESS) { return ioWait(m_clientHandle, oatpp::async::Action::IOEventType::IO_EVENT_WRITE); @@ -278,32 +313,7 @@ oatpp::async::CoroutineStarterForResult& connection) { - - /************************************************ - * WARNING!!! - * - * shutdown(handle, SHUT_RDWR) <--- DO! - * close(handle); <--- DO NOT! - * - * DO NOT CLOSE file handle here - - * USE shutdown instead. - * Using close prevent FDs popping out of epoll, - * and they'll be stuck there forever. - ************************************************/ - - auto c = std::static_pointer_cast(connection); - v_io_handle handle = c->getHandle(); - -#if defined(WIN32) || defined(_WIN32) - shutdown(handle, SD_BOTH); -#else - shutdown(handle, SHUT_RDWR); -#endif + return ConnectCoroutine::startForResult(m_invalidator, m_address); } diff --git a/src/oatpp/network/tcp/client/ConnectionProvider.hpp b/src/oatpp/network/tcp/client/ConnectionProvider.hpp index 5e657784..ff9aa29b 100644 --- a/src/oatpp/network/tcp/client/ConnectionProvider.hpp +++ b/src/oatpp/network/tcp/client/ConnectionProvider.hpp @@ -28,6 +28,7 @@ #include "oatpp/network/Address.hpp" #include "oatpp/network/ConnectionProvider.hpp" +#include "oatpp/core/provider/Invalidator.hpp" #include "oatpp/core/Types.hpp" namespace oatpp { namespace network { namespace tcp { namespace client { @@ -36,6 +37,17 @@ namespace oatpp { namespace network { namespace tcp { namespace client { * Simple provider of clinet TCP connections. */ class ConnectionProvider : public ClientConnectionProvider { +private: + + class ConnectionInvalidator : public provider::Invalidator { + public: + + void invalidate(const std::shared_ptr& connection) override; + + }; + +private: + std::shared_ptr m_invalidator; protected: network::Address m_address; public: @@ -66,20 +78,13 @@ public: * Get connection. * @return - `std::shared_ptr` to &id:oatpp::data::stream::IOStream;. */ - std::shared_ptr get() override; + provider::ResourceHandle get() override; /** * Get connection in asynchronous manner. * @return - &id:oatpp::async::CoroutineStarterForResult;. */ - oatpp::async::CoroutineStarterForResult&> getAsync() override; - - /** - * Call shutdown read and write on an underlying file descriptor. - * `connection` **MUST** be an object previously obtained from **THIS** connection provider. - * @param connection - */ - void invalidate(const std::shared_ptr& connection) override; + oatpp::async::CoroutineStarterForResult&> getAsync() override; /** * Get address - &id:oatpp::network::Address;. diff --git a/src/oatpp/network/tcp/server/ConnectionProvider.cpp b/src/oatpp/network/tcp/server/ConnectionProvider.cpp index 36bb2c04..0fbc5fa1 100644 --- a/src/oatpp/network/tcp/server/ConnectionProvider.cpp +++ b/src/oatpp/network/tcp/server/ConnectionProvider.cpp @@ -90,11 +90,41 @@ oatpp::data::stream::Context& ConnectionProvider::ExtendedConnection::getInputSt return m_context; } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// ConnectionProvider::ConnectionInvalidator + +void ConnectionProvider::ConnectionInvalidator::invalidate(const std::shared_ptr& connection) { + + /************************************************ + * WARNING!!! + * + * shutdown(handle, SHUT_RDWR) <--- DO! + * close(handle); <--- DO NOT! + * + * DO NOT CLOSE file handle here - + * USE shutdown instead. + * Using close prevent FDs popping out of epoll, + * and they'll be stuck there forever. + ************************************************/ + + auto c = std::static_pointer_cast(connection); + v_io_handle handle = c->getHandle(); + +#if defined(WIN32) || defined(_WIN32) + shutdown(handle, SD_BOTH); +#else + shutdown(handle, SHUT_RDWR); +#endif + + +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // ConnectionProvider ConnectionProvider::ConnectionProvider(const network::Address& address, bool useExtendedConnections) - : m_address(address) + : m_invalidator(std::make_shared()) + , m_address(address) , m_closed(false) , m_useExtendedConnections(useExtendedConnections) { @@ -301,7 +331,7 @@ bool ConnectionProvider::prepareConnectionHandle(oatpp::v_io_handle handle) { } -std::shared_ptr ConnectionProvider::getDefaultConnection() { +provider::ResourceHandle ConnectionProvider::getDefaultConnection() { oatpp::v_io_handle handle = accept(m_serverHandle, nullptr, nullptr); @@ -310,14 +340,17 @@ std::shared_ptr ConnectionProvider::getDefaultCon } if(prepareConnectionHandle(handle)) { - return std::make_shared(handle); + return provider::ResourceHandle( + std::make_shared(handle), + m_invalidator + ); } return nullptr; } -std::shared_ptr ConnectionProvider::getExtendedConnection() { +provider::ResourceHandle ConnectionProvider::getExtendedConnection() { struct sockaddr_storage clientAddress; socklen_t clientAddressSize = sizeof(clientAddress); @@ -364,14 +397,17 @@ std::shared_ptr ConnectionProvider::getExtendedCo } if(prepareConnectionHandle(handle)) { - return std::make_shared(handle, std::move(properties)); + return provider::ResourceHandle( + std::make_shared(handle, std::move(properties)), + m_invalidator + ); } return nullptr; } -std::shared_ptr ConnectionProvider::get() { +provider::ResourceHandle ConnectionProvider::get() { fd_set set; struct timeval timeout; @@ -399,29 +435,4 @@ std::shared_ptr ConnectionProvider::get() { } -void ConnectionProvider::invalidate(const std::shared_ptr& connection) { - - /************************************************ - * WARNING!!! - * - * shutdown(handle, SHUT_RDWR) <--- DO! - * close(handle); <--- DO NOT! - * - * DO NOT CLOSE file handle here - - * USE shutdown instead. - * Using close prevent FDs popping out of epoll, - * and they'll be stuck there forever. - ************************************************/ - - auto c = std::static_pointer_cast(connection); - v_io_handle handle = c->getHandle(); - -#if defined(WIN32) || defined(_WIN32) - shutdown(handle, SD_BOTH); -#else - shutdown(handle, SHUT_RDWR); -#endif - -} - }}}} diff --git a/src/oatpp/network/tcp/server/ConnectionProvider.hpp b/src/oatpp/network/tcp/server/ConnectionProvider.hpp index 5c3fdf3e..871fa65d 100644 --- a/src/oatpp/network/tcp/server/ConnectionProvider.hpp +++ b/src/oatpp/network/tcp/server/ConnectionProvider.hpp @@ -37,6 +37,15 @@ namespace oatpp { namespace network { namespace tcp { namespace server { * Simple provider of TCP connections. */ class ConnectionProvider : public ServerConnectionProvider { +private: + + class ConnectionInvalidator : public provider::Invalidator { + public: + + void invalidate(const std::shared_ptr& connection) override; + + }; + public: /** @@ -75,6 +84,7 @@ public: }; private: + std::shared_ptr m_invalidator; network::Address m_address; std::atomic m_closed; oatpp::v_io_handle m_serverHandle; @@ -83,8 +93,8 @@ private: oatpp::v_io_handle instantiateServer(); private: bool prepareConnectionHandle(oatpp::v_io_handle handle); - std::shared_ptr getDefaultConnection(); - std::shared_ptr getExtendedConnection(); + provider::ResourceHandle getDefaultConnection(); + provider::ResourceHandle getExtendedConnection(); public: /** @@ -122,7 +132,7 @@ public: * Get incoming connection. * @return &id:oatpp::data::stream::IOStream;. */ - std::shared_ptr get() override; + provider::ResourceHandle get() override; /** * No need to implement this.
@@ -132,7 +142,7 @@ public: *
* *It may be implemented later* */ - oatpp::async::CoroutineStarterForResult&> getAsync() override { + oatpp::async::CoroutineStarterForResult&> getAsync() override { /* * No need to implement this. * For Asynchronous IO in oatpp it is considered to be a good practice @@ -144,13 +154,6 @@ public: throw std::runtime_error("[oatpp::network::tcp::server::ConnectionProvider::getAsync()]: Error. Not implemented."); } - /** - * Call shutdown read and write on an underlying file descriptor. - * `connection` **MUST** be an object previously obtained from **THIS** connection provider. - * @param connection - */ - void invalidate(const std::shared_ptr& connection) override; - /** * Get address - &id:oatpp::network::Address;. * @return diff --git a/src/oatpp/network/virtual_/client/ConnectionProvider.cpp b/src/oatpp/network/virtual_/client/ConnectionProvider.cpp index 38c355df..4462c5f7 100644 --- a/src/oatpp/network/virtual_/client/ConnectionProvider.cpp +++ b/src/oatpp/network/virtual_/client/ConnectionProvider.cpp @@ -26,8 +26,13 @@ namespace oatpp { namespace network { namespace virtual_ { namespace client { +void ConnectionProvider::ConnectionInvalidator::invalidate(const std::shared_ptr& connection) { + (void) connection; +} + ConnectionProvider::ConnectionProvider(const std::shared_ptr& interface) - : m_interface(interface) + : m_invalidator(std::make_shared()) + , m_interface(interface) , m_maxAvailableToRead(-1) , m_maxAvailableToWrite(-1) { @@ -43,7 +48,7 @@ void ConnectionProvider::stop() { } -std::shared_ptr ConnectionProvider::get() { +provider::ResourceHandle ConnectionProvider::get() { auto submission = m_interface->connect(); if(submission->isValid()) { auto socket = submission->getSocket(); @@ -51,26 +56,30 @@ std::shared_ptr ConnectionProvider::get() { socket->setOutputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING); socket->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING); socket->setMaxAvailableToReadWrtie(m_maxAvailableToRead, m_maxAvailableToWrite); - return socket; + return provider::ResourceHandle(socket, m_invalidator); } } throw std::runtime_error("[oatpp::network::virtual_::client::getConnection()]: Error. Can't connect. " + *m_interface->getName()); } -oatpp::async::CoroutineStarterForResult&> ConnectionProvider::getAsync() { +oatpp::async::CoroutineStarterForResult&> +ConnectionProvider::getAsync() { - class ConnectCoroutine : public oatpp::async::CoroutineWithResult&> { + class ConnectCoroutine : public oatpp::async::CoroutineWithResult&> { private: + std::shared_ptr m_invalidator; std::shared_ptr m_interface; v_io_size m_maxAvailableToRead; v_io_size m_maxAvailableToWrite; std::shared_ptr m_submission; public: - ConnectCoroutine(const std::shared_ptr& interface, + ConnectCoroutine(const std::shared_ptr& invalidator, + const std::shared_ptr& interface, v_io_size maxAvailableToRead, v_io_size maxAvailableToWrite) - : m_interface(interface) + : m_invalidator(invalidator) + , m_interface(interface) , m_maxAvailableToRead(maxAvailableToRead) , m_maxAvailableToWrite(maxAvailableToWrite) {} @@ -93,7 +102,7 @@ oatpp::async::CoroutineStarterForResultsetOutputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS); socket->setInputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS); socket->setMaxAvailableToReadWrtie(m_maxAvailableToRead, m_maxAvailableToWrite); - return _return(socket); + return _return(provider::ResourceHandle(socket, m_invalidator)); } return waitRepeat(std::chrono::milliseconds(100)); @@ -105,7 +114,7 @@ oatpp::async::CoroutineStarterForResult { + public: + + void invalidate(const std::shared_ptr& connection) override; + + }; + +private: + std::shared_ptr m_invalidator; private: std::shared_ptr m_interface; v_io_size m_maxAvailableToRead; @@ -75,22 +86,13 @@ public: * Get connection. * @return - `std::shared_ptr` to &id:oatpp::data::stream::IOStream;. */ - std::shared_ptr get() override; + provider::ResourceHandle get() override; /** * Get connection in asynchronous manner. * @return - &id:oatpp::async::CoroutineStarterForResult;. */ - oatpp::async::CoroutineStarterForResult&> getAsync() override; - - /** - * Does nothing. - * @param connection - */ - void invalidate(const std::shared_ptr& connection) override { - (void)connection; - // DO Nothing. - } + oatpp::async::CoroutineStarterForResult&> getAsync() override; }; diff --git a/src/oatpp/network/virtual_/server/ConnectionProvider.cpp b/src/oatpp/network/virtual_/server/ConnectionProvider.cpp index 9ac2405f..5d92435b 100644 --- a/src/oatpp/network/virtual_/server/ConnectionProvider.cpp +++ b/src/oatpp/network/virtual_/server/ConnectionProvider.cpp @@ -26,8 +26,13 @@ namespace oatpp { namespace network { namespace virtual_ { namespace server { +void ConnectionProvider::ConnectionInvalidator::invalidate(const std::shared_ptr& connection) { + (void) connection; +} + ConnectionProvider::ConnectionProvider(const std::shared_ptr& interface) - : m_interface(interface) + : m_invalidator(std::make_shared()) + , m_interface(interface) , m_listenerLock(interface->bind()) , m_open(true) , m_maxAvailableToRead(-1) @@ -52,12 +57,12 @@ void ConnectionProvider::stop() { m_interface->notifyAcceptors(); } -std::shared_ptr ConnectionProvider::get() { +provider::ResourceHandle ConnectionProvider::get() { auto socket = m_interface->accept(m_open); if(socket) { socket->setMaxAvailableToReadWrtie(m_maxAvailableToRead, m_maxAvailableToWrite); } - return socket; + return provider::ResourceHandle(socket, m_invalidator); } }}}} diff --git a/src/oatpp/network/virtual_/server/ConnectionProvider.hpp b/src/oatpp/network/virtual_/server/ConnectionProvider.hpp index 99cd27da..016bdd13 100644 --- a/src/oatpp/network/virtual_/server/ConnectionProvider.hpp +++ b/src/oatpp/network/virtual_/server/ConnectionProvider.hpp @@ -37,6 +37,16 @@ namespace oatpp { namespace network { namespace virtual_ { namespace server { */ class ConnectionProvider : public oatpp::network::ServerConnectionProvider { private: + + class ConnectionInvalidator : public provider::Invalidator { + public: + + void invalidate(const std::shared_ptr& connection) override; + + }; + +private: + std::shared_ptr m_invalidator; std::shared_ptr m_interface; std::shared_ptr m_listenerLock; bool m_open; @@ -74,7 +84,7 @@ public: * Get incoming connection. * @return &id:oatpp::data::stream::IOStream;. */ - std::shared_ptr get() override; + provider::ResourceHandle get() override; /** * **NOT IMPLEMENTED!**
@@ -85,7 +95,7 @@ public: *
* *It may be implemented later.* */ - oatpp::async::CoroutineStarterForResult&> getAsync() override { + oatpp::async::CoroutineStarterForResult&> getAsync() override { /* * No need to implement this. * For Asynchronous IO in oatpp it is considered to be a good practice @@ -96,15 +106,6 @@ public: */ throw std::runtime_error("[oatpp::network::virtual_::server::ConnectionProvider::getConnectionAsync()]: Error. Not implemented."); } - - /** - * Does nothing. - * @param connection - */ - void invalidate(const std::shared_ptr& connection) override { - (void)connection; - // DO Nothing. - } }; diff --git a/src/oatpp/web/client/HttpRequestExecutor.cpp b/src/oatpp/web/client/HttpRequestExecutor.cpp index 469a9009..0e050ba1 100644 --- a/src/oatpp/web/client/HttpRequestExecutor.cpp +++ b/src/oatpp/web/client/HttpRequestExecutor.cpp @@ -40,10 +40,8 @@ namespace oatpp { namespace web { namespace client { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // HttpRequestExecutor::ConnectionProxy -HttpRequestExecutor::ConnectionProxy::ConnectionProxy(const std::shared_ptr& connectionProvider, - const std::shared_ptr& connection) - : m_connectionProvider(connectionProvider) - , m_connection(connection) +HttpRequestExecutor::ConnectionProxy::ConnectionProxy(const provider::ResourceHandle& connectionHandle) + : m_connectionHandle(connectionHandle) , m_valid(true) , m_invalidateOnDestroy(false) {} @@ -55,40 +53,40 @@ HttpRequestExecutor::ConnectionProxy::~ConnectionProxy() { } v_io_size HttpRequestExecutor::ConnectionProxy::read(void *buffer, v_buff_size count, async::Action& action) { - return m_connection->read(buffer, count, action); + return m_connectionHandle.object->read(buffer, count, action); } v_io_size HttpRequestExecutor::ConnectionProxy::write(const void *data, v_buff_size count, async::Action& action) { - return m_connection->write(data,count, action); + return m_connectionHandle.object->write(data,count, action); } void HttpRequestExecutor::ConnectionProxy::setInputStreamIOMode(data::stream::IOMode ioMode) { - m_connection->setInputStreamIOMode(ioMode); + m_connectionHandle.object->setInputStreamIOMode(ioMode); } data::stream::IOMode HttpRequestExecutor::ConnectionProxy::getInputStreamIOMode() { - return m_connection->getInputStreamIOMode(); + return m_connectionHandle.object->getInputStreamIOMode(); } data::stream::Context& HttpRequestExecutor::ConnectionProxy::getInputStreamContext() { - return m_connection->getInputStreamContext(); + return m_connectionHandle.object->getInputStreamContext(); } void HttpRequestExecutor::ConnectionProxy::setOutputStreamIOMode(data::stream::IOMode ioMode) { - return m_connection->setOutputStreamIOMode(ioMode); + return m_connectionHandle.object->setOutputStreamIOMode(ioMode); } data::stream::IOMode HttpRequestExecutor::ConnectionProxy::getOutputStreamIOMode() { - return m_connection->getOutputStreamIOMode(); + return m_connectionHandle.object->getOutputStreamIOMode(); } data::stream::Context& HttpRequestExecutor::ConnectionProxy::getOutputStreamContext() { - return m_connection->getOutputStreamContext(); + return m_connectionHandle.object->getOutputStreamContext(); } void HttpRequestExecutor::ConnectionProxy::invalidate() { if(m_valid) { - m_connectionProvider->invalidate(m_connection); + m_connectionHandle.invalidator->invalidate(m_connectionHandle.object); m_valid = false; } } @@ -138,7 +136,7 @@ std::shared_ptr HttpRequestExecutor::getC throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_CONNECT, "[oatpp::web::client::HttpRequestExecutor::getConnection()]: ConnectionProvider failed to provide Connection"); } - auto connectionProxy = std::make_shared(m_connectionProvider, connection); + auto connectionProxy = std::make_shared(connection); return std::make_shared(connectionProxy); } @@ -158,8 +156,8 @@ HttpRequestExecutor::getConnectionAsync() { return m_connectionProvider->getAsync().callbackTo(&GetConnectionCoroutine::onConnectionReady); } - Action onConnectionReady(const std::shared_ptr& connection) { - auto connectionProxy = std::make_shared(m_connectionProvider, connection); + Action onConnectionReady(const provider::ResourceHandle& connection) { + auto connectionProxy = std::make_shared(connection); return _return(std::make_shared(connectionProxy)); } diff --git a/src/oatpp/web/client/HttpRequestExecutor.hpp b/src/oatpp/web/client/HttpRequestExecutor.hpp index deef4b1d..bdecd8d6 100644 --- a/src/oatpp/web/client/HttpRequestExecutor.hpp +++ b/src/oatpp/web/client/HttpRequestExecutor.hpp @@ -48,15 +48,12 @@ public: class ConnectionProxy : public data::stream::IOStream { private: - /* provider which created this connection */ - std::shared_ptr m_connectionProvider; - std::shared_ptr m_connection; + provider::ResourceHandle m_connectionHandle; bool m_valid; bool m_invalidateOnDestroy; public: - ConnectionProxy(const std::shared_ptr& connectionProvider, - const std::shared_ptr& connection); + ConnectionProxy(const provider::ResourceHandle& connectionHandle); ~ConnectionProxy() override; diff --git a/src/oatpp/web/server/AsyncHttpConnectionHandler.cpp b/src/oatpp/web/server/AsyncHttpConnectionHandler.cpp index 2ac11d9a..716b2f9a 100644 --- a/src/oatpp/web/server/AsyncHttpConnectionHandler.cpp +++ b/src/oatpp/web/server/AsyncHttpConnectionHandler.cpp @@ -67,7 +67,7 @@ void AsyncHttpConnectionHandler::addResponseInterceptor(const std::shared_ptrresponseInterceptors.push_back(interceptor); } -void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr& connection, +void AsyncHttpConnectionHandler::handleConnection(const provider::ResourceHandle& connection, const std::shared_ptr& params) { @@ -75,8 +75,8 @@ void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptrsetOutputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS); - connection->setInputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS); + connection.object->setOutputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS); + connection.object->setInputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS); m_executor->execute(m_components, connection, &m_spawns); diff --git a/src/oatpp/web/server/AsyncHttpConnectionHandler.hpp b/src/oatpp/web/server/AsyncHttpConnectionHandler.hpp index 7699432c..6a8d8f74 100644 --- a/src/oatpp/web/server/AsyncHttpConnectionHandler.hpp +++ b/src/oatpp/web/server/AsyncHttpConnectionHandler.hpp @@ -123,7 +123,8 @@ public: void addResponseInterceptor(const std::shared_ptr& interceptor); - void handleConnection(const std::shared_ptr& connection, const std::shared_ptr& params) override; + void handleConnection(const provider::ResourceHandle& connection, + const std::shared_ptr& params) override; /** * Will call m_executor.stop() diff --git a/src/oatpp/web/server/HttpConnectionHandler.cpp b/src/oatpp/web/server/HttpConnectionHandler.cpp index a0ddd67e..a271d7cc 100644 --- a/src/oatpp/web/server/HttpConnectionHandler.cpp +++ b/src/oatpp/web/server/HttpConnectionHandler.cpp @@ -62,7 +62,7 @@ void HttpConnectionHandler::addResponseInterceptor(const std::shared_ptrresponseInterceptors.push_back(interceptor); } -void HttpConnectionHandler::handleConnection(const std::shared_ptr& connection, +void HttpConnectionHandler::handleConnection(const provider::ResourceHandle& connection, const std::shared_ptr& params) { @@ -70,8 +70,8 @@ void HttpConnectionHandler::handleConnection(const std::shared_ptrsetOutputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING); - connection->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING); + connection.object->setOutputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING); + connection.object->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING); /* Create working thread */ std::thread thread(&HttpProcessor::Task::run, std::move(HttpProcessor::Task(m_components, connection, &m_spawns))); diff --git a/src/oatpp/web/server/HttpConnectionHandler.hpp b/src/oatpp/web/server/HttpConnectionHandler.hpp index a0ca83cd..0b8ba769 100644 --- a/src/oatpp/web/server/HttpConnectionHandler.hpp +++ b/src/oatpp/web/server/HttpConnectionHandler.hpp @@ -99,7 +99,8 @@ public: * Implementation of &id:oatpp::network::ConnectionHandler::handleConnection;. * @param connection - &id:oatpp::data::stream::IOStream; representing connection. */ - void handleConnection(const std::shared_ptr& connection, const std::shared_ptr& params) override; + void handleConnection(const provider::ResourceHandle& connection, + const std::shared_ptr& params) override; /** * Tell all worker threads to exit when done. diff --git a/src/oatpp/web/server/HttpProcessor.cpp b/src/oatpp/web/server/HttpProcessor.cpp index 3de10f9c..16dd0f33 100644 --- a/src/oatpp/web/server/HttpProcessor.cpp +++ b/src/oatpp/web/server/HttpProcessor.cpp @@ -72,13 +72,13 @@ HttpProcessor::Components::Components(const std::shared_ptr& pRouter // Other HttpProcessor::ProcessingResources::ProcessingResources(const std::shared_ptr& pComponents, - const std::shared_ptr& pConnection) + const provider::ResourceHandle& pConnection) : components(pComponents) , connection(pConnection) , headersInBuffer(components->config->headersInBufferInitial) , headersOutBuffer(components->config->headersOutBufferInitial) , headersReader(&headersInBuffer, components->config->headersReaderChunkSize, components->config->headersReaderMaxSize) - , inStream(data::stream::InputStreamBufferedProxy::createShared(connection, std::make_shared(data::buffer::IOBuffer::BUFFER_SIZE, 0))) + , inStream(data::stream::InputStreamBufferedProxy::createShared(connection.object, std::make_shared(data::buffer::IOBuffer::BUFFER_SIZE, 0))) {} std::shared_ptr @@ -147,7 +147,7 @@ HttpProcessor::ConnectionState HttpProcessor::processNextRequest(ProcessingResou connectionState = ConnectionState::CLOSING; } else { - request = protocol::http::incoming::Request::createShared(resources.connection, + request = protocol::http::incoming::Request::createShared(resources.connection.object, headersReadResult.startingLine, headersReadResult.headers, resources.inStream, @@ -209,7 +209,7 @@ HttpProcessor::ConnectionState HttpProcessor::processNextRequest(ProcessingResou auto contentEncoderProvider = protocol::http::utils::CommunicationUtils::selectEncoder(request, resources.components->contentEncodingProviders); - response->send(resources.connection.get(), &resources.headersOutBuffer, contentEncoderProvider.get()); + response->send(resources.connection.object.get(), &resources.headersOutBuffer, contentEncoderProvider.get()); return connectionState; @@ -219,7 +219,7 @@ HttpProcessor::ConnectionState HttpProcessor::processNextRequest(ProcessingResou // Task HttpProcessor::Task::Task(const std::shared_ptr& components, - const std::shared_ptr& connection, + const provider::ResourceHandle& connection, std::atomic_long *taskCounter) : m_components(components) , m_connection(connection) @@ -264,7 +264,7 @@ HttpProcessor::Task &HttpProcessor::Task::operator=(HttpProcessor::Task &&t) { void HttpProcessor::Task::run(){ - m_connection->initContexts(); + m_connection.object->initContexts(); ProcessingResources resources(m_components, m_connection); @@ -293,14 +293,14 @@ HttpProcessor::Task::~Task() { // HttpProcessor::Coroutine HttpProcessor::Coroutine::Coroutine(const std::shared_ptr& components, - const std::shared_ptr& connection, + const provider::ResourceHandle& connection, std::atomic_long *taskCounter) : m_components(components) , m_connection(connection) , m_headersInBuffer(components->config->headersInBufferInitial) , m_headersReader(&m_headersInBuffer, components->config->headersReaderChunkSize, components->config->headersReaderMaxSize) , m_headersOutBuffer(std::make_shared(components->config->headersOutBufferInitial)) - , m_inStream(data::stream::InputStreamBufferedProxy::createShared(m_connection, std::make_shared(data::buffer::IOBuffer::BUFFER_SIZE, 0))) + , m_inStream(data::stream::InputStreamBufferedProxy::createShared(m_connection.object, std::make_shared(data::buffer::IOBuffer::BUFFER_SIZE, 0))) , m_connectionState(ConnectionState::ALIVE) , m_counter(taskCounter) { @@ -312,7 +312,7 @@ HttpProcessor::Coroutine::~Coroutine() { } HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::act() { - return m_connection->initContextsAsync().next(yieldTo(&HttpProcessor::Coroutine::parseHeaders)); + return m_connection.object->initContextsAsync().next(yieldTo(&HttpProcessor::Coroutine::parseHeaders)); } HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::parseHeaders() { @@ -321,7 +321,7 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::parseHeaders() { oatpp::async::Action HttpProcessor::Coroutine::onHeadersParsed(const RequestHeadersReader::Result& headersReadResult) { - m_currentRequest = protocol::http::incoming::Request::createShared(m_connection, + m_currentRequest = protocol::http::incoming::Request::createShared(m_connection.object, headersReadResult.startingLine, headersReadResult.headers, m_inStream, @@ -404,7 +404,7 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() { auto contentEncoderProvider = protocol::http::utils::CommunicationUtils::selectEncoder(m_currentRequest, m_components->contentEncodingProviders); - return protocol::http::outgoing::Response::sendAsync(m_currentResponse, m_connection, m_headersOutBuffer, contentEncoderProvider) + return protocol::http::outgoing::Response::sendAsync(m_currentResponse, m_connection.object, m_headersOutBuffer, contentEncoderProvider) .next(yieldTo(&HttpProcessor::Coroutine::onRequestDone)); } diff --git a/src/oatpp/web/server/HttpProcessor.hpp b/src/oatpp/web/server/HttpProcessor.hpp index 87393dd8..ccc618e8 100644 --- a/src/oatpp/web/server/HttpProcessor.hpp +++ b/src/oatpp/web/server/HttpProcessor.hpp @@ -161,10 +161,10 @@ private: struct ProcessingResources { ProcessingResources(const std::shared_ptr& pComponents, - const std::shared_ptr& pConnection); + const provider::ResourceHandle& pConnection); std::shared_ptr components; - std::shared_ptr connection; + provider::ResourceHandle connection; oatpp::data::stream::BufferOutputStream headersInBuffer; oatpp::data::stream::BufferOutputStream headersOutBuffer; RequestHeadersReader headersReader; @@ -189,7 +189,7 @@ public: class Task : public base::Countable { private: std::shared_ptr m_components; - std::shared_ptr m_connection; + provider::ResourceHandle m_connection; std::atomic_long *m_counter; public: @@ -199,7 +199,7 @@ public: * @param connection - &id:oatpp::data::stream::IOStream;. */ Task(const std::shared_ptr& components, - const std::shared_ptr& connection, + const provider::ResourceHandle& connection, std::atomic_long *taskCounter); /** @@ -248,7 +248,7 @@ public: class Coroutine : public oatpp::async::Coroutine { private: std::shared_ptr m_components; - std::shared_ptr m_connection; + provider::ResourceHandle m_connection; oatpp::data::stream::BufferOutputStream m_headersInBuffer; RequestHeadersReader m_headersReader; std::shared_ptr m_headersOutBuffer; @@ -268,7 +268,7 @@ public: * @param connection - &id:oatpp::data::stream::IOStream;. */ Coroutine(const std::shared_ptr& components, - const std::shared_ptr& connection, + const provider::ResourceHandle& connection, std::atomic_long *taskCounter); Action act() override; diff --git a/test/oatpp/core/provider/PoolTemplateTest.cpp b/test/oatpp/core/provider/PoolTemplateTest.cpp index 8718b3d3..22a83687 100644 --- a/test/oatpp/core/provider/PoolTemplateTest.cpp +++ b/test/oatpp/core/provider/PoolTemplateTest.cpp @@ -37,15 +37,28 @@ struct Resource { }; class Provider : public oatpp::provider::Provider { +private: + + class ResourceInvalidator : public oatpp::provider::Invalidator { + public: + + void invalidate(const std::shared_ptr& resource) override { + (void) resource; + } + + }; + +private: + std::shared_ptr m_invalidator; public: - std::shared_ptr get() override { - return std::make_shared(); + oatpp::provider::ResourceHandle get() override { + return oatpp::provider::ResourceHandle(std::make_shared(), m_invalidator); } - async::CoroutineStarterForResult &> getAsync() override { + async::CoroutineStarterForResult &> getAsync() override { - class GetCoroutine : public oatpp::async::CoroutineWithResult&> { + class GetCoroutine : public oatpp::async::CoroutineWithResult&> { private: Provider* m_provider; public: @@ -55,7 +68,7 @@ public: {} Action act() override { - return _return(std::make_shared()); + return _return(oatpp::provider::ResourceHandle(std::make_shared(), m_provider->m_invalidator)); } }; @@ -63,10 +76,6 @@ public: return GetCoroutine::startForResult(this); } - void invalidate(const std::shared_ptr& resource) override { - (void) resource; - } - void stop() override { OATPP_LOGD("Provider", "stop()"); } @@ -75,7 +84,7 @@ public: struct AcquisitionProxy : public oatpp::provider::AcquisitionProxy { - AcquisitionProxy(const std::shared_ptr& resource, const std::shared_ptr& pool) + AcquisitionProxy(const oatpp::provider::ResourceHandle& resource, const std::shared_ptr& pool) : oatpp::provider::AcquisitionProxy(resource, pool) {} @@ -87,11 +96,11 @@ struct Pool : public oatpp::provider::PoolTemplate { : oatpp::provider::PoolTemplate(provider, maxResources, maxResourceTTL, timeout) {} - static std::shared_ptr get(const std::shared_ptr& _this) { + static oatpp::provider::ResourceHandle get(const std::shared_ptr& _this) { return oatpp::provider::PoolTemplate::get(_this); } - static async::CoroutineStarterForResult&> getAsync(const std::shared_ptr& _this) { + static async::CoroutineStarterForResult&> getAsync(const std::shared_ptr& _this) { return oatpp::provider::PoolTemplate::getAsync(_this); } @@ -109,10 +118,10 @@ struct Pool : public oatpp::provider::PoolTemplate { class ClientCoroutine : public oatpp::async::Coroutine { private: std::shared_ptr> m_pool; - std::promise>* m_promise; + std::promise>* m_promise; public: - ClientCoroutine(const std::shared_ptr>& pool, std::promise>* promise) + ClientCoroutine(const std::shared_ptr>& pool, std::promise>* promise) : m_pool(pool) , m_promise(promise) {} @@ -121,7 +130,7 @@ public: return Pool::getAsync(m_pool).callbackTo(&ClientCoroutine::onGet); } - Action onGet(const std::shared_ptr& resource) { + Action onGet(const oatpp::provider::ResourceHandle& resource) { m_promise->set_value(resource); return finish(); } @@ -137,8 +146,8 @@ void PoolTemplateTest::onRun() { { OATPP_LOGD(TAG, "Synchronously with timeout"); auto poolTemplate = Pool::createShared(provider, maxResources, std::chrono::seconds(10), std::chrono::milliseconds(500)); - - std::shared_ptr resource = Pool::get(poolTemplate); + + oatpp::provider::ResourceHandle resource = Pool::get(poolTemplate); OATPP_ASSERT(resource != nullptr); OATPP_ASSERT(Pool::get(poolTemplate) == nullptr); @@ -150,9 +159,9 @@ void PoolTemplateTest::onRun() { OATPP_LOGD(TAG, "Synchronously without timeout"); auto poolTemplate = Pool::createShared(provider, maxResources, std::chrono::seconds(10), std::chrono::milliseconds::zero()); - std::shared_ptr resource = Pool::get(poolTemplate); + oatpp::provider::ResourceHandle resource = Pool::get(poolTemplate); OATPP_ASSERT(resource != nullptr); - std::future> futureResource = std::async(std::launch::async, [&poolTemplate]() { + std::future> futureResource = std::async(std::launch::async, [&poolTemplate]() { return Pool::get(poolTemplate); }); OATPP_ASSERT(futureResource.wait_for(std::chrono::seconds(1)) == std::future_status::timeout); @@ -166,19 +175,24 @@ void PoolTemplateTest::onRun() { oatpp::async::Executor executor(1, 1, 1); auto poolTemplate = Pool::createShared(provider, maxResources, std::chrono::seconds(10), std::chrono::milliseconds(500)); - std::shared_ptr resource; + oatpp::provider::ResourceHandle resourceHandle; { - std::promise> promise; + std::promise> promise; auto future = promise.get_future(); executor.execute(poolTemplate, &promise); - resource = future.get(); - OATPP_ASSERT(resource != nullptr); + resourceHandle = future.get(); + OATPP_ASSERT(resourceHandle != nullptr); + OATPP_ASSERT(resourceHandle.object != nullptr) + OATPP_ASSERT(resourceHandle.invalidator != nullptr) } { - std::promise> promise; + std::promise> promise; auto future = promise.get_future(); executor.execute(poolTemplate, &promise); - OATPP_ASSERT(future.get() == nullptr); + resourceHandle = future.get(); + OATPP_ASSERT(resourceHandle == nullptr); + OATPP_ASSERT(resourceHandle.object == nullptr) + OATPP_ASSERT(resourceHandle.invalidator == nullptr) } poolTemplate->stop(); @@ -190,10 +204,10 @@ void PoolTemplateTest::onRun() { oatpp::async::Executor executor(1, 1, 1); auto poolTemplate = Pool::createShared(provider, maxResources, std::chrono::seconds(10), std::chrono::milliseconds::zero()); - std::shared_ptr resource = Pool::get(poolTemplate); + oatpp::provider::ResourceHandle resource = Pool::get(poolTemplate); OATPP_ASSERT(resource != nullptr); - std::promise> promise; + std::promise> promise; auto future = promise.get_future(); executor.execute(poolTemplate, &promise); OATPP_ASSERT(future.wait_for(std::chrono::seconds(1)) == std::future_status::timeout); diff --git a/test/oatpp/core/provider/PoolTest.cpp b/test/oatpp/core/provider/PoolTest.cpp index 556c3162..318c08e3 100644 --- a/test/oatpp/core/provider/PoolTest.cpp +++ b/test/oatpp/core/provider/PoolTest.cpp @@ -59,16 +59,31 @@ public: class Provider : public oatpp::provider::Provider { private: + + class ResourceInvalidator : public oatpp::provider::Invalidator { + public: + + void invalidate(const std::shared_ptr& resource) override { + (void) resource; + } + + }; + +private: + std::shared_ptr m_invalidator = std::make_shared(); std::atomic m_id; public: - std::shared_ptr get() override { - return std::make_shared(++m_id); + oatpp::provider::ResourceHandle get() override { + return oatpp::provider::ResourceHandle( + std::make_shared(++m_id), + m_invalidator + ); } - async::CoroutineStarterForResult &> getAsync() override { + async::CoroutineStarterForResult &> getAsync() override { - class GetCoroutine : public oatpp::async::CoroutineWithResult&> { + class GetCoroutine : public oatpp::async::CoroutineWithResult&> { private: Provider* m_provider; public: @@ -78,7 +93,10 @@ public: {} Action act() override { - return _return(std::make_shared(++ m_provider->m_id)); + return _return(oatpp::provider::ResourceHandle( + std::make_shared(++ m_provider->m_id), + m_provider->m_invalidator + )); } }; @@ -86,10 +104,6 @@ public: return GetCoroutine::startForResult(this); } - void invalidate(const std::shared_ptr& resource) override { - (void) resource; - } - void stop() override { OATPP_LOGD("Provider", "stop()"); } @@ -103,12 +117,13 @@ public: struct AcquisitionProxy : public oatpp::provider::AcquisitionProxy { - AcquisitionProxy(const std::shared_ptr& resource, const std::shared_ptr& pool) + AcquisitionProxy(const oatpp::provider::ResourceHandle& resource, + const std::shared_ptr& pool) : oatpp::provider::AcquisitionProxy(resource, pool) {} v_int64 myId() override { - return _obj->myId(); + return _handle.object->myId(); } }; @@ -119,7 +134,7 @@ typedef oatpp::provider::Pool, Resource, Acq class ClientCoroutine : public oatpp::async::Coroutine { private: std::shared_ptr m_pool; - std::shared_ptr m_resource; + oatpp::provider::ResourceHandle m_resource; bool m_invalidate; public: @@ -132,14 +147,14 @@ public: return m_pool->getAsync().callbackTo(&ClientCoroutine::onGet); } - Action onGet(const std::shared_ptr& resource) { + Action onGet(const oatpp::provider::ResourceHandle& resource) { m_resource = resource; return waitFor(std::chrono::milliseconds(100)).next(yieldTo(&ClientCoroutine::onUse)); } Action onUse() { if(m_invalidate) { - m_pool->invalidate(m_resource); + m_resource.invalidator->invalidate(m_resource.object); } return finish(); } @@ -150,7 +165,7 @@ void clientMethod(std::shared_ptr pool, bool invalidate) { auto resource = pool->get(); std::this_thread::sleep_for(std::chrono::milliseconds(100)); if(invalidate) { - pool->invalidate(resource); + resource.invalidator->invalidate(resource.object); } } diff --git a/test/oatpp/network/ConnectionPoolTest.cpp b/test/oatpp/network/ConnectionPoolTest.cpp index d3ba7b8b..c6060364 100644 --- a/test/oatpp/network/ConnectionPoolTest.cpp +++ b/test/oatpp/network/ConnectionPoolTest.cpp @@ -75,6 +75,18 @@ public: }; class StubStreamProvider : public oatpp::network::ConnectionProvider { +private: + + class Invalidator : public oatpp::provider::Invalidator { + public: + void invalidate(const std::shared_ptr& connection) override { + (void)connection; + // DO Nothing. + } + }; + +private: + std::shared_ptr m_invalidator = std::make_shared(); public: StubStreamProvider() @@ -83,24 +95,36 @@ public: std::atomic counter; - std::shared_ptr get() override { + oatpp::provider::ResourceHandle get() override { ++ counter; - return std::make_shared(); + return oatpp::provider::ResourceHandle( + std::make_shared(), + m_invalidator + ); } - oatpp::async::CoroutineStarterForResult&> getAsync() override { + oatpp::async::CoroutineStarterForResult&> getAsync() override { - class ConnectionCoroutine : public oatpp::async::CoroutineWithResult&> { + class ConnectionCoroutine : public oatpp::async::CoroutineWithResult&> { + private: + std::shared_ptr m_invalidator; public: + ConnectionCoroutine(const std::shared_ptr& invalidator) + : m_invalidator(invalidator) + {} + Action act() override { - return _return(std::make_shared()); + return _return(oatpp::provider::ResourceHandle( + std::make_shared(), + m_invalidator + )); } }; ++ counter; - return ConnectionCoroutine::startForResult(); + return ConnectionCoroutine::startForResult(m_invalidator); } @@ -108,17 +132,12 @@ public: // DO NOTHING } - void invalidate(const std::shared_ptr& connection) override { - (void)connection; - // DO Nothing. - } - }; class ClientCoroutine : public oatpp::async::Coroutine { private: std::shared_ptr m_pool; - std::shared_ptr m_connection; + oatpp::provider::ResourceHandle m_connection; v_int32 m_repeats; bool m_invalidate; public: @@ -133,7 +152,7 @@ public: return m_pool->getAsync().callbackTo(&ClientCoroutine::onConnection); } - Action onConnection(const std::shared_ptr& connection) { + Action onConnection(const oatpp::provider::ResourceHandle& connection) { m_connection = connection; return yieldTo(&ClientCoroutine::useConnection); } @@ -144,7 +163,7 @@ public: return waitFor(std::chrono::milliseconds(100)).next(yieldTo(&ClientCoroutine::useConnection)); } if(m_invalidate) { - m_pool->invalidate(m_connection); + m_connection.invalidator->invalidate(m_connection.object); } return finish(); } @@ -155,7 +174,7 @@ void clientMethod(std::shared_ptr pool, bool invalidate) { auto connection = pool->get(); std::this_thread::sleep_for(std::chrono::milliseconds(100)); if(invalidate) { - pool->invalidate(connection); + connection.invalidator->invalidate(connection.object); } } diff --git a/test/oatpp/web/PipelineAsyncTest.cpp b/test/oatpp/web/PipelineAsyncTest.cpp index cbf361af..76467c4f 100644 --- a/test/oatpp/web/PipelineAsyncTest.cpp +++ b/test/oatpp/web/PipelineAsyncTest.cpp @@ -139,7 +139,7 @@ void PipelineAsyncTest::onRun() { OATPP_COMPONENT(std::shared_ptr, clientConnectionProvider); auto connection = clientConnectionProvider->get(); - connection->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING); + connection.object->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING); std::thread pipeInThread([this, connection] { @@ -153,7 +153,7 @@ void PipelineAsyncTest::onRun() { oatpp::data::buffer::IOBuffer ioBuffer; - oatpp::data::stream::transfer(&inputStream, connection.get(), 0, ioBuffer.getData(), ioBuffer.getSize()); + oatpp::data::stream::transfer(&inputStream, connection.object.get(), 0, ioBuffer.getData(), ioBuffer.getSize()); }); @@ -163,7 +163,7 @@ void PipelineAsyncTest::onRun() { oatpp::data::stream::ChunkedBuffer receiveStream; oatpp::data::buffer::IOBuffer ioBuffer; - auto res = oatpp::data::stream::transfer(connection.get(), &receiveStream, sample->size() * m_pipelineSize, ioBuffer.getData(), ioBuffer.getSize()); + auto res = oatpp::data::stream::transfer(connection.object.get(), &receiveStream, sample->size() * m_pipelineSize, ioBuffer.getData(), ioBuffer.getSize()); auto result = receiveStream.toString(); diff --git a/test/oatpp/web/PipelineTest.cpp b/test/oatpp/web/PipelineTest.cpp index 25e6345a..e5d64987 100644 --- a/test/oatpp/web/PipelineTest.cpp +++ b/test/oatpp/web/PipelineTest.cpp @@ -134,7 +134,7 @@ void PipelineTest::onRun() { OATPP_COMPONENT(std::shared_ptr, clientConnectionProvider); auto connection = clientConnectionProvider->get(); - connection->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING); + connection.object->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING); std::thread pipeInThread([this, connection] { @@ -151,7 +151,7 @@ void PipelineTest::onRun() { oatpp::data::buffer::IOBuffer ioBuffer; - auto res = oatpp::data::stream::transfer(&inputStream, connection.get(), 0, ioBuffer.getData(), ioBuffer.getSize()); + auto res = oatpp::data::stream::transfer(&inputStream, connection.object.get(), 0, ioBuffer.getData(), ioBuffer.getSize()); }); @@ -164,7 +164,7 @@ void PipelineTest::onRun() { v_io_size transferSize = sample->size() * m_pipelineSize; OATPP_LOGD(TAG, "want to Receive %d bytes", transferSize); - auto res = oatpp::data::stream::transfer(connection.get(), &receiveStream, transferSize, ioBuffer.getData(), ioBuffer.getSize()); + auto res = oatpp::data::stream::transfer(connection.object.get(), &receiveStream, transferSize, ioBuffer.getData(), ioBuffer.getSize()); auto result = receiveStream.toString();