ConnectionMonitor: move ConnectionMonitor to network::monitor namespace.

This commit is contained in:
lganzzzo 2021-09-25 01:05:51 +03:00
parent 2ffcf9cc20
commit d47ae5c20a
11 changed files with 270 additions and 117 deletions

View File

@ -138,6 +138,12 @@ add_library(oatpp
oatpp/encoding/Hex.hpp
oatpp/encoding/Unicode.cpp
oatpp/encoding/Unicode.hpp
oatpp/network/monitor/ConnectionMaxAgeChecker.cpp
oatpp/network/monitor/ConnectionMaxAgeChecker.hpp
oatpp/network/monitor/ConnectionMonitor.cpp
oatpp/network/monitor/ConnectionMonitor.hpp
oatpp/network/monitor/MetricsChecker.hpp
oatpp/network/monitor/StatCollector.hpp
oatpp/network/tcp/client/ConnectionProvider.cpp
oatpp/network/tcp/client/ConnectionProvider.hpp
oatpp/network/tcp/server/ConnectionProvider.cpp
@ -157,8 +163,6 @@ add_library(oatpp
oatpp/network/Address.cpp
oatpp/network/Address.hpp
oatpp/network/ConnectionHandler.hpp
oatpp/network/ConnectionMonitor.cpp
oatpp/network/ConnectionMonitor.hpp
oatpp/network/ConnectionPool.cpp
oatpp/network/ConnectionPool.hpp
oatpp/network/ConnectionProvider.cpp

View File

@ -0,0 +1,46 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "ConnectionMaxAgeChecker.hpp"
namespace oatpp { namespace network { namespace monitor {
ConnectionMaxAgeChecker::ConnectionMaxAgeChecker(const std::chrono::duration<v_int64, std::micro>& maxAge)
: m_maxAge(maxAge)
{}
std::vector<oatpp::String> ConnectionMaxAgeChecker::getMetricsList() {
return {};
}
std::shared_ptr<StatCollector> ConnectionMaxAgeChecker::createStatCollector(const oatpp::String& metricName) {
throw std::runtime_error("[oatpp::network::monitor::ConnectionMaxAgeChecker::createStatCollector()]: "
"Error. ConnectionMaxAgeChecker doesn't use any stat collectors.");
}
bool ConnectionMaxAgeChecker::check(const ConnectionStats& stats, v_int64 currMicroTime) {
return currMicroTime - stats.timestampCreated < m_maxAge.count();
}
}}}

View File

@ -0,0 +1,49 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_network_monitor_ConnectionMaxAgeChecker_hpp
#define oatpp_network_monitor_ConnectionMaxAgeChecker_hpp
#include "MetricsChecker.hpp"
namespace oatpp { namespace network { namespace monitor {
class ConnectionMaxAgeChecker : public MetricsChecker {
private:
std::chrono::duration<v_int64, std::micro> m_maxAge;
public:
ConnectionMaxAgeChecker(const std::chrono::duration<v_int64, std::micro>& maxAge);
std::vector<oatpp::String> getMetricsList();
std::shared_ptr<StatCollector> createStatCollector(const oatpp::String& metricName);
bool check(const ConnectionStats& stats, v_int64 currMicroTime);
};
}}}
#endif //oatpp_network_monitor_ConnectionMaxAgeChecker_hpp

View File

@ -27,7 +27,7 @@
#include <chrono>
#include <thread>
namespace oatpp { namespace network {
namespace oatpp { namespace network { namespace monitor {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ConnectionMonitor::ConnectionProxy
@ -48,9 +48,9 @@ ConnectionMonitor::ConnectionProxy::~ConnectionProxy() {
m_monitor->freeConnectionStats(m_stats);
if(m_stats.metricData.size() > 0) {
if(m_stats.metricsData.size() > 0) {
for(auto& pair : m_stats.metricData) {
for(auto& pair : m_stats.metricsData) {
OATPP_LOGE("[oatpp::network::ConnectionMonitor::ConnectionProxy::~ConnectionProxy()]",
"Error. Memory leak. Metric data was not deleted: Metric name - '%s'", pair.first->c_str());
}
@ -118,10 +118,10 @@ void ConnectionMonitor::Monitor::monitorTask(std::shared_ptr<Monitor> monitor) {
auto connection = pair.second.lock();
std::lock_guard<std::mutex> dataLock(connection->m_statsMutex);
std::lock_guard<std::mutex> analysersLock(monitor->m_analysersMutex);
std::lock_guard<std::mutex> analysersLock(monitor->m_checkMutex);
for(auto& a : monitor->m_analysers) {
bool res = a->analyse(connection->m_stats, currMicroTime);
for(auto& a : monitor->m_metricsCheckers) {
bool res = a->check(connection->m_stats, currMicroTime);
if(!res) {
connection->invalidate();
break;
@ -147,10 +147,10 @@ void ConnectionMonitor::Monitor::monitorTask(std::shared_ptr<Monitor> monitor) {
void* ConnectionMonitor::Monitor::createOrGetMetricData(ConnectionStats& stats, const std::shared_ptr<StatCollector>& collector) {
void* data;
auto it = stats.metricData.find(collector->metricName());
if(it == stats.metricData.end()) {
auto it = stats.metricsData.find(collector->metricName());
if(it == stats.metricsData.end()) {
data = collector->createMetricData();
stats.metricData.insert({collector->metricName(), data});
stats.metricsData.insert({collector->metricName(), data});
} else {
data = it->second;
}
@ -173,9 +173,9 @@ void ConnectionMonitor::Monitor::addConnection(v_uint64 id, const std::weak_ptr<
void ConnectionMonitor::Monitor::freeConnectionStats(ConnectionStats& stats) {
std::lock_guard<std::mutex> lock(m_analysersMutex);
std::lock_guard<std::mutex> lock(m_checkMutex);
for(auto& metric : stats.metricData) {
for(auto& metric : stats.metricsData) {
auto it = m_statCollectors.find(metric.first);
if(it != m_statCollectors.end()) {
it->second->deleteMetricData(metric.second);
@ -193,23 +193,23 @@ void ConnectionMonitor::Monitor::removeConnection(v_uint64 id) {
}
void ConnectionMonitor::Monitor::addStatCollector(const std::shared_ptr<StatCollector>& collector) {
std::lock_guard<std::mutex> lock(m_analysersMutex);
std::lock_guard<std::mutex> lock(m_checkMutex);
m_statCollectors.insert({collector->metricName(), collector});
}
void ConnectionMonitor::Monitor::removeStatCollector(const oatpp::String& metricName) {
std::lock_guard<std::mutex> lock(m_analysersMutex);
std::lock_guard<std::mutex> lock(m_checkMutex);
m_statCollectors.erase(metricName);
}
void ConnectionMonitor::Monitor::addAnalyser(const std::shared_ptr<MetricAnalyser>& analyser) {
std::lock_guard<std::mutex> lock(m_analysersMutex);
m_analysers.push_back(analyser);
auto metrics = analyser->getMetricsList();
void ConnectionMonitor::Monitor::addMetricsChecker(const std::shared_ptr<MetricsChecker>& checker) {
std::lock_guard<std::mutex> lock(m_checkMutex);
m_metricsCheckers.push_back(checker);
auto metrics = checker->getMetricsList();
for(auto& m : metrics) {
auto it = m_statCollectors.find(m);
if(it == m_statCollectors.end()) {
m_statCollectors.insert({m, analyser->createStatCollector(m)});
m_statCollectors.insert({m, checker->createStatCollector(m)});
}
}
}
@ -225,7 +225,7 @@ void ConnectionMonitor::Monitor::onConnectionRead(ConnectionStats& stats, v_io_s
}
{
std::lock_guard<std::mutex> lock(m_analysersMutex);
std::lock_guard<std::mutex> lock(m_checkMutex);
for(auto& pair : m_statCollectors) {
pair.second->onRead(createOrGetMetricData(stats, pair.second), readResult, currTimestamp);
}
@ -244,7 +244,7 @@ void ConnectionMonitor::Monitor::onConnectionWrite(ConnectionStats& stats, v_io_
}
{
std::lock_guard<std::mutex> lock(m_analysersMutex);
std::lock_guard<std::mutex> lock(m_checkMutex);
for(auto& pair : m_statCollectors) {
pair.second->onWrite(createOrGetMetricData(stats, pair.second), writeResult, currTimestamp);
}
@ -317,8 +317,8 @@ void ConnectionMonitor::addStatCollector(const std::shared_ptr<StatCollector>& c
m_monitor->addStatCollector(collector);
}
void ConnectionMonitor::addAnalyser(const std::shared_ptr<MetricAnalyser>& analyser) {
m_monitor->addAnalyser(analyser);
void ConnectionMonitor::addMetricsChecker(const std::shared_ptr<MetricsChecker>& checker) {
m_monitor->addMetricsChecker(checker);
}
void ConnectionMonitor::invalidate(const std::shared_ptr<data::stream::IOStream>& connection) {
@ -330,4 +330,4 @@ void ConnectionMonitor::stop() {
m_monitor->stop();
}
}}
}}}

View File

@ -22,63 +22,20 @@
*
***************************************************************************/
#ifndef oatpp_network_ConnectionMonitor_hpp
#define oatpp_network_ConnectionMonitor_hpp
#ifndef oatpp_network_monitor_ConnectionMonitor_hpp
#define oatpp_network_monitor_ConnectionMonitor_hpp
#include "./ConnectionProvider.hpp"
#include "MetricsChecker.hpp"
#include "oatpp/network/ConnectionProvider.hpp"
#include "oatpp/core/data/stream/Stream.hpp"
#include <unordered_map>
#include <condition_variable>
namespace oatpp { namespace network {
namespace oatpp { namespace network { namespace monitor {
class ConnectionMonitor : public ConnectionProvider {
public:
struct ConnectionStats {
v_int64 timestampCreated = 0;
v_io_size totalRead = 0;
v_io_size totalWrite = 0;
v_int64 timestampLastRead = 0;
v_int64 timestampLastWrite = 0;
v_io_size lastReadSize = 0;
v_io_size lastWriteSize = 0;
std::unordered_map<oatpp::String, void*> metricData;
};
public:
class StatCollector : public oatpp::base::Countable {
public:
virtual ~StatCollector() = default;
virtual oatpp::String metricName() = 0;
virtual void* createMetricData() = 0;
virtual void deleteMetricData(void* metricData) = 0;
virtual void onRead(void* metricData, v_io_size readResult, v_int64 timestamp) = 0;
virtual void onWrite(void* metricData, v_io_size writeResult, v_int64 timestamp) = 0;
};
public:
class MetricAnalyser : public oatpp::base::Countable {
public:
virtual ~MetricAnalyser() = default;
virtual std::vector<oatpp::String> getMetricsList() = 0;
virtual std::shared_ptr<StatCollector> createStatCollector(const oatpp::String& metricName) = 0;
virtual bool analyse(const ConnectionStats& stats, v_int64 currMicroTime) = 0;
};
private:
class Monitor; // FWD
@ -128,8 +85,8 @@ private:
std::mutex m_connectionsMutex;
std::unordered_map<v_uint64, std::weak_ptr<ConnectionProxy>> m_connections;
std::mutex m_analysersMutex;
std::vector<std::shared_ptr<MetricAnalyser>> m_analysers;
std::mutex m_checkMutex;
std::vector<std::shared_ptr<MetricsChecker>> m_metricsCheckers;
std::unordered_map<oatpp::String, std::shared_ptr<StatCollector>> m_statCollectors;
private:
@ -147,7 +104,7 @@ private:
void addStatCollector(const std::shared_ptr<StatCollector>& collector);
void removeStatCollector(const oatpp::String& metricName);
void addAnalyser(const std::shared_ptr<MetricAnalyser>& analyser);
void addMetricsChecker(const std::shared_ptr<MetricsChecker>& checker);
void onConnectionRead(ConnectionStats& stats, v_io_size readResult);
void onConnectionWrite(ConnectionStats& stats, v_io_size writeResult);
@ -169,7 +126,7 @@ public:
void addStatCollector(const std::shared_ptr<StatCollector>& collector);
void addAnalyser(const std::shared_ptr<MetricAnalyser>& analyser);
void addMetricsChecker(const std::shared_ptr<MetricsChecker>& checker);
void invalidate(const std::shared_ptr<data::stream::IOStream>& connection) override;
@ -177,23 +134,6 @@ public:
};
class ConnectionMaxAgeAnalyser : public ConnectionMonitor::MetricAnalyser {
public:
}}}
std::vector<oatpp::String> getMetricsList() {
return {};
}
std::shared_ptr<ConnectionMonitor::StatCollector> createStatCollector(const oatpp::String& metricName) {
return nullptr;
}
bool analyse(const ConnectionMonitor::ConnectionStats& stats, v_int64 currMicroTime) {
return currMicroTime - stats.timestampCreated < 1000 * 1000 * 20;
}
};
}}
#endif //oatpp_network_ConnectionMonitor_hpp
#endif //oatpp_network_monitor_ConnectionMonitor_hpp

View File

@ -0,0 +1,45 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_network_monitor_MetricsChecker_hpp
#define oatpp_network_monitor_MetricsChecker_hpp
#include "StatCollector.hpp"
namespace oatpp { namespace network { namespace monitor {
class MetricsChecker : public oatpp::base::Countable {
public:
virtual ~MetricsChecker() = default;
virtual std::vector<oatpp::String> getMetricsList() = 0;
virtual std::shared_ptr<StatCollector> createStatCollector(const oatpp::String& metricName) = 0;
virtual bool check(const ConnectionStats& stats, v_int64 currMicroTime) = 0;
};
}}}
#endif //oatpp_network_monitor_MetricsChecker_hpp

View File

@ -0,0 +1,63 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_network_monitor_StatCollector_hpp
#define oatpp_network_monitor_StatCollector_hpp
#include "oatpp/core/Types.hpp"
#include "oatpp/core/IODefinitions.hpp"
namespace oatpp { namespace network { namespace monitor {
struct ConnectionStats {
v_int64 timestampCreated = 0;
v_io_size totalRead = 0;
v_io_size totalWrite = 0;
v_int64 timestampLastRead = 0;
v_int64 timestampLastWrite = 0;
v_io_size lastReadSize = 0;
v_io_size lastWriteSize = 0;
std::unordered_map<oatpp::String, void*> metricsData;
};
class StatCollector : public oatpp::base::Countable {
public:
virtual ~StatCollector() = default;
virtual oatpp::String metricName() = 0;
virtual void* createMetricData() = 0;
virtual void deleteMetricData(void* metricData) = 0;
virtual void onRead(void* metricData, v_io_size readResult, v_int64 timestamp) = 0;
virtual void onWrite(void* metricData, v_io_size writeResult, v_int64 timestamp) = 0;
};
}}}
#endif //oatpp_network_monitor_StatCollector_hpp

View File

@ -61,16 +61,16 @@ add_executable(oatppAllTests
oatpp/encoding/Base64Test.hpp
oatpp/encoding/UnicodeTest.cpp
oatpp/encoding/UnicodeTest.hpp
oatpp/network/ConnectionMonitorTest.cpp
oatpp/network/ConnectionMonitorTest.hpp
oatpp/network/ConnectionPoolTest.cpp
oatpp/network/ConnectionPoolTest.hpp
oatpp/network/UrlTest.cpp
oatpp/network/UrlTest.hpp
oatpp/network/monitor/ConnectionMonitorTest.cpp
oatpp/network/monitor/ConnectionMonitorTest.hpp
oatpp/network/virtual_/InterfaceTest.cpp
oatpp/network/virtual_/InterfaceTest.hpp
oatpp/network/virtual_/PipeTest.cpp
oatpp/network/virtual_/PipeTest.hpp
oatpp/network/ConnectionPoolTest.cpp
oatpp/network/ConnectionPoolTest.hpp
oatpp/network/UrlTest.cpp
oatpp/network/UrlTest.hpp
oatpp/parser/json/mapping/DTOMapperPerfTest.cpp
oatpp/parser/json/mapping/DTOMapperPerfTest.hpp
oatpp/parser/json/mapping/DTOMapperTest.cpp

View File

@ -15,7 +15,7 @@
#include "oatpp/network/virtual_/InterfaceTest.hpp"
#include "oatpp/network/UrlTest.hpp"
#include "oatpp/network/ConnectionPoolTest.hpp"
#include "oatpp/network/ConnectionMonitorTest.hpp"
#include "oatpp/network/monitor/ConnectionMonitorTest.hpp"
#include "oatpp/parser/json/mapping/DeserializerTest.hpp"
#include "oatpp/parser/json/mapping/DTOMapperPerfTest.hpp"
@ -128,7 +128,7 @@ void runTests() {
OATPP_RUN_TEST(oatpp::test::network::UrlTest);
OATPP_RUN_TEST(oatpp::test::network::ConnectionPoolTest);
*/
OATPP_RUN_TEST(oatpp::test::network::ConnectionMonitorTest);
OATPP_RUN_TEST(oatpp::test::network::monitor::ConnectionMonitorTest);
/*
OATPP_RUN_TEST(oatpp::test::network::virtual_::PipeTest);
OATPP_RUN_TEST(oatpp::test::network::virtual_::InterfaceTest);

View File

@ -27,14 +27,16 @@
#include "oatpp/web/server/HttpConnectionHandler.hpp"
#include "oatpp/web/protocol/http/outgoing/StreamingBody.hpp"
#include "oatpp/network/ConnectionMonitor.hpp"
#include "oatpp/network/monitor/ConnectionMonitor.hpp"
#include "oatpp/network/monitor/ConnectionMaxAgeChecker.hpp"
#include "oatpp/network/Server.hpp"
#include "oatpp/network/tcp/client/ConnectionProvider.hpp"
#include "oatpp/network/tcp/server/ConnectionProvider.hpp"
#include <thread>
namespace oatpp { namespace test { namespace network {
namespace oatpp { namespace test { namespace network { namespace monitor {
namespace {
@ -62,7 +64,7 @@ public:
};
std::shared_ptr<oatpp::network::Server> runServer(const std::shared_ptr<oatpp::network::ConnectionMonitor>& monitor) {
std::shared_ptr<oatpp::network::Server> runServer(const std::shared_ptr<oatpp::network::monitor::ConnectionMonitor>& monitor) {
auto router = oatpp::web::server::HttpRouter::createShared();
router->route("GET", "/stream", std::make_shared<StreamingHandler>());
@ -85,9 +87,13 @@ std::shared_ptr<oatpp::network::Server> runServer(const std::shared_ptr<oatpp::n
void ConnectionMonitorTest::onRun() {
auto connectionProvider = oatpp::network::tcp::server::ConnectionProvider::createShared({"0.0.0.0", 8000, oatpp::network::Address::IP_4});
auto monitor = std::make_shared<oatpp::network::ConnectionMonitor>(connectionProvider);
auto monitor = std::make_shared<oatpp::network::monitor::ConnectionMonitor>(connectionProvider);
monitor->addAnalyser(std::make_shared<oatpp::network::ConnectionMaxAgeAnalyser>());
monitor->addMetricsChecker(
std::make_shared<oatpp::network::monitor::ConnectionMaxAgeChecker>(
std::chrono::seconds(10)
)
);
auto server = runServer(monitor);
@ -97,4 +103,4 @@ void ConnectionMonitorTest::onRun() {
}
}}}
}}}}

View File

@ -22,22 +22,22 @@
*
***************************************************************************/
#ifndef oatpp_test_network_ConnectionMonitorTest_hpp
#define oatpp_test_network_ConnectionMonitorTest_hpp
#ifndef oatpp_test_network_monitor_ConnectionMonitorTest_hpp
#define oatpp_test_network_monitor_ConnectionMonitorTest_hpp
#include "oatpp-test/UnitTest.hpp"
namespace oatpp { namespace test { namespace network {
namespace oatpp { namespace test { namespace network { namespace monitor {
class ConnectionMonitorTest : public UnitTest {
public:
ConnectionMonitorTest():UnitTest("TEST[network::ConnectionMonitorTest]"){}
ConnectionMonitorTest():UnitTest("TEST[network::monitor::ConnectionMonitorTest]"){}
void onRun() override;
};
}}}
}}}}
#endif // oatpp_test_network_ConnectionMonitorTest_hpp
#endif // oatpp_test_network_monitor_ConnectionMonitorTest_hpp