Merge pull request #488 from oatpp/connection_monitor

Connection monitor
This commit is contained in:
Leonid Stryzhevskyi 2021-10-13 02:48:27 +03:00 committed by GitHub
commit cc40fe69c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1266 additions and 22 deletions

View File

@ -138,6 +138,14 @@ add_library(oatpp
oatpp/encoding/Hex.hpp
oatpp/encoding/Unicode.cpp
oatpp/encoding/Unicode.hpp
oatpp/network/monitor/ConnectionInactivityChecker.cpp
oatpp/network/monitor/ConnectionInactivityChecker.hpp
oatpp/network/monitor/ConnectionMaxAgeChecker.cpp
oatpp/network/monitor/ConnectionMaxAgeChecker.hpp
oatpp/network/monitor/ConnectionMonitor.cpp
oatpp/network/monitor/ConnectionMonitor.hpp
oatpp/network/monitor/MetricsChecker.hpp
oatpp/network/monitor/StatCollector.hpp
oatpp/network/tcp/client/ConnectionProvider.cpp
oatpp/network/tcp/client/ConnectionProvider.hpp
oatpp/network/tcp/server/ConnectionProvider.cpp

View File

@ -727,7 +727,11 @@ async::CoroutineStarter transferAsync(const base::ObjectHandle<ReadCallback>& re
switch(res) {
case IOError::BROKEN_PIPE:
return error<AsyncTransferError>("[oatpp::data::stream::transferAsync]: Error. ReadCallback. BROKEN_PIPE.");
if(m_transferSize > 0) {
return error<AsyncTransferError>("[oatpp::data::stream::transferAsync]: Error. ReadCallback. BROKEN_PIPE.");
}
m_inData.set(nullptr, 0);
break;
case IOError::ZERO_VALUE:
m_inData.set(nullptr, 0);
@ -746,10 +750,13 @@ async::CoroutineStarter transferAsync(const base::ObjectHandle<ReadCallback>& re
return repeat();
default:
if(!action.isNone()) {
return action;
if(m_transferSize > 0) {
if (!action.isNone()) {
return action;
}
return error<AsyncTransferError>("[oatpp::data::stream::transferAsync]: Error. ReadCallback. Unknown IO error.");
}
return error<AsyncTransferError>("[oatpp::data::stream::transferAsync]: Error. ReadCallback. Unknown IO error.");
m_inData.set(nullptr, 0);
}

View File

@ -35,7 +35,7 @@ namespace oatpp { namespace provider {
* @tparam T - resource class.
*/
template <class T>
class Provider {
class Provider : public oatpp::base::Countable {
protected:
void setProperty(const oatpp::String& key, const oatpp::String& value) {

View File

@ -57,13 +57,13 @@ public:
/**
* No properties here. It is just a logical division
*/
class ServerConnectionProvider : public ConnectionProvider {
class ServerConnectionProvider : virtual public ConnectionProvider {
};
/**
* No properties here. It is just a logical division
*/
class ClientConnectionProvider : public ConnectionProvider {
class ClientConnectionProvider : virtual public ConnectionProvider {
};
}}

View File

@ -36,7 +36,7 @@ const v_int32 Server::STATUS_RUNNING = 2;
const v_int32 Server::STATUS_STOPPING = 3;
const v_int32 Server::STATUS_DONE = 4;
Server::Server(const std::shared_ptr<ServerConnectionProvider> &connectionProvider,
Server::Server(const std::shared_ptr<ConnectionProvider> &connectionProvider,
const std::shared_ptr<ConnectionHandler> &connectionHandler)
: m_status(STATUS_CREATED)
, m_connectionProvider(connectionProvider)
@ -50,11 +50,15 @@ void Server::conditionalMainLoop() {
while (getStatus() == STATUS_RUNNING) {
if (m_condition()) {
auto connection = m_connectionProvider->get();
std::shared_ptr<data::stream::IOStream> connection;
{
std::lock_guard<oatpp::concurrency::SpinLock> lg(m_spinlock);
connection = m_connectionProvider->get();
}
if (connection) {
if (getStatus() == STATUS_RUNNING) {
if (m_condition()) {
std::lock_guard<oatpp::concurrency::SpinLock> lg(m_spinlock);
m_connectionHandler->handleConnection(connection, params /* null params */);
} else {
setStatus(STATUS_STOPPING);
@ -75,10 +79,15 @@ void Server::mainLoop(Server *instance) {
std::shared_ptr<const std::unordered_map<oatpp::String, oatpp::String>> params;
while (instance->getStatus() == STATUS_RUNNING) {
auto connection = instance->m_connectionProvider->get();
std::shared_ptr<data::stream::IOStream> connection;
{
std::lock_guard<oatpp::concurrency::SpinLock> lg(instance->m_spinlock);
connection = instance->m_connectionProvider->get();
}
if (connection) {
if (instance->getStatus() == STATUS_RUNNING) {
std::lock_guard<oatpp::concurrency::SpinLock> lg(instance->m_spinlock);
instance->m_connectionHandler->handleConnection(connection, params /* null params */);
} else {
OATPP_LOGD("[oatpp::network::server::mainLoop()]", "Error. Server already stopped - closing connection...");
@ -86,7 +95,6 @@ void Server::mainLoop(Server *instance) {
}
}
instance->setStatus(STATUS_DONE);
}
@ -163,6 +171,16 @@ v_int32 Server::getStatus() {
return m_status.load();
}
void Server::setConnectionProvider(const std::shared_ptr<ServerConnectionProvider> &connectionProvider) {
std::lock_guard<oatpp::concurrency::SpinLock> lg(m_spinlock);
m_connectionProvider = connectionProvider;
}
void Server::setConnectionHandler(const std::shared_ptr<ConnectionHandler> &connectionHandler) {
std::lock_guard<oatpp::concurrency::SpinLock> lg(m_spinlock);
m_connectionHandler = connectionHandler;
}
Server::~Server() {
stop();
}

View File

@ -58,8 +58,9 @@ private:
std::function<bool()> m_condition;
std::thread m_thread;
std::mutex m_mutex;
oatpp::concurrency::SpinLock m_spinlock;
std::shared_ptr<ServerConnectionProvider> m_connectionProvider;
std::shared_ptr<ConnectionProvider> m_connectionProvider;
std::shared_ptr<ConnectionHandler> m_connectionHandler;
bool m_threaded;
@ -71,7 +72,7 @@ public:
* @param connectionProvider - &id:oatpp::network::ConnectionProvider;.
* @param connectionHandler - &id:oatpp::network::ConnectionHandler;.
*/
Server(const std::shared_ptr<ServerConnectionProvider>& connectionProvider,
Server(const std::shared_ptr<ConnectionProvider>& connectionProvider,
const std::shared_ptr<ConnectionHandler>& connectionHandler);
virtual ~Server();
@ -154,6 +155,18 @@ public:
* </ul>
*/
v_int32 getStatus();
/**
* Replaces the internal connection-provider
* @param connectionProvider - &id:oatpp::network::ConnectionProvider;.
*/
void setConnectionProvider(const std::shared_ptr<ServerConnectionProvider>& connectionProvider);
/**
* Replaces the internal connection-handler
* @param connectionHandler - &id:oatpp::network::ConnectionHandler;.
*/
void setConnectionHandler(const std::shared_ptr<ConnectionHandler>& connectionHandler);
};

View File

@ -0,0 +1,49 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "ConnectionInactivityChecker.hpp"
namespace oatpp { namespace network { namespace monitor {
ConnectionInactivityChecker::ConnectionInactivityChecker(const std::chrono::duration<v_int64, std::micro>& lastReadTimeout,
const std::chrono::duration<v_int64, std::micro>& lastWriteTimeout)
: m_lastReadTimeout(lastReadTimeout)
, m_lastWriteTimeout(lastWriteTimeout)
{}
std::vector<oatpp::String> ConnectionInactivityChecker::getMetricsList() {
return {};
}
std::shared_ptr<StatCollector> ConnectionInactivityChecker::createStatCollector(const oatpp::String& metricName) {
throw std::runtime_error("[oatpp::network::monitor::ConnectionInactivityChecker::createStatCollector()]: "
"Error. ConnectionInactivityChecker doesn't use any stat collectors.");
}
bool ConnectionInactivityChecker::check(const ConnectionStats& stats, v_int64 currMicroTime) {
return currMicroTime - stats.timestampLastRead < m_lastReadTimeout.count() &&
currMicroTime - stats.timestampLastWrite < m_lastWriteTimeout.count();
}
}}}

View File

@ -0,0 +1,60 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_network_monitor_ConnectionInactivityChecker_hpp
#define oatpp_network_monitor_ConnectionInactivityChecker_hpp
#include "MetricsChecker.hpp"
namespace oatpp { namespace network { namespace monitor {
/**
* ConnectionInactivityChecker - checks if a connection is inactive (has no read/writes) and whether it should be closed.
* Extends - &id:oatpp::network::monitor::MetricsChecker;.
*/
class ConnectionInactivityChecker : public MetricsChecker {
private:
std::chrono::duration<v_int64, std::micro> m_lastReadTimeout;
std::chrono::duration<v_int64, std::micro> m_lastWriteTimeout;
public:
/**
* Constructor.
* @param lastReadTimeout - how long can live connection without reads.
* @param lastWriteTimeout - how long can live connection without writes.
*/
ConnectionInactivityChecker(const std::chrono::duration<v_int64, std::micro>& lastReadTimeout,
const std::chrono::duration<v_int64, std::micro>& lastWriteTimeout);
std::vector<oatpp::String> getMetricsList();
std::shared_ptr<StatCollector> createStatCollector(const oatpp::String& metricName);
bool check(const ConnectionStats& stats, v_int64 currMicroTime);
};
}}}
#endif //oatpp_network_monitor_ConnectionInactivityChecker_hpp

View File

@ -0,0 +1,46 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "ConnectionMaxAgeChecker.hpp"
namespace oatpp { namespace network { namespace monitor {
ConnectionMaxAgeChecker::ConnectionMaxAgeChecker(const std::chrono::duration<v_int64, std::micro>& maxAge)
: m_maxAge(maxAge)
{}
std::vector<oatpp::String> ConnectionMaxAgeChecker::getMetricsList() {
return {};
}
std::shared_ptr<StatCollector> ConnectionMaxAgeChecker::createStatCollector(const oatpp::String& metricName) {
throw std::runtime_error("[oatpp::network::monitor::ConnectionMaxAgeChecker::createStatCollector()]: "
"Error. ConnectionMaxAgeChecker doesn't use any stat collectors.");
}
bool ConnectionMaxAgeChecker::check(const ConnectionStats& stats, v_int64 currMicroTime) {
return currMicroTime - stats.timestampCreated < m_maxAge.count();
}
}}}

View File

@ -0,0 +1,57 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_network_monitor_ConnectionMaxAgeChecker_hpp
#define oatpp_network_monitor_ConnectionMaxAgeChecker_hpp
#include "MetricsChecker.hpp"
namespace oatpp { namespace network { namespace monitor {
/**
* ConnectionMaxAgeChecker - checks if connection is too old and should be closed.
* Extends - &id:oatpp::network::monitor::MetricsChecker;.
*/
class ConnectionMaxAgeChecker : public MetricsChecker {
private:
std::chrono::duration<v_int64, std::micro> m_maxAge;
public:
/**
* Constructor.
* @param maxAge - how long should connection live.
*/
ConnectionMaxAgeChecker(const std::chrono::duration<v_int64, std::micro>& maxAge);
std::vector<oatpp::String> getMetricsList();
std::shared_ptr<StatCollector> createStatCollector(const oatpp::String& metricName);
bool check(const ConnectionStats& stats, v_int64 currMicroTime);
};
}}}
#endif //oatpp_network_monitor_ConnectionMaxAgeChecker_hpp

View File

@ -0,0 +1,333 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "ConnectionMonitor.hpp"
#include <chrono>
#include <thread>
namespace oatpp { namespace network { namespace monitor {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ConnectionMonitor::ConnectionProxy
ConnectionMonitor::ConnectionProxy::ConnectionProxy(const std::shared_ptr<Monitor>& monitor,
const std::shared_ptr<ConnectionProvider>& connectionProvider,
const std::shared_ptr<data::stream::IOStream>& connection)
: m_monitor(monitor)
, m_connectionProvider(connectionProvider)
, m_connection(connection)
{
m_stats.timestampCreated = base::Environment::getMicroTickCount();
}
ConnectionMonitor::ConnectionProxy::~ConnectionProxy() {
std::lock_guard<std::mutex> lock(m_statsMutex);
m_monitor->freeConnectionStats(m_stats);
if(m_stats.metricsData.size() > 0) {
for(auto& pair : m_stats.metricsData) {
OATPP_LOGE("[oatpp::network::ConnectionMonitor::ConnectionProxy::~ConnectionProxy()]",
"Error. Memory leak. Metric data was not deleted: Metric name - '%s'", pair.first->c_str());
}
}
m_monitor->removeConnection((v_uint64) this);
}
v_io_size ConnectionMonitor::ConnectionProxy::read(void *buffer, v_buff_size count, async::Action& action) {
auto res = m_connection->read(buffer, count, action);
std::lock_guard<std::mutex> lock(m_statsMutex);
m_monitor->onConnectionRead(m_stats, res);
return res;
}
v_io_size ConnectionMonitor::ConnectionProxy::write(const void *data, v_buff_size count, async::Action& action) {
auto res = m_connection->write(data, count, action);
std::lock_guard<std::mutex> lock(m_statsMutex);
m_monitor->onConnectionWrite(m_stats, res);
return res;
}
void ConnectionMonitor::ConnectionProxy::setInputStreamIOMode(data::stream::IOMode ioMode) {
m_connection->setInputStreamIOMode(ioMode);
}
data::stream::IOMode ConnectionMonitor::ConnectionProxy::getInputStreamIOMode() {
return m_connection->getInputStreamIOMode();
}
data::stream::Context& ConnectionMonitor::ConnectionProxy::getInputStreamContext() {
return m_connection->getInputStreamContext();
}
void ConnectionMonitor::ConnectionProxy::setOutputStreamIOMode(data::stream::IOMode ioMode) {
m_connection->setOutputStreamIOMode(ioMode);
}
data::stream::IOMode ConnectionMonitor::ConnectionProxy::getOutputStreamIOMode() {
return m_connection->getOutputStreamIOMode();
}
data::stream::Context& ConnectionMonitor::ConnectionProxy::getOutputStreamContext() {
return m_connection->getOutputStreamContext();
}
void ConnectionMonitor::ConnectionProxy::invalidate() {
m_connectionProvider->invalidate(m_connection);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Monitor
void ConnectionMonitor::Monitor::monitorTask(std::shared_ptr<Monitor> monitor) {
while(monitor->m_running) {
{
std::lock_guard<std::mutex> lock(monitor->m_connectionsMutex);
auto currMicroTime = oatpp::base::Environment::getMicroTickCount();
for(auto& pair : monitor->m_connections) {
auto connection = pair.second.lock();
std::lock_guard<std::mutex> dataLock(connection->m_statsMutex);
std::lock_guard<std::mutex> analysersLock(monitor->m_checkMutex);
for(auto& a : monitor->m_metricsCheckers) {
bool res = a->check(connection->m_stats, currMicroTime);
if(!res) {
connection->invalidate();
break;
}
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
{
std::lock_guard<std::mutex>(monitor->m_runMutex);
monitor->m_stopped = true;
}
monitor->m_runCondition.notify_all();
}
void* ConnectionMonitor::Monitor::createOrGetMetricData(ConnectionStats& stats, const std::shared_ptr<StatCollector>& collector) {
void* data;
auto it = stats.metricsData.find(collector->metricName());
if(it == stats.metricsData.end()) {
data = collector->createMetricData();
stats.metricsData.insert({collector->metricName(), data});
} else {
data = it->second;
}
return data;
}
std::shared_ptr<ConnectionMonitor::Monitor> ConnectionMonitor::Monitor::createShared() {
auto monitor = std::make_shared<Monitor>();
std::thread t([monitor](){
ConnectionMonitor::Monitor::monitorTask(monitor);
});
t.detach();
return monitor;
}
void ConnectionMonitor::Monitor::addConnection(v_uint64 id, const std::weak_ptr<ConnectionProxy>& connection) {
std::lock_guard<std::mutex> lock(m_connectionsMutex);
m_connections.insert({id, connection});
}
void ConnectionMonitor::Monitor::freeConnectionStats(ConnectionStats& stats) {
std::lock_guard<std::mutex> lock(m_checkMutex);
for(auto& metric : stats.metricsData) {
auto it = m_statCollectors.find(metric.first);
if(it != m_statCollectors.end()) {
it->second->deleteMetricData(metric.second);
} else {
OATPP_LOGE("[oatpp::network::ConnectionMonitor::Monitor::freeConnectionStats]",
"Error. Can't free Metric data. Unknown Metric: name - '%s'", it->first->c_str());
}
}
}
void ConnectionMonitor::Monitor::removeConnection(v_uint64 id) {
std::lock_guard<std::mutex> lock(m_connectionsMutex);
m_connections.erase(id);
}
void ConnectionMonitor::Monitor::addStatCollector(const std::shared_ptr<StatCollector>& collector) {
std::lock_guard<std::mutex> lock(m_checkMutex);
m_statCollectors.insert({collector->metricName(), collector});
}
void ConnectionMonitor::Monitor::removeStatCollector(const oatpp::String& metricName) {
std::lock_guard<std::mutex> lock(m_checkMutex);
m_statCollectors.erase(metricName);
}
void ConnectionMonitor::Monitor::addMetricsChecker(const std::shared_ptr<MetricsChecker>& checker) {
std::lock_guard<std::mutex> lock(m_checkMutex);
m_metricsCheckers.push_back(checker);
auto metrics = checker->getMetricsList();
for(auto& m : metrics) {
auto it = m_statCollectors.find(m);
if(it == m_statCollectors.end()) {
m_statCollectors.insert({m, checker->createStatCollector(m)});
}
}
}
void ConnectionMonitor::Monitor::onConnectionRead(ConnectionStats& stats, v_io_size readResult) {
v_int64 currTimestamp = base::Environment::getMicroTickCount();
if(readResult > 0) {
stats.totalRead += readResult;
stats.lastReadSize = readResult;
stats.timestampLastRead = currTimestamp;
}
{
std::lock_guard<std::mutex> lock(m_checkMutex);
for(auto& pair : m_statCollectors) {
pair.second->onRead(createOrGetMetricData(stats, pair.second), readResult, currTimestamp);
}
}
}
void ConnectionMonitor::Monitor::onConnectionWrite(ConnectionStats& stats, v_io_size writeResult) {
v_int64 currTimestamp = base::Environment::getMicroTickCount();
if(writeResult > 0) {
stats.totalWrite += writeResult;
stats.lastWriteSize = writeResult;
stats.timestampLastWrite = currTimestamp;
}
{
std::lock_guard<std::mutex> lock(m_checkMutex);
for(auto& pair : m_statCollectors) {
pair.second->onWrite(createOrGetMetricData(stats, pair.second), writeResult, currTimestamp);
}
}
}
void ConnectionMonitor::Monitor::stop() {
m_running = false;
std::unique_lock<std::mutex> runLock(m_runMutex);
while(!m_stopped) {
m_runCondition.wait(runLock);
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ConnectionMonitor
ConnectionMonitor::ConnectionMonitor(const std::shared_ptr<ConnectionProvider>& connectionProvider)
: m_monitor(Monitor::createShared())
, m_connectionProvider(connectionProvider)
{
}
std::shared_ptr<data::stream::IOStream> ConnectionMonitor::get() {
auto connection = m_connectionProvider->get();
if(!connection) {
return nullptr;
}
auto proxy = std::make_shared<ConnectionProxy>(m_monitor, m_connectionProvider, connection);
m_monitor->addConnection((v_uint64) proxy.get(), proxy);
return proxy;
}
async::CoroutineStarterForResult<const std::shared_ptr<data::stream::IOStream>&>
ConnectionMonitor::getAsync() {
class GetConnectionCoroutine : public async::CoroutineWithResult<GetConnectionCoroutine, const std::shared_ptr<data::stream::IOStream>&> {
private:
std::shared_ptr<Monitor> m_monitor;
std::shared_ptr<ConnectionProvider> m_connectionProvider;
public:
GetConnectionCoroutine(const std::shared_ptr<Monitor>& monitor,
const std::shared_ptr<ConnectionProvider>& connectionProvider)
: m_monitor(monitor)
, m_connectionProvider(connectionProvider)
{}
Action act() override {
return m_connectionProvider->getAsync().callbackTo(&GetConnectionCoroutine::onConnection);
}
Action onConnection(const std::shared_ptr<data::stream::IOStream>& connection) {
if(!connection) {
return _return(nullptr);
}
auto proxy = std::make_shared<ConnectionProxy>(m_monitor, m_connectionProvider, connection);
m_monitor->addConnection((v_uint64) proxy.get(), proxy);
return _return(proxy);
}
};
return GetConnectionCoroutine::startForResult(m_monitor, m_connectionProvider);
}
void ConnectionMonitor::addStatCollector(const std::shared_ptr<StatCollector>& collector) {
m_monitor->addStatCollector(collector);
}
void ConnectionMonitor::addMetricsChecker(const std::shared_ptr<MetricsChecker>& checker) {
m_monitor->addMetricsChecker(checker);
}
void ConnectionMonitor::invalidate(const std::shared_ptr<data::stream::IOStream>& connection) {
auto proxy = std::static_pointer_cast<ConnectionProxy>(connection);
proxy->invalidate();
}
void ConnectionMonitor::stop() {
m_monitor->stop();
}
}}}

View File

@ -0,0 +1,151 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_network_monitor_ConnectionMonitor_hpp
#define oatpp_network_monitor_ConnectionMonitor_hpp
#include "MetricsChecker.hpp"
#include "oatpp/network/ConnectionProvider.hpp"
#include "oatpp/core/data/stream/Stream.hpp"
#include <unordered_map>
#include <condition_variable>
namespace oatpp { namespace network { namespace monitor {
/**
* ConnectionMonitor is a middleman who's able to manage provided connections
* and close those ones that are not satisfy selected rules.
*/
class ConnectionMonitor : public ClientConnectionProvider, public ServerConnectionProvider {
private:
class Monitor; // FWD
class ConnectionProxy : public data::stream::IOStream {
friend Monitor;
private:
std::shared_ptr<Monitor> m_monitor;
/* provider which created this connection */
std::shared_ptr<ConnectionProvider> m_connectionProvider;
std::shared_ptr<data::stream::IOStream> m_connection;
std::mutex m_statsMutex;
ConnectionStats m_stats;
public:
ConnectionProxy(const std::shared_ptr<Monitor>& monitor,
const std::shared_ptr<ConnectionProvider>& connectionProvider,
const std::shared_ptr<data::stream::IOStream>& connection);
~ConnectionProxy() override;
v_io_size read(void *buffer, v_buff_size count, async::Action& action) override;
v_io_size write(const void *data, v_buff_size count, async::Action& action) override;
void setInputStreamIOMode(data::stream::IOMode ioMode) override;
data::stream::IOMode getInputStreamIOMode() override;
data::stream::Context& getInputStreamContext() override;
void setOutputStreamIOMode(data::stream::IOMode ioMode) override;
data::stream::IOMode getOutputStreamIOMode() override;
data::stream::Context& getOutputStreamContext() override;
void invalidate();
};
private:
class Monitor : public oatpp::base::Countable {
private:
std::mutex m_runMutex;
std::condition_variable m_runCondition;
std::atomic<bool> m_running {true};
bool m_stopped {false};
std::mutex m_connectionsMutex;
std::unordered_map<v_uint64, std::weak_ptr<ConnectionProxy>> m_connections;
std::mutex m_checkMutex;
std::vector<std::shared_ptr<MetricsChecker>> m_metricsCheckers;
std::unordered_map<oatpp::String, std::shared_ptr<StatCollector>> m_statCollectors;
private:
static void monitorTask(std::shared_ptr<Monitor> monitor);
private:
static void* createOrGetMetricData(ConnectionStats& stats, const std::shared_ptr<StatCollector>& collector);
public:
static std::shared_ptr<Monitor> createShared();
void addConnection(v_uint64 id, const std::weak_ptr<ConnectionProxy>& connection);
void freeConnectionStats(ConnectionStats& stats);
void removeConnection(v_uint64 id);
void addStatCollector(const std::shared_ptr<StatCollector>& collector);
void removeStatCollector(const oatpp::String& metricName);
void addMetricsChecker(const std::shared_ptr<MetricsChecker>& checker);
void onConnectionRead(ConnectionStats& stats, v_io_size readResult);
void onConnectionWrite(ConnectionStats& stats, v_io_size writeResult);
void stop();
};
private:
std::shared_ptr<Monitor> m_monitor;
std::shared_ptr<ConnectionProvider> m_connectionProvider;
public:
/**
* Constructor.
* @param connectionProvider - underlying connection provider.
*/
ConnectionMonitor(const std::shared_ptr<ConnectionProvider>& connectionProvider);
std::shared_ptr<data::stream::IOStream> get() override;
async::CoroutineStarterForResult<const std::shared_ptr<data::stream::IOStream>&> getAsync() override;
void addStatCollector(const std::shared_ptr<StatCollector>& collector);
/**
* Add metrics checker.
* @param checker - &id:oatpp::network::monitor::MetricsChecker;.
*/
void addMetricsChecker(const std::shared_ptr<MetricsChecker>& checker);
void invalidate(const std::shared_ptr<data::stream::IOStream>& connection) override;
void stop() override;
};
}}}
#endif //oatpp_network_monitor_ConnectionMonitor_hpp

View File

@ -0,0 +1,71 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_network_monitor_MetricsChecker_hpp
#define oatpp_network_monitor_MetricsChecker_hpp
#include "StatCollector.hpp"
namespace oatpp { namespace network { namespace monitor {
/**
* MetricsChecker checks &id:oatpp::network::monitor::ConnectionStats; if those are satisfy the rule.
*/
class MetricsChecker : public oatpp::base::Countable {
public:
/**
* Default virtual destructor.
*/
virtual ~MetricsChecker() = default;
/**
* Get list of metrics names that are checked by this MetricsChecker.
* @return
*/
virtual std::vector<oatpp::String> getMetricsList() = 0;
/**
* Create &id:oatpp::network::monitor::StatCollector; for given `metricName`.
* This method will be called by &id:oatpp::network::monitor::ConnectionMonitor; only if there is
* no such `StatCollector` registered in the `ConnectionMonitor` yet.
* @param metricName - name of the metric.
* @return - &id:oatpp::network::monitor::StatCollector;.
*/
virtual std::shared_ptr<StatCollector> createStatCollector(const oatpp::String& metricName) = 0;
/**
* Called by &id:oatpp::network::monitor::ConnectionMonitor; on each
* time interval to check if connection satisfies the rule.
* @param stats - &id:oatpp::network::monitor::ConnectionStats;.
* @param currMicroTime - current time microseconds.
* @return - `true` if connection satisfies the rule. `false` if connection should be closed.
*/
virtual bool check(const ConnectionStats& stats, v_int64 currMicroTime) = 0;
};
}}}
#endif //oatpp_network_monitor_MetricsChecker_hpp

View File

@ -0,0 +1,131 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_network_monitor_StatCollector_hpp
#define oatpp_network_monitor_StatCollector_hpp
#include "oatpp/core/Types.hpp"
#include "oatpp/core/IODefinitions.hpp"
namespace oatpp { namespace network { namespace monitor {
/**
* ConnectionStats.
*/
struct ConnectionStats {
/**
* Timestamp created microseconds.
* When connection was created.
*/
v_int64 timestampCreated = 0;
/**
* Total bytes read from the connection.
* Logs all bytes when the `read` method is called.
*/
v_io_size totalRead = 0;
/**
* Total bytes written to the connection.
* Logs all bytes when the `write` method is called.
*/
v_io_size totalWrite = 0;
/**
* Timestamp microseconds when the last successful read was performed on the connection.
*/
v_int64 timestampLastRead = 0;
/**
* Timestamp microseconds when the last successful write was performed on the connection.
*/
v_int64 timestampLastWrite = 0;
/**
* Amount of bytes read during the last successful read.
*/
v_io_size lastReadSize = 0;
/**
* Amount of bytes written during the last successful write.
*/
v_io_size lastWriteSize = 0;
/**
* Data collected by stat-collectors - &l:StatCollector;
*/
std::unordered_map<oatpp::String, void*> metricsData;
};
/**
* StatCollector collects metrics data of the connection.
*/
class StatCollector : public oatpp::base::Countable {
public:
/**
* Default virtual destructor.
*/
virtual ~StatCollector() = default;
/**
* Unique metric name that is collected by this `StatCollector`.
* @return - metricName. &id:oatpp::String;.
*/
virtual oatpp::String metricName() = 0;
/**
* Metric data constructor.
* @return
*/
virtual void* createMetricData() = 0;
/**
* Metric data destructor.
* @param metricData
*/
virtual void deleteMetricData(void* metricData) = 0;
/**
* On connection read event.
* @param metricData - metric data of the given connection.- the one created in the `createMetricData` method.
* @param readResult - result of the connection read method.
* @param timestamp - timestamp microseconds when the connection `read` method was called.
*/
virtual void onRead(void* metricData, v_io_size readResult, v_int64 timestamp) = 0;
/**
* On connection write event.
* @param metricData - metric data of the given connection.- the one created in the `createMetricData` method.
* @param writeResult - result of the connection write method.
* @param timestamp - timestamp microseconds when the connection `write` method was called.
*/
virtual void onWrite(void* metricData, v_io_size writeResult, v_int64 timestamp) = 0;
};
}}}
#endif //oatpp_network_monitor_StatCollector_hpp

View File

@ -35,7 +35,7 @@ namespace oatpp { namespace network { namespace tcp { namespace client {
/**
* Simple provider of clinet TCP connections.
*/
class ConnectionProvider : public base::Countable, public ClientConnectionProvider {
class ConnectionProvider : public ClientConnectionProvider {
protected:
network::Address m_address;
public:

View File

@ -36,7 +36,7 @@ namespace oatpp { namespace network { namespace tcp { namespace server {
/**
* Simple provider of TCP connections.
*/
class ConnectionProvider : public base::Countable, public ServerConnectionProvider {
class ConnectionProvider : public ServerConnectionProvider {
public:
/**

View File

@ -61,14 +61,16 @@ add_executable(oatppAllTests
oatpp/encoding/Base64Test.hpp
oatpp/encoding/UnicodeTest.cpp
oatpp/encoding/UnicodeTest.hpp
oatpp/network/ConnectionPoolTest.cpp
oatpp/network/ConnectionPoolTest.hpp
oatpp/network/UrlTest.cpp
oatpp/network/UrlTest.hpp
oatpp/network/monitor/ConnectionMonitorTest.cpp
oatpp/network/monitor/ConnectionMonitorTest.hpp
oatpp/network/virtual_/InterfaceTest.cpp
oatpp/network/virtual_/InterfaceTest.hpp
oatpp/network/virtual_/PipeTest.cpp
oatpp/network/virtual_/PipeTest.hpp
oatpp/network/ConnectionPoolTest.cpp
oatpp/network/ConnectionPoolTest.hpp
oatpp/network/UrlTest.cpp
oatpp/network/UrlTest.hpp
oatpp/parser/json/mapping/DTOMapperPerfTest.cpp
oatpp/parser/json/mapping/DTOMapperPerfTest.hpp
oatpp/parser/json/mapping/DTOMapperTest.cpp

View File

@ -15,6 +15,7 @@
#include "oatpp/network/virtual_/InterfaceTest.hpp"
#include "oatpp/network/UrlTest.hpp"
#include "oatpp/network/ConnectionPoolTest.hpp"
#include "oatpp/network/monitor/ConnectionMonitorTest.hpp"
#include "oatpp/parser/json/mapping/DeserializerTest.hpp"
#include "oatpp/parser/json/mapping/DTOMapperPerfTest.hpp"
@ -125,9 +126,8 @@ void runTests() {
OATPP_RUN_TEST(oatpp::test::encoding::UnicodeTest);
OATPP_RUN_TEST(oatpp::test::network::UrlTest);
OATPP_RUN_TEST(oatpp::test::network::ConnectionPoolTest);
OATPP_RUN_TEST(oatpp::test::network::monitor::ConnectionMonitorTest);
OATPP_RUN_TEST(oatpp::test::network::virtual_::PipeTest);
OATPP_RUN_TEST(oatpp::test::network::virtual_::InterfaceTest);

View File

@ -0,0 +1,255 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "ConnectionMonitorTest.hpp"
#include "oatpp/web/client/HttpRequestExecutor.hpp"
#include "oatpp/web/server/AsyncHttpConnectionHandler.hpp"
#include "oatpp/web/server/HttpConnectionHandler.hpp"
#include "oatpp/web/protocol/http/outgoing/StreamingBody.hpp"
#include "oatpp/network/monitor/ConnectionMonitor.hpp"
#include "oatpp/network/monitor/ConnectionMaxAgeChecker.hpp"
#include "oatpp/network/Server.hpp"
#include "oatpp/network/tcp/client/ConnectionProvider.hpp"
#include "oatpp/network/tcp/server/ConnectionProvider.hpp"
#include <thread>
namespace oatpp { namespace test { namespace network { namespace monitor {
namespace {
class ReadCallback : public oatpp::data::stream::ReadCallback {
public:
v_io_size read(void *buffer, v_buff_size count, async::Action &action) override {
OATPP_LOGI("TEST", "read(...)")
std::this_thread::sleep_for(std::chrono::milliseconds(100));
char* data = (char*) buffer;
data[0] = 'A';
return 1;
}
};
class StreamingHandler : public oatpp::web::server::HttpRequestHandler {
public:
std::shared_ptr<OutgoingResponse> handle(const std::shared_ptr<IncomingRequest>& request) override {
auto body = std::make_shared<oatpp::web::protocol::http::outgoing::StreamingBody>
(std::make_shared<ReadCallback>());
return OutgoingResponse::createShared(Status::CODE_200, body);
}
};
class AsyncStreamingHandler : public oatpp::web::server::HttpRequestHandler {
public:
oatpp::async::CoroutineStarterForResult<const std::shared_ptr<OutgoingResponse>&>
handleAsync(const std::shared_ptr<IncomingRequest>& request) {
class StreamCoroutine : public oatpp::async::CoroutineWithResult<StreamCoroutine, const std::shared_ptr<OutgoingResponse>&> {
public:
Action act() override {
auto body = std::make_shared<oatpp::web::protocol::http::outgoing::StreamingBody>
(std::make_shared<ReadCallback>());
return _return(OutgoingResponse::createShared(Status::CODE_200, body));
}
};
return StreamCoroutine::startForResult();
}
};
std::shared_ptr<oatpp::network::Server> runServer(const std::shared_ptr<oatpp::network::monitor::ConnectionMonitor>& monitor) {
auto router = oatpp::web::server::HttpRouter::createShared();
router->route("GET", "/stream", std::make_shared<StreamingHandler>());
auto connectionHandler = oatpp::web::server::HttpConnectionHandler::createShared(router);
auto server = std::make_shared<oatpp::network::Server>(monitor, connectionHandler);
std::thread t([server, connectionHandler]{
server->run();
OATPP_LOGD("TEST", "server stopped");
connectionHandler->stop();
OATPP_LOGD("TEST", "connectionHandler stopped");
});
t.detach();
return server;
}
std::shared_ptr<oatpp::network::Server> runAsyncServer(const std::shared_ptr<oatpp::network::monitor::ConnectionMonitor>& monitor) {
auto router = oatpp::web::server::HttpRouter::createShared();
router->route("GET", "/stream", std::make_shared<AsyncStreamingHandler>());
auto executor = std::make_shared<oatpp::async::Executor>();
auto connectionHandler = oatpp::web::server::AsyncHttpConnectionHandler::createShared(router, executor);
auto server = std::make_shared<oatpp::network::Server>(monitor, connectionHandler);
std::thread t([server, connectionHandler, executor]{
server->run();
OATPP_LOGD("TEST_ASYNC", "server stopped");
connectionHandler->stop();
OATPP_LOGD("TEST_ASYNC", "connectionHandler stopped");
executor->waitTasksFinished();
executor->stop();
executor->join();
OATPP_LOGD("TEST_ASYNC", "executor stopped");
});
t.detach();
return server;
}
void runClient() {
auto connectionProvider = oatpp::network::tcp::client::ConnectionProvider::createShared(
{"localhost", 8000});
oatpp::web::client::HttpRequestExecutor executor(connectionProvider);
auto response = executor.execute("GET", "/stream", oatpp::web::protocol::http::Headers({}), nullptr, nullptr);
OATPP_ASSERT(response->getStatusCode() == 200);
auto data = response->readBodyToString();
OATPP_ASSERT(data)
OATPP_LOGD("TEST", "data->size() == %d", data->size())
OATPP_ASSERT(data->size() < 110) // it should be less than 100. But we put 110 for redundancy
}
void runAsyncClient() {
class ClientCoroutine : public oatpp::async::Coroutine<ClientCoroutine> {
private:
std::shared_ptr<oatpp::web::client::HttpRequestExecutor> m_executor;
std::shared_ptr<oatpp::network::monitor::ConnectionMonitor> m_monitor;
public:
ClientCoroutine() {
auto connectionProvider = oatpp::network::tcp::client::ConnectionProvider::createShared(
{"localhost", 8000});
m_monitor = std::make_shared<oatpp::network::monitor::ConnectionMonitor>(connectionProvider);
m_monitor->addMetricsChecker(
std::make_shared<oatpp::network::monitor::ConnectionMaxAgeChecker>(
std::chrono::seconds(5)
)
);
m_executor = oatpp::web::client::HttpRequestExecutor::createShared(m_monitor);
}
Action act() override {
return m_executor->executeAsync("GET", "/stream", oatpp::web::protocol::http::Headers({}), nullptr, nullptr)
.callbackTo(&ClientCoroutine::onResponse);
}
Action onResponse(const std::shared_ptr<oatpp::web::protocol::http::incoming::Response>& response) {
OATPP_ASSERT(response->getStatusCode() == 200);
return response->readBodyToStringAsync().callbackTo(&ClientCoroutine::onBody);
}
Action onBody(const oatpp::String& data) {
OATPP_ASSERT(data)
OATPP_LOGD("TEST", "data->size() == %d", data->size())
OATPP_ASSERT(data->size() < 60) // it should be less than 50. But we put 60 for redundancy
m_monitor->stop();
return finish();
}
};
auto executor = std::make_shared<oatpp::async::Executor>();
executor->execute<ClientCoroutine>();
executor->waitTasksFinished();
OATPP_LOGD("TEST_ASYNC_CLIENT", "task finished")
executor->stop();
OATPP_LOGD("TEST_ASYNC_CLIENT", "executor stopped")
executor->join();
OATPP_LOGD("TEST_ASYNC_CLIENT", "done")
}
}
void ConnectionMonitorTest::onRun() {
auto connectionProvider = oatpp::network::tcp::server::ConnectionProvider::createShared(
{"localhost", 8000});
auto monitor = std::make_shared<oatpp::network::monitor::ConnectionMonitor>(connectionProvider);
monitor->addMetricsChecker(
std::make_shared<oatpp::network::monitor::ConnectionMaxAgeChecker>(
std::chrono::seconds(10)
)
);
{
OATPP_LOGD(TAG, "run simple API test")
auto server = runServer(monitor);
runClient();
server->stop();
std::this_thread::sleep_for(std::chrono::seconds(5));
}
{
OATPP_LOGD(TAG, "run Async API test")
auto server = runAsyncServer(monitor);
runClient();
server->stop();
std::this_thread::sleep_for(std::chrono::seconds(5));
}
{
OATPP_LOGD(TAG, "run Async Client test")
auto server = runServer(monitor);
runAsyncClient();
server->stop();
std::this_thread::sleep_for(std::chrono::seconds(5));
}
monitor->stop();
}
}}}}

View File

@ -0,0 +1,43 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_test_network_monitor_ConnectionMonitorTest_hpp
#define oatpp_test_network_monitor_ConnectionMonitorTest_hpp
#include "oatpp-test/UnitTest.hpp"
namespace oatpp { namespace test { namespace network { namespace monitor {
class ConnectionMonitorTest : public UnitTest {
public:
ConnectionMonitorTest():UnitTest("TEST[network::monitor::ConnectionMonitorTest]"){}
void onRun() override;
};
}}}}
#endif // oatpp_test_network_monitor_ConnectionMonitorTest_hpp