ConnectionMonitor: introduce MetricAnalyser.

This commit is contained in:
lganzzzo 2021-09-22 02:22:03 +03:00
parent c1da819763
commit 964a68253e
2 changed files with 51 additions and 9 deletions

View File

@ -109,19 +109,31 @@ void ConnectionMonitor::Monitor::monitorTask(std::shared_ptr<Monitor> monitor) {
while(monitor->m_running) {
{
std::lock_guard<std::mutex> lock(monitor->m_connectionsMutex);
auto currMicroTime = oatpp::base::Environment::getMicroTickCount();
for(auto& pair : monitor->m_connections) {
auto connection = pair.second.lock();
std::lock_guard<std::mutex> dataLock(connection->m_statsMutex);
// TODO - check data
std::lock_guard<std::mutex> analysersLock(monitor->m_analysersMutex);
for(auto& a : monitor->m_analysers) {
bool res = a->analyse(connection->m_stats, currMicroTime);
if(!res) {
connection->invalidate();
break;
}
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
@ -153,7 +165,7 @@ void ConnectionMonitor::Monitor::addConnection(v_uint64 id, const std::weak_ptr<
void ConnectionMonitor::Monitor::freeConnectionStats(ConnectionStats& stats) {
std::lock_guard<std::mutex> lock(m_statCollectorsMutex);
std::lock_guard<std::mutex> lock(m_analysersMutex);
for(auto& metric : stats.metricData) {
auto it = m_statCollectors.find(metric.first);
@ -173,15 +185,27 @@ void ConnectionMonitor::Monitor::removeConnection(v_uint64 id) {
}
void ConnectionMonitor::Monitor::addStatCollector(const std::shared_ptr<StatCollector>& collector) {
std::lock_guard<std::mutex> lock(m_statCollectorsMutex);
std::lock_guard<std::mutex> lock(m_analysersMutex);
m_statCollectors.insert({collector->metricName(), collector});
}
void ConnectionMonitor::Monitor::removeStatCollector(const oatpp::String& metricName) {
std::lock_guard<std::mutex> lock(m_statCollectorsMutex);
std::lock_guard<std::mutex> lock(m_analysersMutex);
m_statCollectors.erase(metricName);
}
void ConnectionMonitor::Monitor::addAnalyser(const std::shared_ptr<MetricAnalyser>& analyser) {
std::lock_guard<std::mutex> lock(m_analysersMutex);
m_analysers.push_back(analyser);
auto metrics = analyser->getMetricsList();
for(auto& m : metrics) {
auto it = m_statCollectors.find(m);
if(it == m_statCollectors.end()) {
m_statCollectors.insert({m, analyser->createStatCollector(m)});
}
}
}
void ConnectionMonitor::Monitor::onConnectionRead(ConnectionStats& stats, v_io_size readResult) {
v_int64 currTimestamp = base::Environment::getMicroTickCount();
@ -193,7 +217,7 @@ void ConnectionMonitor::Monitor::onConnectionRead(ConnectionStats& stats, v_io_s
}
{
std::lock_guard<std::mutex> lock(m_statCollectorsMutex);
std::lock_guard<std::mutex> lock(m_analysersMutex);
for(auto& pair : m_statCollectors) {
pair.second->onRead(createOrGetMetricData(stats, pair.second), readResult, currTimestamp);
}
@ -212,7 +236,7 @@ void ConnectionMonitor::Monitor::onConnectionWrite(ConnectionStats& stats, v_io_
}
{
std::lock_guard<std::mutex> lock(m_statCollectorsMutex);
std::lock_guard<std::mutex> lock(m_analysersMutex);
for(auto& pair : m_statCollectors) {
pair.second->onWrite(createOrGetMetricData(stats, pair.second), writeResult, currTimestamp);
}

View File

@ -56,6 +56,8 @@ public:
class StatCollector {
public:
virtual ~StatCollector() = default;
virtual oatpp::String metricName() = 0;
virtual void* createMetricData() = 0;
virtual void deleteMetricData(void* metricData) = 0;
@ -63,6 +65,19 @@ public:
virtual void onWrite(void* metricData, v_io_size writeResult, v_int64 timestamp) = 0;
};
public:
class MetricAnalyser {
public:
virtual ~MetricAnalyser() = default;
virtual std::vector<oatpp::String> getMetricsList() = 0;
virtual std::shared_ptr<StatCollector> createStatCollector(const oatpp::String& metricName) = 0;
virtual bool analyse(const ConnectionStats& stats, v_int64 currMicroTime) = 0;
};
private:
class Monitor; // FWD
@ -109,13 +124,14 @@ private:
std::mutex m_connectionsMutex;
std::unordered_map<v_uint64, std::weak_ptr<ConnectionProxy>> m_connections;
std::mutex m_statCollectorsMutex;
std::mutex m_analysersMutex;
std::vector<std::shared_ptr<MetricAnalyser>> m_analysers;
std::unordered_map<oatpp::String, std::shared_ptr<StatCollector>> m_statCollectors;
private:
static void monitorTask(std::shared_ptr<Monitor> monitor);
private:
void* createOrGetMetricData(ConnectionStats& stats, const std::shared_ptr<StatCollector>& collector);
static void* createOrGetMetricData(ConnectionStats& stats, const std::shared_ptr<StatCollector>& collector);
public:
static std::shared_ptr<Monitor> createShared();
@ -127,6 +143,8 @@ private:
void addStatCollector(const std::shared_ptr<StatCollector>& collector);
void removeStatCollector(const oatpp::String& metricName);
void addAnalyser(const std::shared_ptr<MetricAnalyser>& analyser);
void onConnectionRead(ConnectionStats& stats, v_io_size readResult);
void onConnectionWrite(ConnectionStats& stats, v_io_size writeResult);