provider: Decouple Provider and Invalidator.

This commit is contained in:
lganzzzo 2021-10-19 03:34:16 +03:00
parent cc6c54f3a9
commit d74d6b2308
31 changed files with 609 additions and 334 deletions

View File

@ -126,6 +126,7 @@ add_library(oatpp
oatpp/core/parser/Caret.hpp
oatpp/core/parser/ParsingError.cpp
oatpp/core/parser/ParsingError.hpp
oatpp/core/provider/Invalidator.hpp
oatpp/core/provider/Pool.hpp
oatpp/core/provider/Provider.hpp
oatpp/core/utils/Binary.cpp

View File

@ -0,0 +1,57 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_provider_Invalidator_hpp
#define oatpp_provider_Invalidator_hpp
#include "oatpp/core/base/Countable.hpp"
#include <memory>
namespace oatpp { namespace provider {
/**
* Abstract resource invalidator.
* @tparam T - resource class.
*/
template<class T>
class Invalidator : public oatpp::base::Countable {
public:
/**
* Default virtual destructor.
*/
virtual ~Invalidator() = default;
/**
* Invalidate resource that was previously created by the correspondent provider. <br>
* Use-case: if provider is pool based - you can signal that this resource should not be reused anymore.
* @param resource
*/
virtual void invalidate(const std::shared_ptr<T> &resource) = 0;
};
}}
#endif //oatpp_provider_Invalidator_hpp

View File

@ -56,12 +56,12 @@ private:
m_valid = false;
}
std::shared_ptr<TResource> __pool__getUnderlyingResource() {
return _obj;
provider::ResourceHandle<TResource> __pool__getUnderlyingResource() {
return _handle;
}
protected:
std::shared_ptr<TResource> _obj;
provider::ResourceHandle<TResource> _handle;
private:
std::shared_ptr<PoolInstance> m_pool;
bool m_valid;
@ -72,8 +72,8 @@ public:
* @param resource - base resource.
* @param pool - &l:AcquisitionProxy::PoolInstance;.
*/
AcquisitionProxy(const std::shared_ptr<TResource>& resource, const std::shared_ptr<PoolInstance>& pool)
: _obj(resource)
AcquisitionProxy(const provider::ResourceHandle<TResource>& resource, const std::shared_ptr<PoolInstance>& pool)
: _handle(resource)
, m_pool(pool)
, m_valid(true)
{}
@ -82,7 +82,7 @@ public:
* Virtual destructor.
*/
virtual ~AcquisitionProxy() {
m_pool->release(std::move(_obj), m_valid);
m_pool->release(std::move(_handle), m_valid);
}
};
@ -93,10 +93,24 @@ class PoolTemplate : public oatpp::base::Countable, public async::CoroutineWaitL
private:
struct PoolRecord {
std::shared_ptr<TResource> resource;
provider::ResourceHandle<TResource> resource;
v_int64 timestamp;
};
private:
class ResourceInvalidator : public provider::Invalidator<TResource> {
public:
void invalidate(const std::shared_ptr<TResource>& resource) override {
auto proxy = std::static_pointer_cast<AcquisitionProxyImpl>(resource);
proxy->__pool__invalidate();
const auto& handle = proxy->__pool__getUnderlyingResource();
handle.invalidator->invalidate(handle.object);
}
};
private:
void onNewItem(async::CoroutineWaitList& list) override {
@ -116,7 +130,7 @@ private:
}
void release(std::shared_ptr<TResource>&& resource, bool canReuse) {
void release(provider::ResourceHandle<TResource>&& resource, bool canReuse) {
{
@ -155,7 +169,7 @@ private:
auto elapsed = ticks - i->timestamp;
if(elapsed > pool->m_maxResourceTTL) {
pool->m_provider->invalidate(i->resource);
i->resource.invalidator->invalidate(i->resource.object);
i = pool->m_bench.erase(i);
pool->m_counter --;
} else {
@ -175,7 +189,7 @@ private:
std::lock_guard<std::mutex> guard(pool->m_lock);
auto i = pool->m_bench.begin();
while (i != pool->m_bench.end()) {
pool->m_provider->invalidate(i->resource);
i->resource.invalidator->invalidate(i->resource.object);
i = pool->m_bench.erase(i);
pool->m_counter --;
}
@ -189,6 +203,7 @@ private:
}
private:
std::shared_ptr<ResourceInvalidator> m_invalidator;
std::shared_ptr<Provider<TResource>> m_provider;
v_int64 m_counter{0};
v_int64 m_maxResources;
@ -204,7 +219,8 @@ private:
protected:
PoolTemplate(const std::shared_ptr<Provider<TResource>>& provider, v_int64 maxResources, v_int64 maxResourceTTL, const std::chrono::duration<v_int64, std::micro>& timeout)
: m_provider(provider)
: m_invalidator(std::make_shared<ResourceInvalidator>())
, m_provider(provider)
, m_maxResources(maxResources)
, m_maxResourceTTL(maxResourceTTL)
, m_timeout(timeout)
@ -215,7 +231,7 @@ protected:
poolCleanupTask.detach();
}
static std::shared_ptr<TResource> get(const std::shared_ptr<PoolTemplate>& _this) {
static provider::ResourceHandle<TResource> get(const std::shared_ptr<PoolTemplate>& _this) {
auto readyPredicate = [&_this]() { return !_this->m_running || !_this->m_bench.empty() || _this->m_counter < _this->m_maxResources; };
std::unique_lock<std::mutex> guard{_this->m_lock};
@ -235,7 +251,10 @@ protected:
if (_this->m_bench.size() > 0) {
auto record = _this->m_bench.front();
_this->m_bench.pop_front();
return std::make_shared<AcquisitionProxyImpl>(record.resource, _this);
return provider::ResourceHandle<TResource>(
std::make_shared<AcquisitionProxyImpl>(record.resource, _this),
_this->m_invalidator
);
} else {
++ _this->m_counter;
}
@ -243,7 +262,10 @@ protected:
guard.unlock();
try {
return std::make_shared<AcquisitionProxyImpl>(_this->m_provider->get(), _this);
return provider::ResourceHandle<TResource>(
std::make_shared<AcquisitionProxyImpl>(_this->m_provider->get(), _this),
_this->m_invalidator
);
} catch (...) {
guard.lock();
--_this->m_counter;
@ -251,12 +273,11 @@ protected:
}
}
static async::CoroutineStarterForResult<const std::shared_ptr<TResource>&> getAsync(const std::shared_ptr<PoolTemplate>& _this) {
static async::CoroutineStarterForResult<const provider::ResourceHandle<TResource>&> getAsync(const std::shared_ptr<PoolTemplate>& _this) {
class GetCoroutine : public oatpp::async::CoroutineWithResult<GetCoroutine, const std::shared_ptr<TResource>&> {
class GetCoroutine : public oatpp::async::CoroutineWithResult<GetCoroutine, const provider::ResourceHandle<TResource>&> {
private:
std::shared_ptr<PoolTemplate> m_pool;
std::chrono::steady_clock::time_point m_startTime{std::chrono::steady_clock::now()};
public:
@ -292,7 +313,10 @@ protected:
auto record = m_pool->m_bench.front();
m_pool->m_bench.pop_front();
guard.unlock();
return this->_return(std::make_shared<AcquisitionProxyImpl>(record.resource, m_pool));
return this->_return(provider::ResourceHandle<TResource>(
std::make_shared<AcquisitionProxyImpl>(record.resource, m_pool),
m_pool->m_invalidator
));
} else {
++ m_pool->m_counter;
}
@ -303,8 +327,11 @@ protected:
}
async::Action onGet(const std::shared_ptr<TResource>& resource) {
return this->_return(std::make_shared<AcquisitionProxyImpl>(resource, m_pool));
async::Action onGet(const provider::ResourceHandle<TResource>& resource) {
return this->_return(provider::ResourceHandle<TResource>(
std::make_shared<AcquisitionProxyImpl>(resource, m_pool),
m_pool->m_invalidator
));
}
async::Action handleError(oatpp::async::Error* error) override {
@ -338,12 +365,6 @@ public:
stop();
}
void invalidate(const std::shared_ptr<TResource>& resource) {
auto proxy = std::static_pointer_cast<AcquisitionProxyImpl>(resource);
proxy->__pool__invalidate();
m_provider->invalidate(proxy->__pool__getUnderlyingResource());
}
void stop() {
{
@ -437,7 +458,7 @@ public:
* Get resource.
* @return
*/
std::shared_ptr<TResource> get() override {
provider::ResourceHandle<TResource> get() override {
return TPool::get(this->shared_from_this());
}
@ -445,18 +466,10 @@ public:
* Get resource asynchronously.
* @return
*/
async::CoroutineStarterForResult<const std::shared_ptr<TResource>&> getAsync() override {
async::CoroutineStarterForResult<const provider::ResourceHandle<TResource>&> getAsync() override {
return TPool::getAsync(this->shared_from_this());
}
/**
* Invalidate resource.
* @param resource
*/
void invalidate(const std::shared_ptr<TResource>& resource) override {
TPool::invalidate(resource);
}
/**
* Stop pool. <br>
* *Note: call to stop() may block.*

View File

@ -25,11 +25,122 @@
#ifndef oatpp_provider_Provider_hpp
#define oatpp_provider_Provider_hpp
#include "Invalidator.hpp"
#include "oatpp/core/async/Coroutine.hpp"
#include "oatpp/core/data/share/MemoryLabel.hpp"
namespace oatpp { namespace provider {
/**
* Resource handle template.
* @tparam T
*/
template<class T, class PTR>
struct ResourceHandleTemplate {
/**
* Default constructor.
*/
ResourceHandleTemplate() = default;
/**
* Nullptr constructor.
*/
ResourceHandleTemplate(std::nullptr_t) {}
/**
* Constructor.
* @param resourceObject
* @param resourceInvalidator
*/
ResourceHandleTemplate(const PTR& resourceObject,
const std::shared_ptr<Invalidator<T>> &resourceInvalidator)
: object(resourceObject), invalidator(resourceInvalidator)
{}
/**
* Pointer to the resource.
*/
PTR object;
/**
* Invalidator that can be used to invalidate the resource.
*/
std::shared_ptr<Invalidator<T>> invalidator;
inline bool operator == (std::nullptr_t) const {
return object.get() == nullptr;
}
inline bool operator != (std::nullptr_t) const {
return object.get() != nullptr;
}
explicit inline operator bool() const {
return object.operator bool();
}
};
/**
* Resource handle.
* @tparam T
*/
template<class T>
struct ResourceHandle : public ResourceHandleTemplate<T, std::shared_ptr<T>> {
/**
* Default constructor.
*/
ResourceHandle() = default;
/**
* Nullptr constructor.
*/
ResourceHandle(std::nullptr_t) {}
/**
* Constructor.
* @param resourceObject
* @param resourceInvalidator
*/
ResourceHandle(const std::shared_ptr<T>& resourceObject,
const std::shared_ptr<Invalidator<T>>& resourceInvalidator)
: ResourceHandleTemplate<T, std::shared_ptr<T>>(resourceObject, resourceInvalidator)
{}
};
/**
* Weak Resource handle.
* @tparam T
*/
template<class T>
struct WeakResourceHandle : public ResourceHandleTemplate<T, std::weak_ptr<T>> {
/**
* Default constructor.
*/
WeakResourceHandle() = default;
/**
* Nullptr constructor.
*/
WeakResourceHandle(std::nullptr_t) {}
/**
* Constructor.
* @param resourceObject
* @param resourceInvalidator
*/
WeakResourceHandle(const std::weak_ptr<T>& resourceObject,
const std::shared_ptr<Invalidator<T>>& resourceInvalidator)
: ResourceHandleTemplate<T, std::weak_ptr<T>>(resourceObject, resourceInvalidator)
{}
};
/**
* Abstract resource provider.
* @tparam T - resource class.
@ -68,7 +179,7 @@ public:
* Some optional properties that user might want to know. <br>
* Note: All properties are optional and user should not rely on this.
*/
const std::unordered_map<oatpp::data::share::StringKeyLabelCI, oatpp::data::share::StringKeyLabel>& getProperties() const {
const std::unordered_map<data::share::StringKeyLabelCI, data::share::StringKeyLabel>& getProperties() const {
return m_properties;
}
@ -87,20 +198,13 @@ public:
* Get resource.
* @return - resource.
*/
virtual std::shared_ptr<T> get() = 0;
virtual ResourceHandle<T> get() = 0;
/**
* Get resource in Async manner.
* @return - &id:oatpp::async::CoroutineStarterForResult; of `T`.
*/
virtual async::CoroutineStarterForResult<const std::shared_ptr<T>&> getAsync() = 0;
/**
* Invalidate resource that was previously created by this provider. <br>
* Use-case: if provider is pool based - you can signal that this resource should not be reused anymore.
* @param resource
*/
virtual void invalidate(const std::shared_ptr<T>& resource) = 0;
virtual async::CoroutineStarterForResult<const ResourceHandle<T>&> getAsync() = 0;
/**
* Stop provider and free associated resources.

View File

@ -25,6 +25,7 @@
#ifndef oatpp_network_ConnectionHandler_hpp
#define oatpp_network_ConnectionHandler_hpp
#include "oatpp/core/provider/Provider.hpp"
#include "oatpp/core/data/stream/Stream.hpp"
#include <unordered_map>
@ -53,10 +54,11 @@ public:
/**
* Handle provided connection.
* @param connection - see &id:oatpp::data::stream::IOStream;.
* @param connectionData - see &id:oatpp::data::stream::IOStream;.
* @param params - accompanying parameters.
*/
virtual void handleConnection(const std::shared_ptr<IOStream>& connection, const std::shared_ptr<const ParameterMap>& params) = 0;
virtual void handleConnection(const provider::ResourceHandle<IOStream>& connectionData,
const std::shared_ptr<const ParameterMap>& params) = 0;
/**
* Stop all threads here

View File

@ -24,44 +24,41 @@
#include "ConnectionPool.hpp"
#include <thread>
#include <chrono>
namespace oatpp { namespace network {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ConnectionAcquisitionProxy
v_io_size ConnectionAcquisitionProxy::write(const void *buff, v_buff_size count, async::Action& action) {
return _obj->write(buff, count, action);
return _handle.object->write(buff, count, action);
}
v_io_size ConnectionAcquisitionProxy::read(void *buff, v_buff_size count, async::Action& action) {
return _obj->read(buff, count, action);
return _handle.object->read(buff, count, action);
}
void ConnectionAcquisitionProxy::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
return _obj->setOutputStreamIOMode(ioMode);
return _handle.object->setOutputStreamIOMode(ioMode);
}
oatpp::data::stream::IOMode ConnectionAcquisitionProxy::getOutputStreamIOMode() {
return _obj->getOutputStreamIOMode();
return _handle.object->getOutputStreamIOMode();
}
void ConnectionAcquisitionProxy::setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
return _obj->setInputStreamIOMode(ioMode);
return _handle.object->setInputStreamIOMode(ioMode);
}
oatpp::data::stream::IOMode ConnectionAcquisitionProxy::getInputStreamIOMode() {
return _obj->getInputStreamIOMode();
return _handle.object->getInputStreamIOMode();
}
oatpp::data::stream::Context& ConnectionAcquisitionProxy::getOutputStreamContext() {
return _obj->getOutputStreamContext();
return _handle.object->getOutputStreamContext();
}
oatpp::data::stream::Context& ConnectionAcquisitionProxy::getInputStreamContext() {
return _obj->getInputStreamContext();
return _handle.object->getInputStreamContext();
}
}}

View File

@ -36,7 +36,7 @@ namespace oatpp { namespace network {
*/
struct ConnectionAcquisitionProxy : public provider::AcquisitionProxy<data::stream::IOStream, ConnectionAcquisitionProxy> {
ConnectionAcquisitionProxy(const std::shared_ptr<data::stream::IOStream>& resource,
ConnectionAcquisitionProxy(const provider::ResourceHandle<data::stream::IOStream>& resource,
const std::shared_ptr<PoolInstance>& pool)
: provider::AcquisitionProxy<data::stream::IOStream, ConnectionAcquisitionProxy>(resource, pool)
{}

View File

@ -50,16 +50,16 @@ void Server::conditionalMainLoop() {
while (getStatus() == STATUS_RUNNING) {
if (m_condition()) {
std::shared_ptr<data::stream::IOStream> connection;
provider::ResourceHandle<data::stream::IOStream> connectionHandle;
{
std::lock_guard<oatpp::concurrency::SpinLock> lg(m_spinlock);
connection = m_connectionProvider->get();
connectionHandle = m_connectionProvider->get();
}
if (connection) {
if (connectionHandle.object) {
if (getStatus() == STATUS_RUNNING) {
if (m_condition()) {
std::lock_guard<oatpp::concurrency::SpinLock> lg(m_spinlock);
m_connectionHandler->handleConnection(connection, params /* null params */);
m_connectionHandler->handleConnection(connectionHandle, params /* null params */);
} else {
setStatus(STATUS_STOPPING);
}
@ -79,16 +79,16 @@ void Server::mainLoop(Server *instance) {
std::shared_ptr<const std::unordered_map<oatpp::String, oatpp::String>> params;
while (instance->getStatus() == STATUS_RUNNING) {
std::shared_ptr<data::stream::IOStream> connection;
provider::ResourceHandle<data::stream::IOStream> connectionHandle;
{
std::lock_guard<oatpp::concurrency::SpinLock> lg(instance->m_spinlock);
connection = instance->m_connectionProvider->get();
connectionHandle = instance->m_connectionProvider->get();
}
if (connection) {
if (connectionHandle) {
if (instance->getStatus() == STATUS_RUNNING) {
std::lock_guard<oatpp::concurrency::SpinLock> lg(instance->m_spinlock);
instance->m_connectionHandler->handleConnection(connection, params /* null params */);
instance->m_connectionHandler->handleConnection(connectionHandle, params /* null params */);
} else {
OATPP_LOGD("[oatpp::network::server::mainLoop()]", "Error. Server already stopped - closing connection...");
}

View File

@ -29,15 +29,21 @@
namespace oatpp { namespace network { namespace monitor {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ConnectionMonitor::ConnectionInvalidator
void ConnectionMonitor::ConnectionInvalidator::invalidate(const std::shared_ptr<data::stream::IOStream> &connection) {
auto proxy = std::static_pointer_cast<ConnectionProxy>(connection);
proxy->invalidate();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ConnectionMonitor::ConnectionProxy
ConnectionMonitor::ConnectionProxy::ConnectionProxy(const std::shared_ptr<Monitor>& monitor,
const std::shared_ptr<ConnectionProvider>& connectionProvider,
const std::shared_ptr<data::stream::IOStream>& connection)
const provider::ResourceHandle<data::stream::IOStream>& connectionHandle)
: m_monitor(monitor)
, m_connectionProvider(connectionProvider)
, m_connection(connection)
, m_connectionHandle(connectionHandle)
{
m_stats.timestampCreated = base::Environment::getMicroTickCount();
}
@ -62,43 +68,43 @@ ConnectionMonitor::ConnectionProxy::~ConnectionProxy() {
}
v_io_size ConnectionMonitor::ConnectionProxy::read(void *buffer, v_buff_size count, async::Action& action) {
auto res = m_connection->read(buffer, count, action);
auto res = m_connectionHandle.object->read(buffer, count, action);
std::lock_guard<std::mutex> lock(m_statsMutex);
m_monitor->onConnectionRead(m_stats, res);
return res;
}
v_io_size ConnectionMonitor::ConnectionProxy::write(const void *data, v_buff_size count, async::Action& action) {
auto res = m_connection->write(data, count, action);
auto res = m_connectionHandle.object->write(data, count, action);
std::lock_guard<std::mutex> lock(m_statsMutex);
m_monitor->onConnectionWrite(m_stats, res);
return res;
}
void ConnectionMonitor::ConnectionProxy::setInputStreamIOMode(data::stream::IOMode ioMode) {
m_connection->setInputStreamIOMode(ioMode);
m_connectionHandle.object->setInputStreamIOMode(ioMode);
}
data::stream::IOMode ConnectionMonitor::ConnectionProxy::getInputStreamIOMode() {
return m_connection->getInputStreamIOMode();
return m_connectionHandle.object->getInputStreamIOMode();
}
data::stream::Context& ConnectionMonitor::ConnectionProxy::getInputStreamContext() {
return m_connection->getInputStreamContext();
return m_connectionHandle.object->getInputStreamContext();
}
void ConnectionMonitor::ConnectionProxy::setOutputStreamIOMode(data::stream::IOMode ioMode) {
m_connection->setOutputStreamIOMode(ioMode);
m_connectionHandle.object->setOutputStreamIOMode(ioMode);
}
data::stream::IOMode ConnectionMonitor::ConnectionProxy::getOutputStreamIOMode() {
return m_connection->getOutputStreamIOMode();
return m_connectionHandle.object->getOutputStreamIOMode();
}
data::stream::Context& ConnectionMonitor::ConnectionProxy::getOutputStreamContext() {
return m_connection->getOutputStreamContext();
return m_connectionHandle.object->getOutputStreamContext();
}
void ConnectionMonitor::ConnectionProxy::invalidate() {
m_connectionProvider->invalidate(m_connection);
m_connectionHandle.invalidator->invalidate(m_connectionHandle.object);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -264,52 +270,56 @@ void ConnectionMonitor::Monitor::stop() {
// ConnectionMonitor
ConnectionMonitor::ConnectionMonitor(const std::shared_ptr<ConnectionProvider>& connectionProvider)
: m_monitor(Monitor::createShared())
: m_invalidator(std::make_shared<ConnectionInvalidator>())
, m_monitor(Monitor::createShared())
, m_connectionProvider(connectionProvider)
{
}
std::shared_ptr<data::stream::IOStream> ConnectionMonitor::get() {
provider::ResourceHandle<data::stream::IOStream> ConnectionMonitor::get() {
auto connection = m_connectionProvider->get();
if(!connection) {
return nullptr;
}
auto proxy = std::make_shared<ConnectionProxy>(m_monitor, m_connectionProvider, connection);
auto proxy = std::make_shared<ConnectionProxy>(m_monitor, connection);
m_monitor->addConnection((v_uint64) proxy.get(), proxy);
return proxy;
return provider::ResourceHandle<data::stream::IOStream>(proxy, m_invalidator);
}
async::CoroutineStarterForResult<const std::shared_ptr<data::stream::IOStream>&>
async::CoroutineStarterForResult<const provider::ResourceHandle<data::stream::IOStream>&>
ConnectionMonitor::getAsync() {
class GetConnectionCoroutine : public async::CoroutineWithResult<GetConnectionCoroutine, const std::shared_ptr<data::stream::IOStream>&> {
class GetConnectionCoroutine : public async::CoroutineWithResult<GetConnectionCoroutine, const provider::ResourceHandle<data::stream::IOStream>&> {
private:
std::shared_ptr<Monitor> m_monitor;
std::shared_ptr<ConnectionProvider> m_connectionProvider;
std::shared_ptr<ConnectionInvalidator> m_invalidator;
public:
GetConnectionCoroutine(const std::shared_ptr<Monitor>& monitor,
const std::shared_ptr<ConnectionProvider>& connectionProvider)
const std::shared_ptr<ConnectionProvider>& connectionProvider,
const std::shared_ptr<ConnectionInvalidator>& invalidator)
: m_monitor(monitor)
, m_connectionProvider(connectionProvider)
, m_invalidator(invalidator)
{}
Action act() override {
return m_connectionProvider->getAsync().callbackTo(&GetConnectionCoroutine::onConnection);
}
Action onConnection(const std::shared_ptr<data::stream::IOStream>& connection) {
Action onConnection(const provider::ResourceHandle<data::stream::IOStream>& connection) {
if(!connection) {
return _return(nullptr);
}
auto proxy = std::make_shared<ConnectionProxy>(m_monitor, m_connectionProvider, connection);
auto proxy = std::make_shared<ConnectionProxy>(m_monitor, connection);
m_monitor->addConnection((v_uint64) proxy.get(), proxy);
return _return(proxy);
return _return(provider::ResourceHandle<data::stream::IOStream>(proxy, m_invalidator));
}
};
return GetConnectionCoroutine::startForResult(m_monitor, m_connectionProvider);
return GetConnectionCoroutine::startForResult(m_monitor, m_connectionProvider, m_invalidator);
}
@ -321,11 +331,6 @@ void ConnectionMonitor::addMetricsChecker(const std::shared_ptr<MetricsChecker>&
m_monitor->addMetricsChecker(checker);
}
void ConnectionMonitor::invalidate(const std::shared_ptr<data::stream::IOStream>& connection) {
auto proxy = std::static_pointer_cast<ConnectionProxy>(connection);
proxy->invalidate();
}
void ConnectionMonitor::stop() {
m_monitor->stop();
}

View File

@ -40,6 +40,15 @@ namespace oatpp { namespace network { namespace monitor {
* and close those ones that are not satisfy selected rules.
*/
class ConnectionMonitor : public ClientConnectionProvider, public ServerConnectionProvider {
private:
class ConnectionInvalidator : public provider::Invalidator<data::stream::IOStream> {
public:
void invalidate(const std::shared_ptr<data::stream::IOStream>& connection) override;
};
private:
class Monitor; // FWD
@ -48,16 +57,13 @@ private:
friend Monitor;
private:
std::shared_ptr<Monitor> m_monitor;
/* provider which created this connection */
std::shared_ptr<ConnectionProvider> m_connectionProvider;
std::shared_ptr<data::stream::IOStream> m_connection;
provider::ResourceHandle<data::stream::IOStream> m_connectionHandle;
std::mutex m_statsMutex;
ConnectionStats m_stats;
public:
ConnectionProxy(const std::shared_ptr<Monitor>& monitor,
const std::shared_ptr<ConnectionProvider>& connectionProvider,
const std::shared_ptr<data::stream::IOStream>& connection);
const provider::ResourceHandle<data::stream::IOStream>& connectionHandle);
~ConnectionProxy() override;
@ -118,6 +124,7 @@ private:
};
private:
std::shared_ptr<ConnectionInvalidator> m_invalidator;
std::shared_ptr<Monitor> m_monitor;
std::shared_ptr<ConnectionProvider> m_connectionProvider;
public:
@ -128,9 +135,9 @@ public:
*/
ConnectionMonitor(const std::shared_ptr<ConnectionProvider>& connectionProvider);
std::shared_ptr<data::stream::IOStream> get() override;
provider::ResourceHandle<data::stream::IOStream> get() override;
async::CoroutineStarterForResult<const std::shared_ptr<data::stream::IOStream>&> getAsync() override;
async::CoroutineStarterForResult<const provider::ResourceHandle<data::stream::IOStream>&> getAsync() override;
void addStatCollector(const std::shared_ptr<StatCollector>& collector);
@ -140,8 +147,6 @@ public:
*/
void addMetricsChecker(const std::shared_ptr<MetricsChecker>& checker);
void invalidate(const std::shared_ptr<data::stream::IOStream>& connection) override;
void stop() override;
};

View File

@ -42,14 +42,40 @@
namespace oatpp { namespace network { namespace tcp { namespace client {
void ConnectionProvider::ConnectionInvalidator::invalidate(const std::shared_ptr<data::stream::IOStream>& connection) {
/************************************************
* WARNING!!!
*
* shutdown(handle, SHUT_RDWR) <--- DO!
* close(handle); <--- DO NOT!
*
* DO NOT CLOSE file handle here -
* USE shutdown instead.
* Using close prevent FDs popping out of epoll,
* and they'll be stuck there forever.
************************************************/
auto c = std::static_pointer_cast<network::tcp::Connection>(connection);
v_io_handle handle = c->getHandle();
#if defined(WIN32) || defined(_WIN32)
shutdown(handle, SD_BOTH);
#else
shutdown(handle, SHUT_RDWR);
#endif
}
ConnectionProvider::ConnectionProvider(const network::Address& address)
: m_address(address)
: m_invalidator(std::make_shared<ConnectionInvalidator>())
, m_address(address)
{
setProperty(PROPERTY_HOST, address.host);
setProperty(PROPERTY_PORT, oatpp::utils::conversion::int32ToStr(address.port));
}
std::shared_ptr<oatpp::data::stream::IOStream> ConnectionProvider::get() {
provider::ResourceHandle<data::stream::IOStream> ConnectionProvider::get() {
auto portStr = oatpp::utils::conversion::int32ToStr(m_address.port);
@ -123,14 +149,18 @@ std::shared_ptr<oatpp::data::stream::IOStream> ConnectionProvider::get() {
}
#endif
return std::make_shared<oatpp::network::tcp::Connection>(clientHandle);
return provider::ResourceHandle<data::stream::IOStream>(
std::make_shared<oatpp::network::tcp::Connection>(clientHandle),
m_invalidator
);
}
oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::stream::IOStream>&> ConnectionProvider::getAsync() {
oatpp::async::CoroutineStarterForResult<const provider::ResourceHandle<data::stream::IOStream>&> ConnectionProvider::getAsync() {
class ConnectCoroutine : public oatpp::async::CoroutineWithResult<ConnectCoroutine, const std::shared_ptr<oatpp::data::stream::IOStream>&> {
class ConnectCoroutine : public oatpp::async::CoroutineWithResult<ConnectCoroutine, const provider::ResourceHandle<oatpp::data::stream::IOStream>&> {
private:
std::shared_ptr<ConnectionInvalidator> m_connectionInvalidator;
network::Address m_address;
oatpp::v_io_handle m_clientHandle;
private:
@ -139,8 +169,10 @@ oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::strea
bool m_isHandleOpened;
public:
ConnectCoroutine(const network::Address& address)
: m_address(address)
ConnectCoroutine(const std::shared_ptr<ConnectionInvalidator>& connectionInvalidator,
const network::Address& address)
: m_connectionInvalidator(connectionInvalidator)
, m_address(address)
, m_result(nullptr)
, m_currentResult(nullptr)
, m_isHandleOpened(false)
@ -261,7 +293,10 @@ oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::strea
#else
if(res == 0 || errno == EISCONN) {
return _return(std::make_shared<oatpp::network::tcp::Connection>(m_clientHandle));
return _return(provider::ResourceHandle<data::stream::IOStream>(
std::make_shared<oatpp::network::tcp::Connection>(m_clientHandle),
m_connectionInvalidator
));
}
if(errno == EALREADY || errno == EINPROGRESS) {
return ioWait(m_clientHandle, oatpp::async::Action::IOEventType::IO_EVENT_WRITE);
@ -278,32 +313,7 @@ oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::strea
};
return ConnectCoroutine::startForResult(m_address);
}
void ConnectionProvider::invalidate(const std::shared_ptr<data::stream::IOStream>& connection) {
/************************************************
* WARNING!!!
*
* shutdown(handle, SHUT_RDWR) <--- DO!
* close(handle); <--- DO NOT!
*
* DO NOT CLOSE file handle here -
* USE shutdown instead.
* Using close prevent FDs popping out of epoll,
* and they'll be stuck there forever.
************************************************/
auto c = std::static_pointer_cast<network::tcp::Connection>(connection);
v_io_handle handle = c->getHandle();
#if defined(WIN32) || defined(_WIN32)
shutdown(handle, SD_BOTH);
#else
shutdown(handle, SHUT_RDWR);
#endif
return ConnectCoroutine::startForResult(m_invalidator, m_address);
}

View File

@ -28,6 +28,7 @@
#include "oatpp/network/Address.hpp"
#include "oatpp/network/ConnectionProvider.hpp"
#include "oatpp/core/provider/Invalidator.hpp"
#include "oatpp/core/Types.hpp"
namespace oatpp { namespace network { namespace tcp { namespace client {
@ -36,6 +37,17 @@ namespace oatpp { namespace network { namespace tcp { namespace client {
* Simple provider of clinet TCP connections.
*/
class ConnectionProvider : public ClientConnectionProvider {
private:
class ConnectionInvalidator : public provider::Invalidator<data::stream::IOStream> {
public:
void invalidate(const std::shared_ptr<data::stream::IOStream>& connection) override;
};
private:
std::shared_ptr<ConnectionInvalidator> m_invalidator;
protected:
network::Address m_address;
public:
@ -66,20 +78,13 @@ public:
* Get connection.
* @return - `std::shared_ptr` to &id:oatpp::data::stream::IOStream;.
*/
std::shared_ptr<data::stream::IOStream> get() override;
provider::ResourceHandle<data::stream::IOStream> get() override;
/**
* Get connection in asynchronous manner.
* @return - &id:oatpp::async::CoroutineStarterForResult;.
*/
oatpp::async::CoroutineStarterForResult<const std::shared_ptr<data::stream::IOStream>&> getAsync() override;
/**
* Call shutdown read and write on an underlying file descriptor.
* `connection` **MUST** be an object previously obtained from **THIS** connection provider.
* @param connection
*/
void invalidate(const std::shared_ptr<data::stream::IOStream>& connection) override;
oatpp::async::CoroutineStarterForResult<const provider::ResourceHandle<data::stream::IOStream>&> getAsync() override;
/**
* Get address - &id:oatpp::network::Address;.

View File

@ -90,11 +90,41 @@ oatpp::data::stream::Context& ConnectionProvider::ExtendedConnection::getInputSt
return m_context;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ConnectionProvider::ConnectionInvalidator
void ConnectionProvider::ConnectionInvalidator::invalidate(const std::shared_ptr<data::stream::IOStream>& connection) {
/************************************************
* WARNING!!!
*
* shutdown(handle, SHUT_RDWR) <--- DO!
* close(handle); <--- DO NOT!
*
* DO NOT CLOSE file handle here -
* USE shutdown instead.
* Using close prevent FDs popping out of epoll,
* and they'll be stuck there forever.
************************************************/
auto c = std::static_pointer_cast<network::tcp::Connection>(connection);
v_io_handle handle = c->getHandle();
#if defined(WIN32) || defined(_WIN32)
shutdown(handle, SD_BOTH);
#else
shutdown(handle, SHUT_RDWR);
#endif
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ConnectionProvider
ConnectionProvider::ConnectionProvider(const network::Address& address, bool useExtendedConnections)
: m_address(address)
: m_invalidator(std::make_shared<ConnectionInvalidator>())
, m_address(address)
, m_closed(false)
, m_useExtendedConnections(useExtendedConnections)
{
@ -301,7 +331,7 @@ bool ConnectionProvider::prepareConnectionHandle(oatpp::v_io_handle handle) {
}
std::shared_ptr<oatpp::data::stream::IOStream> ConnectionProvider::getDefaultConnection() {
provider::ResourceHandle<data::stream::IOStream> ConnectionProvider::getDefaultConnection() {
oatpp::v_io_handle handle = accept(m_serverHandle, nullptr, nullptr);
@ -310,14 +340,17 @@ std::shared_ptr<oatpp::data::stream::IOStream> ConnectionProvider::getDefaultCon
}
if(prepareConnectionHandle(handle)) {
return std::make_shared<Connection>(handle);
return provider::ResourceHandle<data::stream::IOStream>(
std::make_shared<Connection>(handle),
m_invalidator
);
}
return nullptr;
}
std::shared_ptr<oatpp::data::stream::IOStream> ConnectionProvider::getExtendedConnection() {
provider::ResourceHandle<data::stream::IOStream> ConnectionProvider::getExtendedConnection() {
struct sockaddr_storage clientAddress;
socklen_t clientAddressSize = sizeof(clientAddress);
@ -364,14 +397,17 @@ std::shared_ptr<oatpp::data::stream::IOStream> ConnectionProvider::getExtendedCo
}
if(prepareConnectionHandle(handle)) {
return std::make_shared<ExtendedConnection>(handle, std::move(properties));
return provider::ResourceHandle<data::stream::IOStream>(
std::make_shared<ExtendedConnection>(handle, std::move(properties)),
m_invalidator
);
}
return nullptr;
}
std::shared_ptr<oatpp::data::stream::IOStream> ConnectionProvider::get() {
provider::ResourceHandle<oatpp::data::stream::IOStream> ConnectionProvider::get() {
fd_set set;
struct timeval timeout;
@ -399,29 +435,4 @@ std::shared_ptr<oatpp::data::stream::IOStream> ConnectionProvider::get() {
}
void ConnectionProvider::invalidate(const std::shared_ptr<data::stream::IOStream>& connection) {
/************************************************
* WARNING!!!
*
* shutdown(handle, SHUT_RDWR) <--- DO!
* close(handle); <--- DO NOT!
*
* DO NOT CLOSE file handle here -
* USE shutdown instead.
* Using close prevent FDs popping out of epoll,
* and they'll be stuck there forever.
************************************************/
auto c = std::static_pointer_cast<network::tcp::Connection>(connection);
v_io_handle handle = c->getHandle();
#if defined(WIN32) || defined(_WIN32)
shutdown(handle, SD_BOTH);
#else
shutdown(handle, SHUT_RDWR);
#endif
}
}}}}

View File

@ -37,6 +37,15 @@ namespace oatpp { namespace network { namespace tcp { namespace server {
* Simple provider of TCP connections.
*/
class ConnectionProvider : public ServerConnectionProvider {
private:
class ConnectionInvalidator : public provider::Invalidator<data::stream::IOStream> {
public:
void invalidate(const std::shared_ptr<data::stream::IOStream>& connection) override;
};
public:
/**
@ -75,6 +84,7 @@ public:
};
private:
std::shared_ptr<ConnectionInvalidator> m_invalidator;
network::Address m_address;
std::atomic<bool> m_closed;
oatpp::v_io_handle m_serverHandle;
@ -83,8 +93,8 @@ private:
oatpp::v_io_handle instantiateServer();
private:
bool prepareConnectionHandle(oatpp::v_io_handle handle);
std::shared_ptr<data::stream::IOStream> getDefaultConnection();
std::shared_ptr<data::stream::IOStream> getExtendedConnection();
provider::ResourceHandle<data::stream::IOStream> getDefaultConnection();
provider::ResourceHandle<data::stream::IOStream> getExtendedConnection();
public:
/**
@ -122,7 +132,7 @@ public:
* Get incoming connection.
* @return &id:oatpp::data::stream::IOStream;.
*/
std::shared_ptr<data::stream::IOStream> get() override;
provider::ResourceHandle<data::stream::IOStream> get() override;
/**
* No need to implement this.<br>
@ -132,7 +142,7 @@ public:
* <br>
* *It may be implemented later*
*/
oatpp::async::CoroutineStarterForResult<const std::shared_ptr<data::stream::IOStream>&> getAsync() override {
oatpp::async::CoroutineStarterForResult<const provider::ResourceHandle<data::stream::IOStream>&> getAsync() override {
/*
* No need to implement this.
* For Asynchronous IO in oatpp it is considered to be a good practice
@ -144,13 +154,6 @@ public:
throw std::runtime_error("[oatpp::network::tcp::server::ConnectionProvider::getAsync()]: Error. Not implemented.");
}
/**
* Call shutdown read and write on an underlying file descriptor.
* `connection` **MUST** be an object previously obtained from **THIS** connection provider.
* @param connection
*/
void invalidate(const std::shared_ptr<data::stream::IOStream>& connection) override;
/**
* Get address - &id:oatpp::network::Address;.
* @return

View File

@ -26,8 +26,13 @@
namespace oatpp { namespace network { namespace virtual_ { namespace client {
void ConnectionProvider::ConnectionInvalidator::invalidate(const std::shared_ptr<data::stream::IOStream>& connection) {
(void) connection;
}
ConnectionProvider::ConnectionProvider(const std::shared_ptr<virtual_::Interface>& interface)
: m_interface(interface)
: m_invalidator(std::make_shared<ConnectionInvalidator>())
, m_interface(interface)
, m_maxAvailableToRead(-1)
, m_maxAvailableToWrite(-1)
{
@ -43,7 +48,7 @@ void ConnectionProvider::stop() {
}
std::shared_ptr<data::stream::IOStream> ConnectionProvider::get() {
provider::ResourceHandle<data::stream::IOStream> ConnectionProvider::get() {
auto submission = m_interface->connect();
if(submission->isValid()) {
auto socket = submission->getSocket();
@ -51,26 +56,30 @@ std::shared_ptr<data::stream::IOStream> ConnectionProvider::get() {
socket->setOutputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
socket->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
socket->setMaxAvailableToReadWrtie(m_maxAvailableToRead, m_maxAvailableToWrite);
return socket;
return provider::ResourceHandle<data::stream::IOStream>(socket, m_invalidator);
}
}
throw std::runtime_error("[oatpp::network::virtual_::client::getConnection()]: Error. Can't connect. " + *m_interface->getName());
}
oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::stream::IOStream>&> ConnectionProvider::getAsync() {
oatpp::async::CoroutineStarterForResult<const provider::ResourceHandle<data::stream::IOStream>&>
ConnectionProvider::getAsync() {
class ConnectCoroutine : public oatpp::async::CoroutineWithResult<ConnectCoroutine, const std::shared_ptr<oatpp::data::stream::IOStream>&> {
class ConnectCoroutine : public oatpp::async::CoroutineWithResult<ConnectCoroutine, const provider::ResourceHandle<oatpp::data::stream::IOStream>&> {
private:
std::shared_ptr<ConnectionInvalidator> m_invalidator;
std::shared_ptr<virtual_::Interface> m_interface;
v_io_size m_maxAvailableToRead;
v_io_size m_maxAvailableToWrite;
std::shared_ptr<virtual_::Interface::ConnectionSubmission> m_submission;
public:
ConnectCoroutine(const std::shared_ptr<virtual_::Interface>& interface,
ConnectCoroutine(const std::shared_ptr<ConnectionInvalidator>& invalidator,
const std::shared_ptr<virtual_::Interface>& interface,
v_io_size maxAvailableToRead,
v_io_size maxAvailableToWrite)
: m_interface(interface)
: m_invalidator(invalidator)
, m_interface(interface)
, m_maxAvailableToRead(maxAvailableToRead)
, m_maxAvailableToWrite(maxAvailableToWrite)
{}
@ -93,7 +102,7 @@ oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::strea
socket->setOutputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS);
socket->setInputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS);
socket->setMaxAvailableToReadWrtie(m_maxAvailableToRead, m_maxAvailableToWrite);
return _return(socket);
return _return(provider::ResourceHandle<data::stream::IOStream>(socket, m_invalidator));
}
return waitRepeat(std::chrono::milliseconds(100));
@ -105,7 +114,7 @@ oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::strea
};
return ConnectCoroutine::startForResult(m_interface, m_maxAvailableToRead, m_maxAvailableToWrite);
return ConnectCoroutine::startForResult(m_invalidator, m_interface, m_maxAvailableToRead, m_maxAvailableToWrite);
}

View File

@ -36,6 +36,17 @@ namespace oatpp { namespace network { namespace virtual_ { namespace client {
* Extends &id:oatpp::network::ClientConnectionProvider;.
*/
class ConnectionProvider : public oatpp::network::ClientConnectionProvider {
private:
class ConnectionInvalidator : public provider::Invalidator<data::stream::IOStream> {
public:
void invalidate(const std::shared_ptr<data::stream::IOStream>& connection) override;
};
private:
std::shared_ptr<ConnectionInvalidator> m_invalidator;
private:
std::shared_ptr<virtual_::Interface> m_interface;
v_io_size m_maxAvailableToRead;
@ -75,22 +86,13 @@ public:
* Get connection.
* @return - `std::shared_ptr` to &id:oatpp::data::stream::IOStream;.
*/
std::shared_ptr<data::stream::IOStream> get() override;
provider::ResourceHandle<data::stream::IOStream> get() override;
/**
* Get connection in asynchronous manner.
* @return - &id:oatpp::async::CoroutineStarterForResult;.
*/
oatpp::async::CoroutineStarterForResult<const std::shared_ptr<data::stream::IOStream>&> getAsync() override;
/**
* Does nothing.
* @param connection
*/
void invalidate(const std::shared_ptr<data::stream::IOStream>& connection) override {
(void)connection;
// DO Nothing.
}
oatpp::async::CoroutineStarterForResult<const provider::ResourceHandle<data::stream::IOStream>&> getAsync() override;
};

View File

@ -26,8 +26,13 @@
namespace oatpp { namespace network { namespace virtual_ { namespace server {
void ConnectionProvider::ConnectionInvalidator::invalidate(const std::shared_ptr<data::stream::IOStream>& connection) {
(void) connection;
}
ConnectionProvider::ConnectionProvider(const std::shared_ptr<virtual_::Interface>& interface)
: m_interface(interface)
: m_invalidator(std::make_shared<ConnectionInvalidator>())
, m_interface(interface)
, m_listenerLock(interface->bind())
, m_open(true)
, m_maxAvailableToRead(-1)
@ -52,12 +57,12 @@ void ConnectionProvider::stop() {
m_interface->notifyAcceptors();
}
std::shared_ptr<data::stream::IOStream> ConnectionProvider::get() {
provider::ResourceHandle<data::stream::IOStream> ConnectionProvider::get() {
auto socket = m_interface->accept(m_open);
if(socket) {
socket->setMaxAvailableToReadWrtie(m_maxAvailableToRead, m_maxAvailableToWrite);
}
return socket;
return provider::ResourceHandle<data::stream::IOStream>(socket, m_invalidator);
}
}}}}

View File

@ -37,6 +37,16 @@ namespace oatpp { namespace network { namespace virtual_ { namespace server {
*/
class ConnectionProvider : public oatpp::network::ServerConnectionProvider {
private:
class ConnectionInvalidator : public provider::Invalidator<data::stream::IOStream> {
public:
void invalidate(const std::shared_ptr<data::stream::IOStream>& connection) override;
};
private:
std::shared_ptr<ConnectionInvalidator> m_invalidator;
std::shared_ptr<virtual_::Interface> m_interface;
std::shared_ptr<virtual_::Interface::ListenerLock> m_listenerLock;
bool m_open;
@ -74,7 +84,7 @@ public:
* Get incoming connection.
* @return &id:oatpp::data::stream::IOStream;.
*/
std::shared_ptr<data::stream::IOStream> get() override;
provider::ResourceHandle<data::stream::IOStream> get() override;
/**
* **NOT IMPLEMENTED!**<br>
@ -85,7 +95,7 @@ public:
* <br>
* *It may be implemented later.*
*/
oatpp::async::CoroutineStarterForResult<const std::shared_ptr<data::stream::IOStream>&> getAsync() override {
oatpp::async::CoroutineStarterForResult<const provider::ResourceHandle<data::stream::IOStream>&> getAsync() override {
/*
* No need to implement this.
* For Asynchronous IO in oatpp it is considered to be a good practice
@ -96,15 +106,6 @@ public:
*/
throw std::runtime_error("[oatpp::network::virtual_::server::ConnectionProvider::getConnectionAsync()]: Error. Not implemented.");
}
/**
* Does nothing.
* @param connection
*/
void invalidate(const std::shared_ptr<data::stream::IOStream>& connection) override {
(void)connection;
// DO Nothing.
}
};

View File

@ -40,10 +40,8 @@ namespace oatpp { namespace web { namespace client {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// HttpRequestExecutor::ConnectionProxy
HttpRequestExecutor::ConnectionProxy::ConnectionProxy(const std::shared_ptr<ClientConnectionProvider>& connectionProvider,
const std::shared_ptr<data::stream::IOStream>& connection)
: m_connectionProvider(connectionProvider)
, m_connection(connection)
HttpRequestExecutor::ConnectionProxy::ConnectionProxy(const provider::ResourceHandle<data::stream::IOStream>& connectionHandle)
: m_connectionHandle(connectionHandle)
, m_valid(true)
, m_invalidateOnDestroy(false)
{}
@ -55,40 +53,40 @@ HttpRequestExecutor::ConnectionProxy::~ConnectionProxy() {
}
v_io_size HttpRequestExecutor::ConnectionProxy::read(void *buffer, v_buff_size count, async::Action& action) {
return m_connection->read(buffer, count, action);
return m_connectionHandle.object->read(buffer, count, action);
}
v_io_size HttpRequestExecutor::ConnectionProxy::write(const void *data, v_buff_size count, async::Action& action) {
return m_connection->write(data,count, action);
return m_connectionHandle.object->write(data,count, action);
}
void HttpRequestExecutor::ConnectionProxy::setInputStreamIOMode(data::stream::IOMode ioMode) {
m_connection->setInputStreamIOMode(ioMode);
m_connectionHandle.object->setInputStreamIOMode(ioMode);
}
data::stream::IOMode HttpRequestExecutor::ConnectionProxy::getInputStreamIOMode() {
return m_connection->getInputStreamIOMode();
return m_connectionHandle.object->getInputStreamIOMode();
}
data::stream::Context& HttpRequestExecutor::ConnectionProxy::getInputStreamContext() {
return m_connection->getInputStreamContext();
return m_connectionHandle.object->getInputStreamContext();
}
void HttpRequestExecutor::ConnectionProxy::setOutputStreamIOMode(data::stream::IOMode ioMode) {
return m_connection->setOutputStreamIOMode(ioMode);
return m_connectionHandle.object->setOutputStreamIOMode(ioMode);
}
data::stream::IOMode HttpRequestExecutor::ConnectionProxy::getOutputStreamIOMode() {
return m_connection->getOutputStreamIOMode();
return m_connectionHandle.object->getOutputStreamIOMode();
}
data::stream::Context& HttpRequestExecutor::ConnectionProxy::getOutputStreamContext() {
return m_connection->getOutputStreamContext();
return m_connectionHandle.object->getOutputStreamContext();
}
void HttpRequestExecutor::ConnectionProxy::invalidate() {
if(m_valid) {
m_connectionProvider->invalidate(m_connection);
m_connectionHandle.invalidator->invalidate(m_connectionHandle.object);
m_valid = false;
}
}
@ -138,7 +136,7 @@ std::shared_ptr<HttpRequestExecutor::ConnectionHandle> HttpRequestExecutor::getC
throw RequestExecutionError(RequestExecutionError::ERROR_CODE_CANT_CONNECT,
"[oatpp::web::client::HttpRequestExecutor::getConnection()]: ConnectionProvider failed to provide Connection");
}
auto connectionProxy = std::make_shared<ConnectionProxy>(m_connectionProvider, connection);
auto connectionProxy = std::make_shared<ConnectionProxy>(connection);
return std::make_shared<HttpConnectionHandle>(connectionProxy);
}
@ -158,8 +156,8 @@ HttpRequestExecutor::getConnectionAsync() {
return m_connectionProvider->getAsync().callbackTo(&GetConnectionCoroutine::onConnectionReady);
}
Action onConnectionReady(const std::shared_ptr<oatpp::data::stream::IOStream>& connection) {
auto connectionProxy = std::make_shared<ConnectionProxy>(m_connectionProvider, connection);
Action onConnectionReady(const provider::ResourceHandle<oatpp::data::stream::IOStream>& connection) {
auto connectionProxy = std::make_shared<ConnectionProxy>(connection);
return _return(std::make_shared<HttpConnectionHandle>(connectionProxy));
}

View File

@ -48,15 +48,12 @@ public:
class ConnectionProxy : public data::stream::IOStream {
private:
/* provider which created this connection */
std::shared_ptr<ClientConnectionProvider> m_connectionProvider;
std::shared_ptr<data::stream::IOStream> m_connection;
provider::ResourceHandle<data::stream::IOStream> m_connectionHandle;
bool m_valid;
bool m_invalidateOnDestroy;
public:
ConnectionProxy(const std::shared_ptr<ClientConnectionProvider>& connectionProvider,
const std::shared_ptr<data::stream::IOStream>& connection);
ConnectionProxy(const provider::ResourceHandle<data::stream::IOStream>& connectionHandle);
~ConnectionProxy() override;

View File

@ -67,7 +67,7 @@ void AsyncHttpConnectionHandler::addResponseInterceptor(const std::shared_ptr<in
m_components->responseInterceptors.push_back(interceptor);
}
void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr<IOStream>& connection,
void AsyncHttpConnectionHandler::handleConnection(const provider::ResourceHandle<IOStream>& connection,
const std::shared_ptr<const ParameterMap>& params)
{
@ -75,8 +75,8 @@ void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr<IOStream
if (m_continue.load()) {
connection->setOutputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS);
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS);
connection.object->setOutputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS);
connection.object->setInputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS);
m_executor->execute<HttpProcessor::Coroutine>(m_components, connection, &m_spawns);

View File

@ -123,7 +123,8 @@ public:
void addResponseInterceptor(const std::shared_ptr<interceptor::ResponseInterceptor>& interceptor);
void handleConnection(const std::shared_ptr<IOStream>& connection, const std::shared_ptr<const ParameterMap>& params) override;
void handleConnection(const provider::ResourceHandle<IOStream>& connection,
const std::shared_ptr<const ParameterMap>& params) override;
/**
* Will call m_executor.stop()

View File

@ -62,7 +62,7 @@ void HttpConnectionHandler::addResponseInterceptor(const std::shared_ptr<interce
m_components->responseInterceptors.push_back(interceptor);
}
void HttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
void HttpConnectionHandler::handleConnection(const provider::ResourceHandle<data::stream::IOStream>& connection,
const std::shared_ptr<const ParameterMap>& params)
{
@ -70,8 +70,8 @@ void HttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::data::
if (m_continue.load()) {
connection->setOutputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
connection.object->setOutputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
connection.object->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
/* Create working thread */
std::thread thread(&HttpProcessor::Task::run, std::move(HttpProcessor::Task(m_components, connection, &m_spawns)));

View File

@ -99,7 +99,8 @@ public:
* Implementation of &id:oatpp::network::ConnectionHandler::handleConnection;.
* @param connection - &id:oatpp::data::stream::IOStream; representing connection.
*/
void handleConnection(const std::shared_ptr<IOStream>& connection, const std::shared_ptr<const ParameterMap>& params) override;
void handleConnection(const provider::ResourceHandle<IOStream>& connection,
const std::shared_ptr<const ParameterMap>& params) override;
/**
* Tell all worker threads to exit when done.

View File

@ -72,13 +72,13 @@ HttpProcessor::Components::Components(const std::shared_ptr<HttpRouter>& pRouter
// Other
HttpProcessor::ProcessingResources::ProcessingResources(const std::shared_ptr<Components>& pComponents,
const std::shared_ptr<oatpp::data::stream::IOStream>& pConnection)
const provider::ResourceHandle<oatpp::data::stream::IOStream>& pConnection)
: components(pComponents)
, connection(pConnection)
, headersInBuffer(components->config->headersInBufferInitial)
, headersOutBuffer(components->config->headersOutBufferInitial)
, headersReader(&headersInBuffer, components->config->headersReaderChunkSize, components->config->headersReaderMaxSize)
, inStream(data::stream::InputStreamBufferedProxy::createShared(connection, std::make_shared<std::string>(data::buffer::IOBuffer::BUFFER_SIZE, 0)))
, inStream(data::stream::InputStreamBufferedProxy::createShared(connection.object, std::make_shared<std::string>(data::buffer::IOBuffer::BUFFER_SIZE, 0)))
{}
std::shared_ptr<protocol::http::outgoing::Response>
@ -147,7 +147,7 @@ HttpProcessor::ConnectionState HttpProcessor::processNextRequest(ProcessingResou
connectionState = ConnectionState::CLOSING;
} else {
request = protocol::http::incoming::Request::createShared(resources.connection,
request = protocol::http::incoming::Request::createShared(resources.connection.object,
headersReadResult.startingLine,
headersReadResult.headers,
resources.inStream,
@ -209,7 +209,7 @@ HttpProcessor::ConnectionState HttpProcessor::processNextRequest(ProcessingResou
auto contentEncoderProvider =
protocol::http::utils::CommunicationUtils::selectEncoder(request, resources.components->contentEncodingProviders);
response->send(resources.connection.get(), &resources.headersOutBuffer, contentEncoderProvider.get());
response->send(resources.connection.object.get(), &resources.headersOutBuffer, contentEncoderProvider.get());
return connectionState;
@ -219,7 +219,7 @@ HttpProcessor::ConnectionState HttpProcessor::processNextRequest(ProcessingResou
// Task
HttpProcessor::Task::Task(const std::shared_ptr<Components>& components,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const provider::ResourceHandle<oatpp::data::stream::IOStream>& connection,
std::atomic_long *taskCounter)
: m_components(components)
, m_connection(connection)
@ -264,7 +264,7 @@ HttpProcessor::Task &HttpProcessor::Task::operator=(HttpProcessor::Task &&t) {
void HttpProcessor::Task::run(){
m_connection->initContexts();
m_connection.object->initContexts();
ProcessingResources resources(m_components, m_connection);
@ -293,14 +293,14 @@ HttpProcessor::Task::~Task() {
// HttpProcessor::Coroutine
HttpProcessor::Coroutine::Coroutine(const std::shared_ptr<Components>& components,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const provider::ResourceHandle<oatpp::data::stream::IOStream>& connection,
std::atomic_long *taskCounter)
: m_components(components)
, m_connection(connection)
, m_headersInBuffer(components->config->headersInBufferInitial)
, m_headersReader(&m_headersInBuffer, components->config->headersReaderChunkSize, components->config->headersReaderMaxSize)
, m_headersOutBuffer(std::make_shared<oatpp::data::stream::BufferOutputStream>(components->config->headersOutBufferInitial))
, m_inStream(data::stream::InputStreamBufferedProxy::createShared(m_connection, std::make_shared<std::string>(data::buffer::IOBuffer::BUFFER_SIZE, 0)))
, m_inStream(data::stream::InputStreamBufferedProxy::createShared(m_connection.object, std::make_shared<std::string>(data::buffer::IOBuffer::BUFFER_SIZE, 0)))
, m_connectionState(ConnectionState::ALIVE)
, m_counter(taskCounter)
{
@ -312,7 +312,7 @@ HttpProcessor::Coroutine::~Coroutine() {
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::act() {
return m_connection->initContextsAsync().next(yieldTo(&HttpProcessor::Coroutine::parseHeaders));
return m_connection.object->initContextsAsync().next(yieldTo(&HttpProcessor::Coroutine::parseHeaders));
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::parseHeaders() {
@ -321,7 +321,7 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::parseHeaders() {
oatpp::async::Action HttpProcessor::Coroutine::onHeadersParsed(const RequestHeadersReader::Result& headersReadResult) {
m_currentRequest = protocol::http::incoming::Request::createShared(m_connection,
m_currentRequest = protocol::http::incoming::Request::createShared(m_connection.object,
headersReadResult.startingLine,
headersReadResult.headers,
m_inStream,
@ -404,7 +404,7 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() {
auto contentEncoderProvider =
protocol::http::utils::CommunicationUtils::selectEncoder(m_currentRequest, m_components->contentEncodingProviders);
return protocol::http::outgoing::Response::sendAsync(m_currentResponse, m_connection, m_headersOutBuffer, contentEncoderProvider)
return protocol::http::outgoing::Response::sendAsync(m_currentResponse, m_connection.object, m_headersOutBuffer, contentEncoderProvider)
.next(yieldTo(&HttpProcessor::Coroutine::onRequestDone));
}

View File

@ -161,10 +161,10 @@ private:
struct ProcessingResources {
ProcessingResources(const std::shared_ptr<Components>& pComponents,
const std::shared_ptr<oatpp::data::stream::IOStream>& pConnection);
const provider::ResourceHandle<oatpp::data::stream::IOStream>& pConnection);
std::shared_ptr<Components> components;
std::shared_ptr<oatpp::data::stream::IOStream> connection;
provider::ResourceHandle<oatpp::data::stream::IOStream> connection;
oatpp::data::stream::BufferOutputStream headersInBuffer;
oatpp::data::stream::BufferOutputStream headersOutBuffer;
RequestHeadersReader headersReader;
@ -189,7 +189,7 @@ public:
class Task : public base::Countable {
private:
std::shared_ptr<Components> m_components;
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
provider::ResourceHandle<oatpp::data::stream::IOStream> m_connection;
std::atomic_long *m_counter;
public:
@ -199,7 +199,7 @@ public:
* @param connection - &id:oatpp::data::stream::IOStream;.
*/
Task(const std::shared_ptr<Components>& components,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const provider::ResourceHandle<oatpp::data::stream::IOStream>& connection,
std::atomic_long *taskCounter);
/**
@ -248,7 +248,7 @@ public:
class Coroutine : public oatpp::async::Coroutine<HttpProcessor::Coroutine> {
private:
std::shared_ptr<Components> m_components;
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
provider::ResourceHandle<oatpp::data::stream::IOStream> m_connection;
oatpp::data::stream::BufferOutputStream m_headersInBuffer;
RequestHeadersReader m_headersReader;
std::shared_ptr<oatpp::data::stream::BufferOutputStream> m_headersOutBuffer;
@ -268,7 +268,7 @@ public:
* @param connection - &id:oatpp::data::stream::IOStream;.
*/
Coroutine(const std::shared_ptr<Components>& components,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
const provider::ResourceHandle<oatpp::data::stream::IOStream>& connection,
std::atomic_long *taskCounter);
Action act() override;

View File

@ -37,15 +37,28 @@ struct Resource {
};
class Provider : public oatpp::provider::Provider<Resource> {
private:
class ResourceInvalidator : public oatpp::provider::Invalidator<Resource> {
public:
void invalidate(const std::shared_ptr<Resource>& resource) override {
(void) resource;
}
};
private:
std::shared_ptr<ResourceInvalidator> m_invalidator;
public:
std::shared_ptr<Resource> get() override {
return std::make_shared<Resource>();
oatpp::provider::ResourceHandle<Resource> get() override {
return oatpp::provider::ResourceHandle<Resource>(std::make_shared<Resource>(), m_invalidator);
}
async::CoroutineStarterForResult<const std::shared_ptr<Resource> &> getAsync() override {
async::CoroutineStarterForResult<const oatpp::provider::ResourceHandle<Resource> &> getAsync() override {
class GetCoroutine : public oatpp::async::CoroutineWithResult<GetCoroutine, const std::shared_ptr<Resource>&> {
class GetCoroutine : public oatpp::async::CoroutineWithResult<GetCoroutine, const oatpp::provider::ResourceHandle<Resource>&> {
private:
Provider* m_provider;
public:
@ -55,7 +68,7 @@ public:
{}
Action act() override {
return _return(std::make_shared<Resource>());
return _return(oatpp::provider::ResourceHandle<Resource>(std::make_shared<Resource>(), m_provider->m_invalidator));
}
};
@ -63,10 +76,6 @@ public:
return GetCoroutine::startForResult(this);
}
void invalidate(const std::shared_ptr<Resource>& resource) override {
(void) resource;
}
void stop() override {
OATPP_LOGD("Provider", "stop()");
}
@ -75,7 +84,7 @@ public:
struct AcquisitionProxy : public oatpp::provider::AcquisitionProxy<Resource, AcquisitionProxy> {
AcquisitionProxy(const std::shared_ptr<Resource>& resource, const std::shared_ptr<PoolInstance>& pool)
AcquisitionProxy(const oatpp::provider::ResourceHandle<Resource>& resource, const std::shared_ptr<PoolInstance>& pool)
: oatpp::provider::AcquisitionProxy<Resource, AcquisitionProxy>(resource, pool)
{}
@ -87,11 +96,11 @@ struct Pool : public oatpp::provider::PoolTemplate<Resource, AcquisitionProxy> {
: oatpp::provider::PoolTemplate<Resource, AcquisitionProxy>(provider, maxResources, maxResourceTTL, timeout)
{}
static std::shared_ptr<Resource> get(const std::shared_ptr<PoolTemplate>& _this) {
static oatpp::provider::ResourceHandle<Resource> get(const std::shared_ptr<PoolTemplate>& _this) {
return oatpp::provider::PoolTemplate<Resource, AcquisitionProxy>::get(_this);
}
static async::CoroutineStarterForResult<const std::shared_ptr<Resource>&> getAsync(const std::shared_ptr<PoolTemplate>& _this) {
static async::CoroutineStarterForResult<const oatpp::provider::ResourceHandle<Resource>&> getAsync(const std::shared_ptr<PoolTemplate>& _this) {
return oatpp::provider::PoolTemplate<Resource, AcquisitionProxy>::getAsync(_this);
}
@ -109,10 +118,10 @@ struct Pool : public oatpp::provider::PoolTemplate<Resource, AcquisitionProxy> {
class ClientCoroutine : public oatpp::async::Coroutine<ClientCoroutine> {
private:
std::shared_ptr<oatpp::provider::PoolTemplate<Resource, AcquisitionProxy>> m_pool;
std::promise<std::shared_ptr<Resource>>* m_promise;
std::promise<oatpp::provider::ResourceHandle<Resource>>* m_promise;
public:
ClientCoroutine(const std::shared_ptr<oatpp::provider::PoolTemplate<Resource, AcquisitionProxy>>& pool, std::promise<std::shared_ptr<Resource>>* promise)
ClientCoroutine(const std::shared_ptr<oatpp::provider::PoolTemplate<Resource, AcquisitionProxy>>& pool, std::promise<oatpp::provider::ResourceHandle<Resource>>* promise)
: m_pool(pool)
, m_promise(promise)
{}
@ -121,7 +130,7 @@ public:
return Pool::getAsync(m_pool).callbackTo(&ClientCoroutine::onGet);
}
Action onGet(const std::shared_ptr<Resource>& resource) {
Action onGet(const oatpp::provider::ResourceHandle<Resource>& resource) {
m_promise->set_value(resource);
return finish();
}
@ -137,8 +146,8 @@ void PoolTemplateTest::onRun() {
{
OATPP_LOGD(TAG, "Synchronously with timeout");
auto poolTemplate = Pool::createShared(provider, maxResources, std::chrono::seconds(10), std::chrono::milliseconds(500));
std::shared_ptr<Resource> resource = Pool::get(poolTemplate);
oatpp::provider::ResourceHandle<Resource> resource = Pool::get(poolTemplate);
OATPP_ASSERT(resource != nullptr);
OATPP_ASSERT(Pool::get(poolTemplate) == nullptr);
@ -150,9 +159,9 @@ void PoolTemplateTest::onRun() {
OATPP_LOGD(TAG, "Synchronously without timeout");
auto poolTemplate = Pool::createShared(provider, maxResources, std::chrono::seconds(10), std::chrono::milliseconds::zero());
std::shared_ptr<Resource> resource = Pool::get(poolTemplate);
oatpp::provider::ResourceHandle<Resource> resource = Pool::get(poolTemplate);
OATPP_ASSERT(resource != nullptr);
std::future<std::shared_ptr<Resource>> futureResource = std::async(std::launch::async, [&poolTemplate]() {
std::future<oatpp::provider::ResourceHandle<Resource>> futureResource = std::async(std::launch::async, [&poolTemplate]() {
return Pool::get(poolTemplate);
});
OATPP_ASSERT(futureResource.wait_for(std::chrono::seconds(1)) == std::future_status::timeout);
@ -166,19 +175,24 @@ void PoolTemplateTest::onRun() {
oatpp::async::Executor executor(1, 1, 1);
auto poolTemplate = Pool::createShared(provider, maxResources, std::chrono::seconds(10), std::chrono::milliseconds(500));
std::shared_ptr<Resource> resource;
oatpp::provider::ResourceHandle<Resource> resourceHandle;
{
std::promise<std::shared_ptr<Resource>> promise;
std::promise<oatpp::provider::ResourceHandle<Resource>> promise;
auto future = promise.get_future();
executor.execute<ClientCoroutine>(poolTemplate, &promise);
resource = future.get();
OATPP_ASSERT(resource != nullptr);
resourceHandle = future.get();
OATPP_ASSERT(resourceHandle != nullptr);
OATPP_ASSERT(resourceHandle.object != nullptr)
OATPP_ASSERT(resourceHandle.invalidator != nullptr)
}
{
std::promise<std::shared_ptr<Resource>> promise;
std::promise<oatpp::provider::ResourceHandle<Resource>> promise;
auto future = promise.get_future();
executor.execute<ClientCoroutine>(poolTemplate, &promise);
OATPP_ASSERT(future.get() == nullptr);
resourceHandle = future.get();
OATPP_ASSERT(resourceHandle == nullptr);
OATPP_ASSERT(resourceHandle.object == nullptr)
OATPP_ASSERT(resourceHandle.invalidator == nullptr)
}
poolTemplate->stop();
@ -190,10 +204,10 @@ void PoolTemplateTest::onRun() {
oatpp::async::Executor executor(1, 1, 1);
auto poolTemplate = Pool::createShared(provider, maxResources, std::chrono::seconds(10), std::chrono::milliseconds::zero());
std::shared_ptr<Resource> resource = Pool::get(poolTemplate);
oatpp::provider::ResourceHandle<Resource> resource = Pool::get(poolTemplate);
OATPP_ASSERT(resource != nullptr);
std::promise<std::shared_ptr<Resource>> promise;
std::promise<oatpp::provider::ResourceHandle<Resource>> promise;
auto future = promise.get_future();
executor.execute<ClientCoroutine>(poolTemplate, &promise);
OATPP_ASSERT(future.wait_for(std::chrono::seconds(1)) == std::future_status::timeout);

View File

@ -59,16 +59,31 @@ public:
class Provider : public oatpp::provider::Provider<Resource> {
private:
class ResourceInvalidator : public oatpp::provider::Invalidator<Resource> {
public:
void invalidate(const std::shared_ptr<Resource>& resource) override {
(void) resource;
}
};
private:
std::shared_ptr<ResourceInvalidator> m_invalidator = std::make_shared<ResourceInvalidator>();
std::atomic<v_int64> m_id;
public:
std::shared_ptr<Resource> get() override {
return std::make_shared<MyResource>(++m_id);
oatpp::provider::ResourceHandle<Resource> get() override {
return oatpp::provider::ResourceHandle<Resource>(
std::make_shared<MyResource>(++m_id),
m_invalidator
);
}
async::CoroutineStarterForResult<const std::shared_ptr<Resource> &> getAsync() override {
async::CoroutineStarterForResult<const oatpp::provider::ResourceHandle<Resource> &> getAsync() override {
class GetCoroutine : public oatpp::async::CoroutineWithResult<GetCoroutine, const std::shared_ptr<Resource>&> {
class GetCoroutine : public oatpp::async::CoroutineWithResult<GetCoroutine, const oatpp::provider::ResourceHandle<Resource>&> {
private:
Provider* m_provider;
public:
@ -78,7 +93,10 @@ public:
{}
Action act() override {
return _return(std::make_shared<MyResource>(++ m_provider->m_id));
return _return(oatpp::provider::ResourceHandle<Resource>(
std::make_shared<MyResource>(++ m_provider->m_id),
m_provider->m_invalidator
));
}
};
@ -86,10 +104,6 @@ public:
return GetCoroutine::startForResult(this);
}
void invalidate(const std::shared_ptr<Resource>& resource) override {
(void) resource;
}
void stop() override {
OATPP_LOGD("Provider", "stop()");
}
@ -103,12 +117,13 @@ public:
struct AcquisitionProxy : public oatpp::provider::AcquisitionProxy<Resource, AcquisitionProxy> {
AcquisitionProxy(const std::shared_ptr<Resource>& resource, const std::shared_ptr<PoolInstance>& pool)
AcquisitionProxy(const oatpp::provider::ResourceHandle<Resource>& resource,
const std::shared_ptr<PoolInstance>& pool)
: oatpp::provider::AcquisitionProxy<Resource, AcquisitionProxy>(resource, pool)
{}
v_int64 myId() override {
return _obj->myId();
return _handle.object->myId();
}
};
@ -119,7 +134,7 @@ typedef oatpp::provider::Pool<oatpp::provider::Provider<Resource>, Resource, Acq
class ClientCoroutine : public oatpp::async::Coroutine<ClientCoroutine> {
private:
std::shared_ptr<Pool> m_pool;
std::shared_ptr<Resource> m_resource;
oatpp::provider::ResourceHandle<Resource> m_resource;
bool m_invalidate;
public:
@ -132,14 +147,14 @@ public:
return m_pool->getAsync().callbackTo(&ClientCoroutine::onGet);
}
Action onGet(const std::shared_ptr<Resource>& resource) {
Action onGet(const oatpp::provider::ResourceHandle<Resource>& resource) {
m_resource = resource;
return waitFor(std::chrono::milliseconds(100)).next(yieldTo(&ClientCoroutine::onUse));
}
Action onUse() {
if(m_invalidate) {
m_pool->invalidate(m_resource);
m_resource.invalidator->invalidate(m_resource.object);
}
return finish();
}
@ -150,7 +165,7 @@ void clientMethod(std::shared_ptr<Pool> pool, bool invalidate) {
auto resource = pool->get();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if(invalidate) {
pool->invalidate(resource);
resource.invalidator->invalidate(resource.object);
}
}

View File

@ -75,6 +75,18 @@ public:
};
class StubStreamProvider : public oatpp::network::ConnectionProvider {
private:
class Invalidator : public oatpp::provider::Invalidator<oatpp::data::stream::IOStream> {
public:
void invalidate(const std::shared_ptr<oatpp::data::stream::IOStream>& connection) override {
(void)connection;
// DO Nothing.
}
};
private:
std::shared_ptr<Invalidator> m_invalidator = std::make_shared<Invalidator>();
public:
StubStreamProvider()
@ -83,24 +95,36 @@ public:
std::atomic<v_int64> counter;
std::shared_ptr<oatpp::data::stream::IOStream> get() override {
oatpp::provider::ResourceHandle<oatpp::data::stream::IOStream> get() override {
++ counter;
return std::make_shared<StubStream>();
return oatpp::provider::ResourceHandle<oatpp::data::stream::IOStream>(
std::make_shared<StubStream>(),
m_invalidator
);
}
oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::stream::IOStream>&> getAsync() override {
oatpp::async::CoroutineStarterForResult<const oatpp::provider::ResourceHandle<oatpp::data::stream::IOStream>&> getAsync() override {
class ConnectionCoroutine : public oatpp::async::CoroutineWithResult<ConnectionCoroutine, const std::shared_ptr<oatpp::data::stream::IOStream>&> {
class ConnectionCoroutine : public oatpp::async::CoroutineWithResult<ConnectionCoroutine, const oatpp::provider::ResourceHandle<oatpp::data::stream::IOStream>&> {
private:
std::shared_ptr<Invalidator> m_invalidator;
public:
ConnectionCoroutine(const std::shared_ptr<Invalidator>& invalidator)
: m_invalidator(invalidator)
{}
Action act() override {
return _return(std::make_shared<StubStream>());
return _return(oatpp::provider::ResourceHandle<oatpp::data::stream::IOStream>(
std::make_shared<StubStream>(),
m_invalidator
));
}
};
++ counter;
return ConnectionCoroutine::startForResult();
return ConnectionCoroutine::startForResult(m_invalidator);
}
@ -108,17 +132,12 @@ public:
// DO NOTHING
}
void invalidate(const std::shared_ptr<oatpp::data::stream::IOStream>& connection) override {
(void)connection;
// DO Nothing.
}
};
class ClientCoroutine : public oatpp::async::Coroutine<ClientCoroutine> {
private:
std::shared_ptr<ConnectionPool> m_pool;
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
oatpp::provider::ResourceHandle<oatpp::data::stream::IOStream> m_connection;
v_int32 m_repeats;
bool m_invalidate;
public:
@ -133,7 +152,7 @@ public:
return m_pool->getAsync().callbackTo(&ClientCoroutine::onConnection);
}
Action onConnection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection) {
Action onConnection(const oatpp::provider::ResourceHandle<oatpp::data::stream::IOStream>& connection) {
m_connection = connection;
return yieldTo(&ClientCoroutine::useConnection);
}
@ -144,7 +163,7 @@ public:
return waitFor(std::chrono::milliseconds(100)).next(yieldTo(&ClientCoroutine::useConnection));
}
if(m_invalidate) {
m_pool->invalidate(m_connection);
m_connection.invalidator->invalidate(m_connection.object);
}
return finish();
}
@ -155,7 +174,7 @@ void clientMethod(std::shared_ptr<ConnectionPool> pool, bool invalidate) {
auto connection = pool->get();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if(invalidate) {
pool->invalidate(connection);
connection.invalidator->invalidate(connection.object);
}
}

View File

@ -139,7 +139,7 @@ void PipelineAsyncTest::onRun() {
OATPP_COMPONENT(std::shared_ptr<oatpp::network::ClientConnectionProvider>, clientConnectionProvider);
auto connection = clientConnectionProvider->get();
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
connection.object->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
std::thread pipeInThread([this, connection] {
@ -153,7 +153,7 @@ void PipelineAsyncTest::onRun() {
oatpp::data::buffer::IOBuffer ioBuffer;
oatpp::data::stream::transfer(&inputStream, connection.get(), 0, ioBuffer.getData(), ioBuffer.getSize());
oatpp::data::stream::transfer(&inputStream, connection.object.get(), 0, ioBuffer.getData(), ioBuffer.getSize());
});
@ -163,7 +163,7 @@ void PipelineAsyncTest::onRun() {
oatpp::data::stream::ChunkedBuffer receiveStream;
oatpp::data::buffer::IOBuffer ioBuffer;
auto res = oatpp::data::stream::transfer(connection.get(), &receiveStream, sample->size() * m_pipelineSize, ioBuffer.getData(), ioBuffer.getSize());
auto res = oatpp::data::stream::transfer(connection.object.get(), &receiveStream, sample->size() * m_pipelineSize, ioBuffer.getData(), ioBuffer.getSize());
auto result = receiveStream.toString();

View File

@ -134,7 +134,7 @@ void PipelineTest::onRun() {
OATPP_COMPONENT(std::shared_ptr<oatpp::network::ClientConnectionProvider>, clientConnectionProvider);
auto connection = clientConnectionProvider->get();
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
connection.object->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
std::thread pipeInThread([this, connection] {
@ -151,7 +151,7 @@ void PipelineTest::onRun() {
oatpp::data::buffer::IOBuffer ioBuffer;
auto res = oatpp::data::stream::transfer(&inputStream, connection.get(), 0, ioBuffer.getData(), ioBuffer.getSize());
auto res = oatpp::data::stream::transfer(&inputStream, connection.object.get(), 0, ioBuffer.getData(), ioBuffer.getSize());
});
@ -164,7 +164,7 @@ void PipelineTest::onRun() {
v_io_size transferSize = sample->size() * m_pipelineSize;
OATPP_LOGD(TAG, "want to Receive %d bytes", transferSize);
auto res = oatpp::data::stream::transfer(connection.get(), &receiveStream, transferSize, ioBuffer.getData(), ioBuffer.getSize());
auto res = oatpp::data::stream::transfer(connection.object.get(), &receiveStream, transferSize, ioBuffer.getData(), ioBuffer.getSize());
auto result = receiveStream.toString();