ConnectionMonitor: working POC

This commit is contained in:
lganzzzo 2021-09-23 01:24:52 +03:00
parent 0ed9ce14dc
commit 2ffcf9cc20
5 changed files with 128 additions and 10 deletions

View File

@ -271,11 +271,48 @@ ConnectionMonitor::ConnectionMonitor(const std::shared_ptr<ConnectionProvider>&
std::shared_ptr<data::stream::IOStream> ConnectionMonitor::get() {
auto connection = m_connectionProvider->get();
if(!connection) {
return nullptr;
}
auto proxy = std::make_shared<ConnectionProxy>(m_monitor, m_connectionProvider, connection);
m_monitor->addConnection((v_uint64) proxy.get(), proxy);
return proxy;
}
async::CoroutineStarterForResult<const std::shared_ptr<data::stream::IOStream>&>
ConnectionMonitor::getAsync() {
class GetConnectionCoroutine : public async::CoroutineWithResult<GetConnectionCoroutine, const std::shared_ptr<data::stream::IOStream>&> {
private:
std::shared_ptr<Monitor> m_monitor;
std::shared_ptr<ConnectionProvider> m_connectionProvider;
public:
GetConnectionCoroutine(const std::shared_ptr<Monitor>& monitor,
const std::shared_ptr<ConnectionProvider>& connectionProvider)
: m_monitor(monitor)
, m_connectionProvider(connectionProvider)
{}
Action act() override {
return m_connectionProvider->getAsync().callbackTo(&GetConnectionCoroutine::onConnection);
}
Action onConnection(const std::shared_ptr<data::stream::IOStream>& connection) {
if(!connection) {
return _return(nullptr);
}
auto proxy = std::make_shared<ConnectionProxy>(m_monitor, m_connectionProvider, connection);
m_monitor->addConnection((v_uint64) proxy.get(), proxy);
return _return(proxy);
}
};
return GetConnectionCoroutine::startForResult(m_monitor, m_connectionProvider);
}
void ConnectionMonitor::addStatCollector(const std::shared_ptr<StatCollector>& collector) {
m_monitor->addStatCollector(collector);
}
@ -284,6 +321,11 @@ void ConnectionMonitor::addAnalyser(const std::shared_ptr<MetricAnalyser>& analy
m_monitor->addAnalyser(analyser);
}
void ConnectionMonitor::invalidate(const std::shared_ptr<data::stream::IOStream>& connection) {
auto proxy = std::static_pointer_cast<ConnectionProxy>(connection);
proxy->invalidate();
}
void ConnectionMonitor::stop() {
m_monitor->stop();
}

View File

@ -33,7 +33,7 @@
namespace oatpp { namespace network {
class ConnectionMonitor {
class ConnectionMonitor : public ConnectionProvider {
public:
struct ConnectionStats {
@ -163,13 +163,34 @@ public:
ConnectionMonitor(const std::shared_ptr<ConnectionProvider>& connectionProvider);
std::shared_ptr<data::stream::IOStream> get();
std::shared_ptr<data::stream::IOStream> get() override;
async::CoroutineStarterForResult<const std::shared_ptr<data::stream::IOStream>&> getAsync() override;
void addStatCollector(const std::shared_ptr<StatCollector>& collector);
void addAnalyser(const std::shared_ptr<MetricAnalyser>& analyser);
void stop();
void invalidate(const std::shared_ptr<data::stream::IOStream>& connection) override;
void stop() override;
};
class ConnectionMaxAgeAnalyser : public ConnectionMonitor::MetricAnalyser {
public:
std::vector<oatpp::String> getMetricsList() {
return {};
}
std::shared_ptr<ConnectionMonitor::StatCollector> createStatCollector(const oatpp::String& metricName) {
return nullptr;
}
bool analyse(const ConnectionMonitor::ConnectionStats& stats, v_int64 currMicroTime) {
return currMicroTime - stats.timestampCreated < 1000 * 1000 * 20;
}
};

View File

@ -36,7 +36,7 @@ const v_int32 Server::STATUS_RUNNING = 2;
const v_int32 Server::STATUS_STOPPING = 3;
const v_int32 Server::STATUS_DONE = 4;
Server::Server(const std::shared_ptr<ServerConnectionProvider> &connectionProvider,
Server::Server(const std::shared_ptr<ConnectionProvider> &connectionProvider,
const std::shared_ptr<ConnectionHandler> &connectionHandler)
: m_status(STATUS_CREATED)
, m_connectionProvider(connectionProvider)
@ -95,7 +95,6 @@ void Server::mainLoop(Server *instance) {
}
}
instance->setStatus(STATUS_DONE);
}

View File

@ -60,7 +60,7 @@ private:
std::mutex m_mutex;
oatpp::concurrency::SpinLock m_spinlock;
std::shared_ptr<ServerConnectionProvider> m_connectionProvider;
std::shared_ptr<ConnectionProvider> m_connectionProvider;
std::shared_ptr<ConnectionHandler> m_connectionHandler;
bool m_threaded;
@ -72,7 +72,7 @@ public:
* @param connectionProvider - &id:oatpp::network::ConnectionProvider;.
* @param connectionHandler - &id:oatpp::network::ConnectionHandler;.
*/
Server(const std::shared_ptr<ServerConnectionProvider>& connectionProvider,
Server(const std::shared_ptr<ConnectionProvider>& connectionProvider,
const std::shared_ptr<ConnectionHandler>& connectionHandler);
virtual ~Server();

View File

@ -24,20 +24,76 @@
#include "ConnectionMonitorTest.hpp"
#include "oatpp/web/server/HttpConnectionHandler.hpp"
#include "oatpp/web/protocol/http/outgoing/StreamingBody.hpp"
#include "oatpp/network/ConnectionMonitor.hpp"
#include "oatpp/network/Server.hpp"
#include "oatpp/network/tcp/client/ConnectionProvider.hpp"
#include "oatpp/network/tcp/server/ConnectionProvider.hpp"
#include <thread>
namespace oatpp { namespace test { namespace network {
namespace {
class StreamingHandler : public oatpp::web::server::HttpRequestHandler {
public:
class ReadCallback : public oatpp::data::stream::ReadCallback {
public:
v_io_size read(void *buffer, v_buff_size count, async::Action &action) override {
OATPP_LOGE("TEST", "read(...)")
std::this_thread::sleep_for(std::chrono::milliseconds(500));
char* data = (char*) buffer;
data[0] = 'A';
return 1;
}
};
std::shared_ptr<OutgoingResponse> handle(const std::shared_ptr<IncomingRequest>& request) override {
auto body = std::make_shared<oatpp::web::protocol::http::outgoing::StreamingBody>
(std::make_shared<ReadCallback>());
return OutgoingResponse::createShared(Status::CODE_200, body);
}
};
std::shared_ptr<oatpp::network::Server> runServer(const std::shared_ptr<oatpp::network::ConnectionMonitor>& monitor) {
auto router = oatpp::web::server::HttpRouter::createShared();
router->route("GET", "/stream", std::make_shared<StreamingHandler>());
auto connectionHandler = oatpp::web::server::HttpConnectionHandler::createShared(router);
auto server = std::make_shared<oatpp::network::Server>(monitor, connectionHandler);
std::thread t([server]{
server->run();
});
t.detach();
return server;
}
}
void ConnectionMonitorTest::onRun() {
auto connectionProvider = oatpp::network::tcp::client::ConnectionProvider::createShared({"oatpp.io", 80});
oatpp::network::ConnectionMonitor monitor(connectionProvider);
auto connectionProvider = oatpp::network::tcp::server::ConnectionProvider::createShared({"0.0.0.0", 8000, oatpp::network::Address::IP_4});
auto monitor = std::make_shared<oatpp::network::ConnectionMonitor>(connectionProvider);
monitor->addAnalyser(std::make_shared<oatpp::network::ConnectionMaxAgeAnalyser>());
monitor.stop();
auto server = runServer(monitor);
std::this_thread::sleep_for(std::chrono::minutes (10));
monitor->stop();
}