tests: ConnectionMonitorTest

This commit is contained in:
lganzzzo 2021-10-07 01:19:50 +03:00
parent eb1296cbb5
commit 3e97c45b71
6 changed files with 168 additions and 20 deletions

View File

@ -35,7 +35,7 @@ namespace oatpp { namespace provider {
* @tparam T - resource class.
*/
template <class T>
class Provider {
class Provider : public oatpp::base::Countable {
protected:
void setProperty(const oatpp::String& key, const oatpp::String& value) {

View File

@ -57,13 +57,13 @@ public:
/**
* No properties here. It is just a logical division
*/
class ServerConnectionProvider : public ConnectionProvider {
class ServerConnectionProvider : virtual public ConnectionProvider {
};
/**
* No properties here. It is just a logical division
*/
class ClientConnectionProvider : public ConnectionProvider {
class ClientConnectionProvider : virtual public ConnectionProvider {
};
}}

View File

@ -35,7 +35,7 @@
namespace oatpp { namespace network { namespace monitor {
class ConnectionMonitor : public ConnectionProvider {
class ConnectionMonitor : public ClientConnectionProvider, public ServerConnectionProvider {
private:
class Monitor; // FWD

View File

@ -35,7 +35,7 @@ namespace oatpp { namespace network { namespace tcp { namespace client {
/**
* Simple provider of clinet TCP connections.
*/
class ConnectionProvider : public base::Countable, public ClientConnectionProvider {
class ConnectionProvider : public ClientConnectionProvider {
protected:
network::Address m_address;
public:

View File

@ -36,7 +36,7 @@ namespace oatpp { namespace network { namespace tcp { namespace server {
/**
* Simple provider of TCP connections.
*/
class ConnectionProvider : public base::Countable, public ServerConnectionProvider {
class ConnectionProvider : public ServerConnectionProvider {
public:
/**

View File

@ -24,6 +24,8 @@
#include "ConnectionMonitorTest.hpp"
#include "oatpp/web/client/HttpRequestExecutor.hpp"
#include "oatpp/web/server/AsyncHttpConnectionHandler.hpp"
#include "oatpp/web/server/HttpConnectionHandler.hpp"
#include "oatpp/web/protocol/http/outgoing/StreamingBody.hpp"
@ -40,21 +42,21 @@ namespace oatpp { namespace test { namespace network { namespace monitor {
namespace {
class StreamingHandler : public oatpp::web::server::HttpRequestHandler {
class ReadCallback : public oatpp::data::stream::ReadCallback {
public:
class ReadCallback : public oatpp::data::stream::ReadCallback {
public:
v_io_size read(void *buffer, v_buff_size count, async::Action &action) override {
OATPP_LOGE("TEST", "read(...)")
std::this_thread::sleep_for(std::chrono::milliseconds(100));
char* data = (char*) buffer;
data[0] = 'A';
return 1;
}
v_io_size read(void *buffer, v_buff_size count, async::Action &action) override {
OATPP_LOGE("TEST", "read(...)")
std::this_thread::sleep_for(std::chrono::milliseconds(500));
char* data = (char*) buffer;
data[0] = 'A';
return 1;
}
};
};
class StreamingHandler : public oatpp::web::server::HttpRequestHandler {
public:
std::shared_ptr<OutgoingResponse> handle(const std::shared_ptr<IncomingRequest>& request) override {
auto body = std::make_shared<oatpp::web::protocol::http::outgoing::StreamingBody>
@ -64,6 +66,29 @@ public:
};
class AsyncStreamingHandler : public oatpp::web::server::HttpRequestHandler {
public:
oatpp::async::CoroutineStarterForResult<const std::shared_ptr<OutgoingResponse>&>
handleAsync(const std::shared_ptr<IncomingRequest>& request) {
class StreamCoroutine : public oatpp::async::CoroutineWithResult<StreamCoroutine, const std::shared_ptr<OutgoingResponse>&> {
public:
Action act() override {
auto body = std::make_shared<oatpp::web::protocol::http::outgoing::StreamingBody>
(std::make_shared<ReadCallback>());
return _return(OutgoingResponse::createShared(Status::CODE_200, body));
}
};
return StreamCoroutine::startForResult();
}
};
std::shared_ptr<oatpp::network::Server> runServer(const std::shared_ptr<oatpp::network::monitor::ConnectionMonitor>& monitor) {
auto router = oatpp::web::server::HttpRouter::createShared();
@ -73,8 +98,11 @@ std::shared_ptr<oatpp::network::Server> runServer(const std::shared_ptr<oatpp::n
auto server = std::make_shared<oatpp::network::Server>(monitor, connectionHandler);
std::thread t([server]{
std::thread t([server, connectionHandler]{
server->run();
OATPP_LOGD("TEST", "server stopped");
connectionHandler->stop();
OATPP_LOGD("TEST", "connectionHandler stopped");
});
t.detach();
@ -82,6 +110,106 @@ std::shared_ptr<oatpp::network::Server> runServer(const std::shared_ptr<oatpp::n
}
std::shared_ptr<oatpp::network::Server> runAsyncServer(const std::shared_ptr<oatpp::network::monitor::ConnectionMonitor>& monitor) {
auto router = oatpp::web::server::HttpRouter::createShared();
router->route("GET", "/stream", std::make_shared<AsyncStreamingHandler>());
auto executor = std::make_shared<oatpp::async::Executor>();
auto connectionHandler = oatpp::web::server::AsyncHttpConnectionHandler::createShared(router, executor);
auto server = std::make_shared<oatpp::network::Server>(monitor, connectionHandler);
std::thread t([server, connectionHandler, executor]{
server->run();
OATPP_LOGD("TEST_ASYNC", "server stopped");
connectionHandler->stop();
OATPP_LOGD("TEST_ASYNC", "connectionHandler stopped");
executor->waitTasksFinished();
executor->stop();
executor->join();
OATPP_LOGD("TEST_ASYNC", "executor stopped");
});
t.detach();
return server;
}
void runClient() {
auto connectionProvider = oatpp::network::tcp::client::ConnectionProvider::createShared(
{"0.0.0.0", 8000, oatpp::network::Address::IP_4});
oatpp::web::client::HttpRequestExecutor executor(connectionProvider);
auto response = executor.execute("GET", "/stream", oatpp::web::protocol::http::Headers({}), nullptr, nullptr);
OATPP_ASSERT(response->getStatusCode() == 200);
auto data = response->readBodyToString();
OATPP_ASSERT(data)
OATPP_LOGD("TEST", "data->size() == %d", data->size())
OATPP_ASSERT(data->size() < 110) // it should be less than 100. But we put 110 for redundancy
}
void runAsyncClient() {
class ClientCoroutine : public oatpp::async::Coroutine<ClientCoroutine> {
private:
std::shared_ptr<oatpp::web::client::HttpRequestExecutor> m_executor;
std::shared_ptr<oatpp::network::monitor::ConnectionMonitor> m_monitor;
public:
ClientCoroutine() {
auto connectionProvider = oatpp::network::tcp::client::ConnectionProvider::createShared(
{"0.0.0.0", 8000, oatpp::network::Address::IP_4});
m_monitor = std::make_shared<oatpp::network::monitor::ConnectionMonitor>(connectionProvider);
m_monitor->addMetricsChecker(
std::make_shared<oatpp::network::monitor::ConnectionMaxAgeChecker>(
std::chrono::seconds(5)
)
);
m_executor = oatpp::web::client::HttpRequestExecutor::createShared(m_monitor);
}
Action act() override {
return m_executor->executeAsync("GET", "/stream", oatpp::web::protocol::http::Headers({}), nullptr, nullptr)
.callbackTo(&ClientCoroutine::onResponse);
}
Action onResponse(const std::shared_ptr<oatpp::web::protocol::http::incoming::Response>& response) {
OATPP_ASSERT(response->getStatusCode() == 200);
return response->readBodyToStringAsync().callbackTo(&ClientCoroutine::onBody);
}
Action onBody(const oatpp::String& data) {
OATPP_ASSERT(data)
OATPP_LOGD("TEST", "data->size() == %d", data->size())
OATPP_ASSERT(data->size() < 60) // it should be less than 50. But we put 60 for redundancy
m_monitor->stop();
return finish();
}
};
auto executor = std::make_shared<oatpp::async::Executor>();
executor->execute<ClientCoroutine>();
executor->waitTasksFinished();
OATPP_LOGD("TEST_ASYNC_CLIENT", "task finished")
executor->stop();
OATPP_LOGD("TEST_ASYNC_CLIENT", "executor stopped")
executor->join();
OATPP_LOGD("TEST_ASYNC_CLIENT", "done")
}
}
void ConnectionMonitorTest::onRun() {
@ -95,9 +223,29 @@ void ConnectionMonitorTest::onRun() {
)
);
auto server = runServer(monitor);
{
OATPP_LOGD(TAG, "run simple API test")
auto server = runServer(monitor);
runClient();
server->stop();
std::this_thread::sleep_for(std::chrono::seconds(5));
}
std::this_thread::sleep_for(std::chrono::minutes (10));
{
OATPP_LOGD(TAG, "run Async API test")
auto server = runAsyncServer(monitor);
runClient();
server->stop();
std::this_thread::sleep_for(std::chrono::seconds(5));
}
{
OATPP_LOGD(TAG, "run Async Client test")
auto server = runServer(monitor);
runAsyncClient();
server->stop();
std::this_thread::sleep_for(std::chrono::seconds(5));
}
monitor->stop();