network: introduce ConnectionMonitor.

This commit is contained in:
lganzzzo 2021-09-07 02:17:50 +03:00
parent eb666070ab
commit 1230a81753
2 changed files with 255 additions and 12 deletions

View File

@ -32,22 +32,47 @@ namespace oatpp { namespace network {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ConnectionMonitor::ConnectionProxy
ConnectionMonitor::ConnectionProxy::ConnectionProxy(const std::shared_ptr<ConnectionProvider>& connectionProvider,
ConnectionMonitor::ConnectionProxy::ConnectionProxy(const std::shared_ptr<Monitor>& monitor,
const std::shared_ptr<ConnectionProvider>& connectionProvider,
const std::shared_ptr<data::stream::IOStream>& connection)
: m_connectionProvider(connectionProvider)
: m_monitor(monitor)
, m_connectionProvider(connectionProvider)
, m_connection(connection)
{}
{
m_stats.timestampCreated = base::Environment::getMicroTickCount();
}
ConnectionMonitor::ConnectionProxy::~ConnectionProxy() {
std::lock_guard<std::mutex> lock(m_statsMutex);
m_monitor->freeConnectionStats(m_stats);
if(m_stats.metricData.size() > 0) {
for(auto& pair : m_stats.metricData) {
OATPP_LOGE("[oatpp::network::ConnectionMonitor::ConnectionProxy::~ConnectionProxy()]",
"Error. Memory leak. Metric data was not deleted: Metric name - '%s'", pair.first->c_str());
}
}
m_monitor->removeConnection((v_uint64) this);
}
v_io_size ConnectionMonitor::ConnectionProxy::read(void *buffer, v_buff_size count, async::Action& action) {
return m_connection->read(buffer, count, action);
auto res = m_connection->read(buffer, count, action);
std::lock_guard<std::mutex> lock(m_statsMutex);
m_monitor->onConnectionRead(m_stats, res);
return res;
}
v_io_size ConnectionMonitor::ConnectionProxy::write(const void *data, v_buff_size count, async::Action& action) {
return m_connection->write(data, count, action);
auto res = m_connection->write(data, count, action);
std::lock_guard<std::mutex> lock(m_statsMutex);
m_monitor->onConnectionWrite(m_stats, res);
return res;
}
void ConnectionMonitor::ConnectionProxy::setInputStreamIOMode(data::stream::IOMode ioMode) {
@ -72,16 +97,155 @@ data::stream::Context& ConnectionMonitor::ConnectionProxy::getOutputStreamContex
return m_connection->getOutputStreamContext();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ConnectionMonitor
void ConnectionMonitor::ConnectionProxy::invalidate() {
m_connectionProvider->invalidate(m_connection);
}
void ConnectionMonitor::monitorTask(std::shared_ptr<ConnectionMonitor> monitor) {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Monitor
void ConnectionMonitor::Monitor::monitorTask(std::shared_ptr<Monitor> monitor) {
while(monitor->m_running) {
{
std::lock_guard<std::mutex> lock(monitor->m_connectionsMutex);
for(auto& pair : monitor->m_connections) {
auto connection = pair.second.lock();
std::lock_guard<std::mutex> dataLock(connection->m_statsMutex);
// TODO - check data
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
void* ConnectionMonitor::Monitor::createOrGetMetricData(ConnectionStats& stats, const std::shared_ptr<StatCollector>& collector) {
void* data;
auto it = stats.metricData.find(collector->metricName());
if(it == stats.metricData.end()) {
data = collector->createMetricData();
stats.metricData.insert({collector->metricName(), data});
} else {
data = it->second;
}
return data;
}
std::shared_ptr<ConnectionMonitor::Monitor> ConnectionMonitor::Monitor::createShared() {
auto monitor = std::make_shared<Monitor>();
std::thread t([monitor](){
ConnectionMonitor::Monitor::monitorTask(monitor);
});
return monitor;
}
void ConnectionMonitor::Monitor::addConnection(v_uint64 id, const std::weak_ptr<ConnectionProxy>& connection) {
std::lock_guard<std::mutex> lock(m_connectionsMutex);
m_connections.insert({id, connection});
}
void ConnectionMonitor::Monitor::freeConnectionStats(ConnectionStats& stats) {
std::lock_guard<std::mutex> lock(m_statCollectorsMutex);
for(auto& metric : stats.metricData) {
auto it = m_statCollectors.find(metric.first);
if(it != m_statCollectors.end()) {
it->second->deleteMetricData(metric.second);
} else {
OATPP_LOGE("[oatpp::network::ConnectionMonitor::Monitor::freeConnectionStats]",
"Error. Can't free Metric data. Unknown Metric: name - '%s'", it->first->c_str());
}
}
}
void ConnectionMonitor::Monitor::removeConnection(v_uint64 id) {
std::lock_guard<std::mutex> lock(m_connectionsMutex);
m_connections.erase(id);
}
void ConnectionMonitor::Monitor::addStatCollector(const std::shared_ptr<StatCollector>& collector) {
std::lock_guard<std::mutex> lock(m_statCollectorsMutex);
m_statCollectors.insert({collector->metricName(), collector});
}
void ConnectionMonitor::Monitor::removeStatCollector(const oatpp::String& metricName) {
std::lock_guard<std::mutex> lock(m_statCollectorsMutex);
m_statCollectors.erase(metricName);
}
void ConnectionMonitor::Monitor::onConnectionRead(ConnectionStats& stats, v_io_size readResult) {
v_int64 currTimestamp = base::Environment::getMicroTickCount();
if(readResult > 0) {
stats.totalRead += readResult;
stats.lastReadSize = readResult;
stats.timestampLastRead = currTimestamp;
}
{
std::lock_guard<std::mutex> lock(m_statCollectorsMutex);
for(auto& pair : m_statCollectors) {
pair.second->onRead(createOrGetMetricData(stats, pair.second), readResult, currTimestamp);
}
}
}
void ConnectionMonitor::Monitor::onConnectionWrite(ConnectionStats& stats, v_io_size writeResult) {
v_int64 currTimestamp = base::Environment::getMicroTickCount();
if(writeResult > 0) {
stats.totalWrite += writeResult;
stats.lastWriteSize = writeResult;
stats.timestampLastWrite = currTimestamp;
}
{
std::lock_guard<std::mutex> lock(m_statCollectorsMutex);
for(auto& pair : m_statCollectors) {
pair.second->onWrite(createOrGetMetricData(stats, pair.second), writeResult, currTimestamp);
}
}
}
void ConnectionMonitor::Monitor::stop() {
m_running = false;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ConnectionMonitor
ConnectionMonitor::ConnectionMonitor(const std::shared_ptr<ConnectionProvider>& connectionProvider)
: m_monitor(Monitor::createShared())
, m_connectionProvider(connectionProvider)
{
}
std::shared_ptr<data::stream::IOStream> ConnectionMonitor::get() {
auto connection = m_connectionProvider->get();
auto proxy = std::make_shared<ConnectionProxy>(m_monitor, m_connectionProvider, connection);
m_monitor->addConnection((v_uint64) proxy.get(), proxy);
return proxy;
}
void ConnectionMonitor::addStatCollector(const std::shared_ptr<StatCollector>& collector) {
m_monitor->addStatCollector(collector);
}
void ConnectionMonitor::stop() {
m_monitor->stop();
}
}}

View File

@ -28,19 +28,58 @@
#include "./ConnectionProvider.hpp"
#include "oatpp/core/data/stream/Stream.hpp"
#include "unordered_map"
namespace oatpp { namespace network {
class ConnectionMonitor {
public:
struct ConnectionStats {
v_int64 timestampCreated = 0;
v_io_size totalRead = 0;
v_io_size totalWrite = 0;
v_int64 timestampLastRead = 0;
v_int64 timestampLastWrite = 0;
v_io_size lastReadSize = 0;
v_io_size lastWriteSize = 0;
std::unordered_map<oatpp::String, void*> metricData;
};
public:
class StatCollector {
public:
virtual oatpp::String metricName() = 0;
virtual void* createMetricData() = 0;
virtual void deleteMetricData(void* metricData) = 0;
virtual void onRead(void* metricData, v_io_size readResult, v_int64 timestamp) = 0;
virtual void onWrite(void* metricData, v_io_size writeResult, v_int64 timestamp) = 0;
};
private:
class Monitor; // FWD
class ConnectionProxy : public data::stream::IOStream {
friend Monitor;
private:
std::shared_ptr<Monitor> m_monitor;
/* provider which created this connection */
std::shared_ptr<ConnectionProvider> m_connectionProvider;
std::shared_ptr<data::stream::IOStream> m_connection;
std::mutex m_statsMutex;
ConnectionStats m_stats;
public:
ConnectionProxy(const std::shared_ptr<ConnectionProvider>& connectionProvider,
ConnectionProxy(const std::shared_ptr<Monitor>& monitor,
const std::shared_ptr<ConnectionProvider>& connectionProvider,
const std::shared_ptr<data::stream::IOStream>& connection);
~ConnectionProxy() override;
@ -56,17 +95,57 @@ private:
data::stream::IOMode getOutputStreamIOMode() override;
data::stream::Context& getOutputStreamContext() override;
void invalidate();
};
private:
static void monitorTask(std::shared_ptr<ConnectionMonitor> monitor);
class Monitor {
private:
std::atomic<bool> m_running {true};
std::mutex m_connectionsMutex;
std::unordered_map<v_uint64, std::weak_ptr<ConnectionProxy>> m_connections;
std::mutex m_statCollectorsMutex;
std::unordered_map<oatpp::String, std::shared_ptr<StatCollector>> m_statCollectors;
private:
static void monitorTask(std::shared_ptr<Monitor> monitor);
private:
void* createOrGetMetricData(ConnectionStats& stats, const std::shared_ptr<StatCollector>& collector);
public:
static std::shared_ptr<Monitor> createShared();
void addConnection(v_uint64 id, const std::weak_ptr<ConnectionProxy>& connection);
void freeConnectionStats(ConnectionStats& stats);
void removeConnection(v_uint64 id);
void addStatCollector(const std::shared_ptr<StatCollector>& collector);
void removeStatCollector(const oatpp::String& metricName);
void onConnectionRead(ConnectionStats& stats, v_io_size readResult);
void onConnectionWrite(ConnectionStats& stats, v_io_size writeResult);
void stop();
};
private:
std::atomic<bool> m_running {true};
public:
std::shared_ptr<Monitor> m_monitor;
std::shared_ptr<ConnectionProvider> m_connectionProvider;
protected:
ConnectionMonitor(const std::shared_ptr<ConnectionProvider>& connectionProvider);
std::shared_ptr<data::stream::IOStream> get();
void addStatCollector(const std::shared_ptr<StatCollector>& collector);
void stop();
};