mirror of
https://github.com/oatpp/oatpp.git
synced 2025-04-18 19:00:23 +08:00
ConnectionMonitor: Fix connection proxy.
This commit is contained in:
parent
809b3fb3f1
commit
95a7fe0067
@ -50,6 +50,8 @@ ConnectionMonitor::ConnectionProxy::ConnectionProxy(const std::shared_ptr<Monito
|
||||
|
||||
ConnectionMonitor::ConnectionProxy::~ConnectionProxy() {
|
||||
|
||||
m_monitor->removeConnection((v_uint64) this);
|
||||
|
||||
std::lock_guard<std::mutex> lock(m_statsMutex);
|
||||
|
||||
m_monitor->freeConnectionStats(m_stats);
|
||||
@ -63,8 +65,6 @@ ConnectionMonitor::ConnectionProxy::~ConnectionProxy() {
|
||||
|
||||
}
|
||||
|
||||
m_monitor->removeConnection((v_uint64) this);
|
||||
|
||||
}
|
||||
|
||||
v_io_size ConnectionMonitor::ConnectionProxy::read(void *buffer, v_buff_size count, async::Action& action) {
|
||||
@ -120,9 +120,9 @@ void ConnectionMonitor::Monitor::monitorTask(std::shared_ptr<Monitor> monitor) {
|
||||
|
||||
auto currMicroTime = oatpp::base::Environment::getMicroTickCount();
|
||||
|
||||
for(auto& pair : monitor->m_connections) {
|
||||
for(auto& caddr : monitor->m_connections) {
|
||||
|
||||
auto connection = pair.second.lock();
|
||||
auto connection = (ConnectionProxy*) caddr;
|
||||
std::lock_guard<std::mutex> dataLock(connection->m_statsMutex);
|
||||
std::lock_guard<std::mutex> analysersLock(monitor->m_checkMutex);
|
||||
|
||||
@ -172,9 +172,9 @@ std::shared_ptr<ConnectionMonitor::Monitor> ConnectionMonitor::Monitor::createSh
|
||||
return monitor;
|
||||
}
|
||||
|
||||
void ConnectionMonitor::Monitor::addConnection(v_uint64 id, const std::weak_ptr<ConnectionProxy>& connection) {
|
||||
void ConnectionMonitor::Monitor::addConnection(ConnectionProxy* connection) {
|
||||
std::lock_guard<std::mutex> lock(m_connectionsMutex);
|
||||
m_connections.insert({id, connection});
|
||||
m_connections.insert((v_uint64) connection);
|
||||
}
|
||||
|
||||
void ConnectionMonitor::Monitor::freeConnectionStats(ConnectionStats& stats) {
|
||||
@ -282,7 +282,7 @@ provider::ResourceHandle<data::stream::IOStream> ConnectionMonitor::get() {
|
||||
return nullptr;
|
||||
}
|
||||
auto proxy = std::make_shared<ConnectionProxy>(m_monitor, connection);
|
||||
m_monitor->addConnection((v_uint64) proxy.get(), proxy);
|
||||
m_monitor->addConnection(proxy.get());
|
||||
return provider::ResourceHandle<data::stream::IOStream>(proxy, m_invalidator);
|
||||
}
|
||||
|
||||
@ -313,7 +313,7 @@ ConnectionMonitor::getAsync() {
|
||||
return _return(nullptr);
|
||||
}
|
||||
auto proxy = std::make_shared<ConnectionProxy>(m_monitor, connection);
|
||||
m_monitor->addConnection((v_uint64) proxy.get(), proxy);
|
||||
m_monitor->addConnection(proxy.get());
|
||||
return _return(provider::ResourceHandle<data::stream::IOStream>(proxy, m_invalidator));
|
||||
}
|
||||
|
||||
|
@ -30,7 +30,7 @@
|
||||
#include "oatpp/network/ConnectionProvider.hpp"
|
||||
#include "oatpp/core/data/stream/Stream.hpp"
|
||||
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
#include <condition_variable>
|
||||
|
||||
namespace oatpp { namespace network { namespace monitor {
|
||||
@ -93,7 +93,7 @@ private:
|
||||
bool m_stopped {false};
|
||||
|
||||
std::mutex m_connectionsMutex;
|
||||
std::unordered_map<v_uint64, std::weak_ptr<ConnectionProxy>> m_connections;
|
||||
std::unordered_set<v_uint64> m_connections;
|
||||
|
||||
std::mutex m_checkMutex;
|
||||
std::vector<std::shared_ptr<MetricsChecker>> m_metricsCheckers;
|
||||
@ -107,7 +107,7 @@ private:
|
||||
|
||||
static std::shared_ptr<Monitor> createShared();
|
||||
|
||||
void addConnection(v_uint64 id, const std::weak_ptr<ConnectionProxy>& connection);
|
||||
void addConnection(ConnectionProxy* connection);
|
||||
void freeConnectionStats(ConnectionStats& stats);
|
||||
void removeConnection(v_uint64 id);
|
||||
|
||||
|
@ -44,7 +44,6 @@ protected:
|
||||
void onTaskEnd(const provider::ResourceHandle<data::stream::IOStream>& connection) override;
|
||||
|
||||
void invalidateAllConnections();
|
||||
v_uint64 getConnectionsCount();
|
||||
|
||||
private:
|
||||
std::shared_ptr<oatpp::async::Executor> m_executor;
|
||||
@ -142,6 +141,12 @@ public:
|
||||
* Will call m_executor.stop()
|
||||
*/
|
||||
void stop() override;
|
||||
|
||||
/**
|
||||
* Get connections count.
|
||||
* @return
|
||||
*/
|
||||
v_uint64 getConnectionsCount();
|
||||
|
||||
};
|
||||
|
||||
|
@ -44,7 +44,6 @@ protected:
|
||||
void onTaskEnd(const provider::ResourceHandle<data::stream::IOStream>& connection) override;
|
||||
|
||||
void invalidateAllConnections();
|
||||
v_uint64 getConnectionsCount();
|
||||
|
||||
private:
|
||||
std::shared_ptr<HttpProcessor::Components> m_components;
|
||||
@ -118,6 +117,12 @@ public:
|
||||
* Tell all worker threads to exit when done.
|
||||
*/
|
||||
void stop() override;
|
||||
|
||||
/**
|
||||
* Get connections count.
|
||||
* @return
|
||||
*/
|
||||
v_uint64 getConnectionsCount();
|
||||
|
||||
};
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user