mirror of
https://github.com/oatpp/oatpp.git
synced 2025-04-18 19:00:23 +08:00
ConnectionHandler: proper server stoppage. (#476)
This commit is contained in:
parent
6335fc7acf
commit
c3f22dfb43
@ -210,12 +210,15 @@ std::shared_ptr<Interface::ConnectionSubmission> Interface::connectNonBlocking()
|
||||
return std::make_shared<ConnectionSubmission>(false);
|
||||
}
|
||||
|
||||
std::shared_ptr<Socket> Interface::accept(const bool& waitingHandle) {
|
||||
std::shared_ptr<Socket> Interface::accept(const bool& waitingHandle,
|
||||
const std::chrono::duration<v_int64, std::micro>& timeout) {
|
||||
|
||||
auto startTime = std::chrono::system_clock::now();
|
||||
std::unique_lock<std::mutex> lock(m_mutex);
|
||||
while (waitingHandle && m_submissions.empty()) {
|
||||
m_condition.wait(lock);
|
||||
while (waitingHandle && m_submissions.empty() && std::chrono::system_clock::now() - startTime < timeout) {
|
||||
m_condition.wait_for(lock, std::chrono::milliseconds (100));
|
||||
}
|
||||
if(!waitingHandle) {
|
||||
if(!waitingHandle || m_submissions.empty()) {
|
||||
return nullptr;
|
||||
}
|
||||
const auto submission = m_submissions.front();
|
||||
|
@ -29,6 +29,7 @@
|
||||
|
||||
#include <list>
|
||||
#include <unordered_map>
|
||||
#include <chrono>
|
||||
|
||||
namespace oatpp { namespace network { namespace virtual_ {
|
||||
|
||||
@ -161,9 +162,11 @@ public:
|
||||
* Block and wait for incloming connection.
|
||||
* @param waitingHandle - reference to a boolean variable.
|
||||
* User may set waitingHandle = false and call &l:Interface::notifyAcceptors (); in order to break waiting loop. and exit accept() method.
|
||||
* @param timeout
|
||||
* @return - `std::shared_ptr` to &id:oatpp::network::virtual_::Socket;.
|
||||
*/
|
||||
std::shared_ptr<Socket> accept(const bool& waitingHandle = true);
|
||||
std::shared_ptr<Socket> accept(const bool& waitingHandle = true,
|
||||
const std::chrono::duration<v_int64, std::micro>& timeout = std::chrono::minutes (10));
|
||||
|
||||
/**
|
||||
* Check if incoming connection is available. NonBlocking.
|
||||
|
@ -124,11 +124,13 @@ v_io_size Pipe::Writer::write(const void *data, v_buff_size count, async::Action
|
||||
|
||||
std::lock_guard<std::mutex> lock(pipe.m_mutex);
|
||||
|
||||
if (pipe.m_fifo.availableToWrite() > 0) {
|
||||
result = pipe.m_fifo.write(data, count);
|
||||
} else if (pipe.m_open) {
|
||||
action = async::Action::createWaitListAction(&m_waitList);
|
||||
result = IOError::RETRY_WRITE;
|
||||
if(pipe.m_open) {
|
||||
if (pipe.m_fifo.availableToWrite() > 0) {
|
||||
result = pipe.m_fifo.write(data, count);
|
||||
} else {
|
||||
action = async::Action::createWaitListAction(&m_waitList);
|
||||
result = IOError::RETRY_WRITE;
|
||||
}
|
||||
} else {
|
||||
result = IOError::BROKEN_PIPE;
|
||||
}
|
||||
|
@ -80,8 +80,6 @@ oatpp::data::stream::Context& Socket::getInputStreamContext() {
|
||||
void Socket::close() {
|
||||
m_pipeIn->close();
|
||||
m_pipeOut->close();
|
||||
m_pipeIn.reset();
|
||||
m_pipeOut.reset();
|
||||
}
|
||||
|
||||
}}}
|
||||
|
@ -27,7 +27,8 @@
|
||||
namespace oatpp { namespace network { namespace virtual_ { namespace client {
|
||||
|
||||
void ConnectionProvider::ConnectionInvalidator::invalidate(const std::shared_ptr<data::stream::IOStream>& connection) {
|
||||
(void) connection;
|
||||
auto socket = std::static_pointer_cast<Socket>(connection);
|
||||
socket->close();
|
||||
}
|
||||
|
||||
ConnectionProvider::ConnectionProvider(const std::shared_ptr<virtual_::Interface>& interface)
|
||||
|
@ -24,10 +24,13 @@
|
||||
|
||||
#include "ConnectionProvider.hpp"
|
||||
|
||||
#include <chrono>
|
||||
|
||||
namespace oatpp { namespace network { namespace virtual_ { namespace server {
|
||||
|
||||
void ConnectionProvider::ConnectionInvalidator::invalidate(const std::shared_ptr<data::stream::IOStream>& connection) {
|
||||
(void) connection;
|
||||
auto socket = std::static_pointer_cast<Socket>(connection);
|
||||
socket->close();
|
||||
}
|
||||
|
||||
ConnectionProvider::ConnectionProvider(const std::shared_ptr<virtual_::Interface>& interface)
|
||||
@ -58,7 +61,7 @@ void ConnectionProvider::stop() {
|
||||
}
|
||||
|
||||
provider::ResourceHandle<data::stream::IOStream> ConnectionProvider::get() {
|
||||
auto socket = m_interface->accept(m_open);
|
||||
auto socket = m_interface->accept(m_open, std::chrono::milliseconds(500));
|
||||
if(socket) {
|
||||
socket->setMaxAvailableToReadWrtie(m_maxAvailableToRead, m_maxAvailableToWrite);
|
||||
}
|
||||
|
@ -26,11 +26,39 @@
|
||||
|
||||
namespace oatpp { namespace web { namespace server {
|
||||
|
||||
void AsyncHttpConnectionHandler::onTaskStart(const provider::ResourceHandle<data::stream::IOStream>& connection) {
|
||||
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_connectionsLock);
|
||||
m_connections.insert({(v_uint64) connection.object.get(), connection});
|
||||
|
||||
if(!m_continue.load()) {
|
||||
connection.invalidator->invalidate(connection.object);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void AsyncHttpConnectionHandler::onTaskEnd(const provider::ResourceHandle<data::stream::IOStream>& connection) {
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_connectionsLock);
|
||||
m_connections.erase((v_uint64) connection.object.get());
|
||||
}
|
||||
|
||||
void AsyncHttpConnectionHandler::invalidateAllConnections() {
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_connectionsLock);
|
||||
for(auto& c : m_connections) {
|
||||
const auto& handle = c.second;
|
||||
handle.invalidator->invalidate(handle.object);
|
||||
}
|
||||
}
|
||||
|
||||
v_uint64 AsyncHttpConnectionHandler::getConnectionsCount() {
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_connectionsLock);
|
||||
return m_connections.size();
|
||||
}
|
||||
|
||||
AsyncHttpConnectionHandler::AsyncHttpConnectionHandler(const std::shared_ptr<HttpProcessor::Components>& components,
|
||||
v_int32 threadCount)
|
||||
: m_executor(std::make_shared<oatpp::async::Executor>(threadCount))
|
||||
, m_components(components)
|
||||
, m_spawns(0)
|
||||
, m_continue(true)
|
||||
{
|
||||
m_executor->detach();
|
||||
@ -40,7 +68,6 @@ AsyncHttpConnectionHandler::AsyncHttpConnectionHandler(const std::shared_ptr<Htt
|
||||
const std::shared_ptr<oatpp::async::Executor>& executor)
|
||||
: m_executor(executor)
|
||||
, m_components(components)
|
||||
, m_spawns(0)
|
||||
, m_continue(true)
|
||||
{}
|
||||
|
||||
@ -78,16 +105,20 @@ void AsyncHttpConnectionHandler::handleConnection(const provider::ResourceHandle
|
||||
connection.object->setOutputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS);
|
||||
connection.object->setInputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS);
|
||||
|
||||
m_executor->execute<HttpProcessor::Coroutine>(m_components, connection, &m_spawns);
|
||||
m_executor->execute<HttpProcessor::Coroutine>(m_components, connection, this);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void AsyncHttpConnectionHandler::stop() {
|
||||
/* Wait until all connection-threads are done */
|
||||
m_continue.store(false);
|
||||
while(m_spawns.load() != 0) {
|
||||
|
||||
/* invalidate all connections */
|
||||
invalidateAllConnections();
|
||||
|
||||
/* Wait until all connection-threads are done */
|
||||
while(getConnectionsCount() > 0) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
}
|
||||
|
@ -28,18 +28,30 @@
|
||||
#include "oatpp/web/server/HttpProcessor.hpp"
|
||||
#include "oatpp/network/ConnectionHandler.hpp"
|
||||
#include "oatpp/core/async/Executor.hpp"
|
||||
#include "oatpp/core/concurrency/SpinLock.hpp"
|
||||
|
||||
#include <unordered_map>
|
||||
|
||||
namespace oatpp { namespace web { namespace server {
|
||||
|
||||
/**
|
||||
* Asynchronous &id:oatpp::network::ConnectionHandler; for handling http communication.
|
||||
*/
|
||||
class AsyncHttpConnectionHandler : public base::Countable, public network::ConnectionHandler {
|
||||
class AsyncHttpConnectionHandler : public base::Countable, public network::ConnectionHandler, public HttpProcessor::TaskProcessingListener {
|
||||
protected:
|
||||
|
||||
void onTaskStart(const provider::ResourceHandle<data::stream::IOStream>& connection) override;
|
||||
void onTaskEnd(const provider::ResourceHandle<data::stream::IOStream>& connection) override;
|
||||
|
||||
void invalidateAllConnections();
|
||||
v_uint64 getConnectionsCount();
|
||||
|
||||
private:
|
||||
std::shared_ptr<oatpp::async::Executor> m_executor;
|
||||
std::shared_ptr<HttpProcessor::Components> m_components;
|
||||
std::atomic_long m_spawns;
|
||||
std::atomic_bool m_continue;
|
||||
std::unordered_map<v_uint64, provider::ResourceHandle<data::stream::IOStream>> m_connections;
|
||||
oatpp::concurrency::SpinLock m_connectionsLock;
|
||||
public:
|
||||
|
||||
/**
|
||||
|
@ -37,9 +37,37 @@
|
||||
|
||||
namespace oatpp { namespace web { namespace server {
|
||||
|
||||
void HttpConnectionHandler::onTaskStart(const provider::ResourceHandle<data::stream::IOStream>& connection) {
|
||||
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_connectionsLock);
|
||||
m_connections.insert({(v_uint64) connection.object.get(), connection});
|
||||
|
||||
if(!m_continue.load()) {
|
||||
connection.invalidator->invalidate(connection.object);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void HttpConnectionHandler::onTaskEnd(const provider::ResourceHandle<data::stream::IOStream>& connection) {
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_connectionsLock);
|
||||
m_connections.erase((v_uint64) connection.object.get());
|
||||
}
|
||||
|
||||
void HttpConnectionHandler::invalidateAllConnections() {
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_connectionsLock);
|
||||
for(auto& c : m_connections) {
|
||||
const auto& handle = c.second;
|
||||
handle.invalidator->invalidate(handle.object);
|
||||
}
|
||||
}
|
||||
|
||||
v_uint64 HttpConnectionHandler::getConnectionsCount() {
|
||||
std::lock_guard<oatpp::concurrency::SpinLock> lock(m_connectionsLock);
|
||||
return m_connections.size();
|
||||
}
|
||||
|
||||
HttpConnectionHandler::HttpConnectionHandler(const std::shared_ptr<HttpProcessor::Components>& components)
|
||||
: m_components(components)
|
||||
, m_spawns(0)
|
||||
, m_continue(true)
|
||||
{}
|
||||
|
||||
@ -74,7 +102,7 @@ void HttpConnectionHandler::handleConnection(const provider::ResourceHandle<data
|
||||
connection.object->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
|
||||
|
||||
/* Create working thread */
|
||||
std::thread thread(&HttpProcessor::Task::run, std::move(HttpProcessor::Task(m_components, connection, &m_spawns)));
|
||||
std::thread thread(&HttpProcessor::Task::run, std::move(HttpProcessor::Task(m_components, connection, this)));
|
||||
|
||||
/* Get hardware concurrency -1 in order to have 1cpu free of workers. */
|
||||
v_int32 concurrency = oatpp::concurrency::getHardwareConcurrency();
|
||||
@ -95,8 +123,11 @@ void HttpConnectionHandler::handleConnection(const provider::ResourceHandle<data
|
||||
void HttpConnectionHandler::stop() {
|
||||
m_continue.store(false);
|
||||
|
||||
/* invalidate all connections */
|
||||
invalidateAllConnections();
|
||||
|
||||
/* Wait until all connection-threads are done */
|
||||
while(m_spawns.load() != 0) {
|
||||
while(getConnectionsCount() > 0) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
}
|
||||
|
@ -27,6 +27,9 @@
|
||||
|
||||
#include "oatpp/web/server/HttpProcessor.hpp"
|
||||
#include "oatpp/network/ConnectionHandler.hpp"
|
||||
#include "oatpp/core/concurrency/SpinLock.hpp"
|
||||
|
||||
#include <unordered_map>
|
||||
|
||||
namespace oatpp { namespace web { namespace server {
|
||||
|
||||
@ -34,11 +37,20 @@ namespace oatpp { namespace web { namespace server {
|
||||
* Simple ConnectionHandler (&id:oatpp::network::ConnectionHandler;) for handling HTTP communication. <br>
|
||||
* Will create one thread per each connection to handle communication.
|
||||
*/
|
||||
class HttpConnectionHandler : public base::Countable, public network::ConnectionHandler {
|
||||
class HttpConnectionHandler : public base::Countable, public network::ConnectionHandler, public HttpProcessor::TaskProcessingListener {
|
||||
protected:
|
||||
|
||||
void onTaskStart(const provider::ResourceHandle<data::stream::IOStream>& connection) override;
|
||||
void onTaskEnd(const provider::ResourceHandle<data::stream::IOStream>& connection) override;
|
||||
|
||||
void invalidateAllConnections();
|
||||
v_uint64 getConnectionsCount();
|
||||
|
||||
private:
|
||||
std::shared_ptr<HttpProcessor::Components> m_components;
|
||||
std::atomic_long m_spawns;
|
||||
std::atomic_bool m_continue;
|
||||
std::unordered_map<v_uint64, provider::ResourceHandle<data::stream::IOStream>> m_connections;
|
||||
oatpp::concurrency::SpinLock m_connectionsLock;
|
||||
public:
|
||||
|
||||
/**
|
||||
|
@ -220,45 +220,33 @@ HttpProcessor::ConnectionState HttpProcessor::processNextRequest(ProcessingResou
|
||||
|
||||
HttpProcessor::Task::Task(const std::shared_ptr<Components>& components,
|
||||
const provider::ResourceHandle<oatpp::data::stream::IOStream>& connection,
|
||||
std::atomic_long *taskCounter)
|
||||
TaskProcessingListener* taskListener)
|
||||
: m_components(components)
|
||||
, m_connection(connection)
|
||||
, m_counter(taskCounter)
|
||||
, m_taskListener(taskListener)
|
||||
{
|
||||
(*m_counter)++;
|
||||
m_taskListener->onTaskStart(m_connection);
|
||||
}
|
||||
|
||||
HttpProcessor::Task::Task(const HttpProcessor::Task ©)
|
||||
: m_components(copy.m_components)
|
||||
, m_connection(copy.m_connection)
|
||||
, m_counter(copy.m_counter)
|
||||
HttpProcessor::Task::Task(HttpProcessor::Task &&other)
|
||||
: m_components(std::move(other.m_components))
|
||||
, m_connection(std::move(other.m_connection))
|
||||
, m_taskListener(other.m_taskListener)
|
||||
{
|
||||
(*m_counter)++;
|
||||
other.m_taskListener = nullptr;
|
||||
}
|
||||
|
||||
HttpProcessor::Task::Task(HttpProcessor::Task &&move)
|
||||
: m_components(std::move(move.m_components))
|
||||
, m_connection(std::move(move.m_connection))
|
||||
, m_counter(move.m_counter)
|
||||
{
|
||||
move.m_counter = nullptr;
|
||||
}
|
||||
|
||||
HttpProcessor::Task &HttpProcessor::Task::operator=(const HttpProcessor::Task &t) {
|
||||
if (this != &t) {
|
||||
m_components = t.m_components;
|
||||
m_connection = t.m_connection;
|
||||
m_counter = t.m_counter;
|
||||
(*m_counter)++;
|
||||
HttpProcessor::Task::~Task() {
|
||||
if (m_taskListener != nullptr) {
|
||||
m_taskListener->onTaskEnd(m_connection);
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
HttpProcessor::Task &HttpProcessor::Task::operator=(HttpProcessor::Task &&t) {
|
||||
m_components = std::move(t.m_components);
|
||||
m_connection = std::move(t.m_connection);
|
||||
m_counter = t.m_counter;
|
||||
t.m_counter = nullptr;
|
||||
HttpProcessor::Task &HttpProcessor::Task::operator=(HttpProcessor::Task &&other) {
|
||||
m_components = std::move(other.m_components);
|
||||
m_connection = std::move(other.m_connection);
|
||||
m_taskListener = other.m_taskListener;
|
||||
other.m_taskListener = nullptr;
|
||||
return *this;
|
||||
}
|
||||
|
||||
@ -283,18 +271,13 @@ void HttpProcessor::Task::run(){
|
||||
}
|
||||
|
||||
}
|
||||
HttpProcessor::Task::~Task() {
|
||||
if (m_counter != nullptr) {
|
||||
(*m_counter)--;
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// HttpProcessor::Coroutine
|
||||
|
||||
HttpProcessor::Coroutine::Coroutine(const std::shared_ptr<Components>& components,
|
||||
const provider::ResourceHandle<oatpp::data::stream::IOStream>& connection,
|
||||
std::atomic_long *taskCounter)
|
||||
TaskProcessingListener* taskListener)
|
||||
: m_components(components)
|
||||
, m_connection(connection)
|
||||
, m_headersInBuffer(components->config->headersInBufferInitial)
|
||||
@ -302,13 +285,13 @@ HttpProcessor::Coroutine::Coroutine(const std::shared_ptr<Components>& component
|
||||
, m_headersOutBuffer(std::make_shared<oatpp::data::stream::BufferOutputStream>(components->config->headersOutBufferInitial))
|
||||
, m_inStream(data::stream::InputStreamBufferedProxy::createShared(m_connection.object, std::make_shared<std::string>(data::buffer::IOBuffer::BUFFER_SIZE, 0)))
|
||||
, m_connectionState(ConnectionState::ALIVE)
|
||||
, m_counter(taskCounter)
|
||||
, m_taskListener(taskListener)
|
||||
{
|
||||
(*m_counter)++;
|
||||
m_taskListener->onTaskStart(m_connection);
|
||||
}
|
||||
|
||||
HttpProcessor::Coroutine::~Coroutine() {
|
||||
(*m_counter)--;
|
||||
m_taskListener->onTaskEnd(m_connection);
|
||||
}
|
||||
|
||||
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::act() {
|
||||
|
@ -179,6 +179,17 @@ private:
|
||||
ConnectionState& connectionState);
|
||||
static ConnectionState processNextRequest(ProcessingResources& resources);
|
||||
|
||||
public:
|
||||
|
||||
/**
|
||||
* Listener of the connection processing task.
|
||||
*/
|
||||
class TaskProcessingListener {
|
||||
public:
|
||||
virtual void onTaskStart(const provider::ResourceHandle<data::stream::IOStream>& connection) = 0;
|
||||
virtual void onTaskEnd(const provider::ResourceHandle<data::stream::IOStream>& connection) = 0;
|
||||
};
|
||||
|
||||
public:
|
||||
|
||||
/**
|
||||
@ -190,7 +201,7 @@ public:
|
||||
private:
|
||||
std::shared_ptr<Components> m_components;
|
||||
provider::ResourceHandle<oatpp::data::stream::IOStream> m_connection;
|
||||
std::atomic_long *m_counter;
|
||||
TaskProcessingListener* m_taskListener;
|
||||
public:
|
||||
|
||||
/**
|
||||
@ -200,31 +211,22 @@ public:
|
||||
*/
|
||||
Task(const std::shared_ptr<Components>& components,
|
||||
const provider::ResourceHandle<oatpp::data::stream::IOStream>& connection,
|
||||
std::atomic_long *taskCounter);
|
||||
TaskProcessingListener* taskListener);
|
||||
|
||||
/**
|
||||
* Copy-Constructor to correctly count tasks.
|
||||
*/
|
||||
Task(const Task ©);
|
||||
|
||||
/**
|
||||
* Copy-Assignment to correctly count tasks.
|
||||
* @param t - Task to copy
|
||||
* @return
|
||||
*/
|
||||
Task &operator=(const Task &t);
|
||||
Task(const Task&) = delete;
|
||||
Task &operator=(const Task&) = delete;
|
||||
|
||||
/**
|
||||
* Move-Constructor to correclty count tasks;
|
||||
*/
|
||||
Task(Task &&move);
|
||||
Task(Task &&other);
|
||||
|
||||
/**
|
||||
* Move-Assignment to correctly count tasks.
|
||||
* @param t
|
||||
* @return
|
||||
*/
|
||||
Task &operator=(Task &&t);
|
||||
Task &operator=(Task &&other);
|
||||
|
||||
/**
|
||||
* Destructor, needed for counting.
|
||||
@ -258,10 +260,9 @@ public:
|
||||
oatpp::web::server::HttpRouter::BranchRouter::Route m_currentRoute;
|
||||
std::shared_ptr<protocol::http::incoming::Request> m_currentRequest;
|
||||
std::shared_ptr<protocol::http::outgoing::Response> m_currentResponse;
|
||||
std::atomic_long *m_counter;
|
||||
TaskProcessingListener* m_taskListener;
|
||||
public:
|
||||
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param components - &l:HttpProcessor::Components;.
|
||||
@ -269,7 +270,9 @@ public:
|
||||
*/
|
||||
Coroutine(const std::shared_ptr<Components>& components,
|
||||
const provider::ResourceHandle<oatpp::data::stream::IOStream>& connection,
|
||||
std::atomic_long *taskCounter);
|
||||
TaskProcessingListener* taskListener);
|
||||
|
||||
~Coroutine() override;
|
||||
|
||||
Action act() override;
|
||||
|
||||
@ -283,8 +286,6 @@ public:
|
||||
Action onRequestDone();
|
||||
|
||||
Action handleError(Error* error) override;
|
||||
|
||||
~Coroutine() override;
|
||||
|
||||
};
|
||||
|
||||
|
@ -91,6 +91,8 @@ add_executable(oatppAllTests
|
||||
oatpp/web/server/handler/AuthorizationHandlerTest.hpp
|
||||
oatpp/web/server/HttpRouterTest.cpp
|
||||
oatpp/web/server/HttpRouterTest.hpp
|
||||
oatpp/web/server/ServerStopTest.cpp
|
||||
oatpp/web/server/ServerStopTest.hpp
|
||||
oatpp/web/ClientRetryTest.cpp
|
||||
oatpp/web/ClientRetryTest.hpp
|
||||
oatpp/web/PipelineTest.cpp
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include "oatpp/web/server/api/ApiControllerTest.hpp"
|
||||
#include "oatpp/web/server/handler/AuthorizationHandlerTest.hpp"
|
||||
#include "oatpp/web/server/HttpRouterTest.hpp"
|
||||
#include "oatpp/web/server/ServerStopTest.hpp"
|
||||
#include "oatpp/web/mime/multipart/StatefulParserTest.hpp"
|
||||
|
||||
#include "oatpp/network/virtual_/PipeTest.hpp"
|
||||
@ -142,6 +143,16 @@ void runTests() {
|
||||
OATPP_RUN_TEST(oatpp::test::web::server::api::ApiControllerTest);
|
||||
OATPP_RUN_TEST(oatpp::test::web::server::handler::AuthorizationHandlerTest);
|
||||
|
||||
{
|
||||
|
||||
oatpp::test::web::server::ServerStopTest test_virtual(0);
|
||||
test_virtual.run();
|
||||
|
||||
oatpp::test::web::server::ServerStopTest test_port(8000);
|
||||
test_port.run();
|
||||
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
oatpp::test::web::PipelineTest test_virtual(0, 3000);
|
||||
|
240
test/oatpp/web/server/ServerStopTest.cpp
Normal file
240
test/oatpp/web/server/ServerStopTest.cpp
Normal file
@ -0,0 +1,240 @@
|
||||
/***************************************************************************
|
||||
*
|
||||
* Project _____ __ ____ _ _
|
||||
* ( _ ) /__\ (_ _)_| |_ _| |_
|
||||
* )(_)( /(__)\ )( (_ _)(_ _)
|
||||
* (_____)(__)(__)(__) |_| |_|
|
||||
*
|
||||
*
|
||||
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
***************************************************************************/
|
||||
|
||||
#include "ServerStopTest.hpp"
|
||||
|
||||
#include "oatpp/web/client/HttpRequestExecutor.hpp"
|
||||
#include "oatpp/web/server/AsyncHttpConnectionHandler.hpp"
|
||||
#include "oatpp/web/server/HttpConnectionHandler.hpp"
|
||||
#include "oatpp/web/protocol/http/outgoing/StreamingBody.hpp"
|
||||
|
||||
#include "oatpp/network/virtual_/server/ConnectionProvider.hpp"
|
||||
#include "oatpp/network/virtual_/client/ConnectionProvider.hpp"
|
||||
#include "oatpp/network/tcp/server/ConnectionProvider.hpp"
|
||||
#include "oatpp/network/tcp/client/ConnectionProvider.hpp"
|
||||
#include "oatpp/network/Server.hpp"
|
||||
|
||||
namespace oatpp { namespace test { namespace web { namespace server {
|
||||
|
||||
namespace {
|
||||
|
||||
class ReadCallback : public oatpp::data::stream::ReadCallback {
|
||||
public:
|
||||
|
||||
v_io_size read(void *buffer, v_buff_size count, async::Action &action) override {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
char *data = (char *) buffer;
|
||||
data[0] = 'A';
|
||||
return 1;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
class AsyncReadCallback : public oatpp::data::stream::ReadCallback {
|
||||
private:
|
||||
bool wait = false;
|
||||
public:
|
||||
|
||||
v_io_size read(void *buffer, v_buff_size count, async::Action &action) override {
|
||||
wait = !wait;
|
||||
if(wait) {
|
||||
action = oatpp::async::Action::createWaitRepeatAction(
|
||||
oatpp::base::Environment::getMicroTickCount() + 100 * 1000);
|
||||
return oatpp::IOError::RETRY_READ;
|
||||
}
|
||||
char *data = (char *) buffer;
|
||||
data[0] = 'A';
|
||||
return 1;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
class StreamingHandler : public oatpp::web::server::HttpRequestHandler {
|
||||
public:
|
||||
|
||||
std::shared_ptr<OutgoingResponse> handle(const std::shared_ptr<IncomingRequest> &request) override {
|
||||
auto body = std::make_shared<oatpp::web::protocol::http::outgoing::StreamingBody>
|
||||
(std::make_shared<ReadCallback>());
|
||||
return OutgoingResponse::createShared(Status::CODE_200, body);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
class AsyncStreamingHandler : public oatpp::web::server::HttpRequestHandler {
|
||||
public:
|
||||
|
||||
oatpp::async::CoroutineStarterForResult<const std::shared_ptr<OutgoingResponse> &>
|
||||
handleAsync(const std::shared_ptr<IncomingRequest> &request) {
|
||||
|
||||
class StreamCoroutine
|
||||
: public oatpp::async::CoroutineWithResult<StreamCoroutine, const std::shared_ptr<OutgoingResponse> &> {
|
||||
public:
|
||||
|
||||
Action act() override {
|
||||
auto body = std::make_shared<oatpp::web::protocol::http::outgoing::StreamingBody>
|
||||
(std::make_shared<AsyncReadCallback>());
|
||||
return _return(OutgoingResponse::createShared(Status::CODE_200, body));
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
return StreamCoroutine::startForResult();
|
||||
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
std::shared_ptr<oatpp::network::Server>
|
||||
runServer(const std::shared_ptr<oatpp::network::ServerConnectionProvider>& connectionProvider) {
|
||||
|
||||
auto router = oatpp::web::server::HttpRouter::createShared();
|
||||
router->route("GET", "/stream", std::make_shared<StreamingHandler>());
|
||||
|
||||
auto connectionHandler = oatpp::web::server::HttpConnectionHandler::createShared(router);
|
||||
|
||||
auto server = std::make_shared<oatpp::network::Server>(connectionProvider, connectionHandler);
|
||||
|
||||
std::thread t([server, connectionHandler] {
|
||||
server->run();
|
||||
OATPP_LOGD("TEST", "server stopped");
|
||||
connectionHandler->stop();
|
||||
OATPP_LOGD("TEST", "connectionHandler stopped");
|
||||
});
|
||||
t.detach();
|
||||
|
||||
return server;
|
||||
|
||||
}
|
||||
|
||||
std::shared_ptr<oatpp::network::Server>
|
||||
runAsyncServer(const std::shared_ptr<oatpp::network::ServerConnectionProvider>& connectionProvider) {
|
||||
|
||||
auto router = oatpp::web::server::HttpRouter::createShared();
|
||||
router->route("GET", "/stream", std::make_shared<AsyncStreamingHandler>());
|
||||
|
||||
auto executor = std::make_shared<oatpp::async::Executor>();
|
||||
|
||||
auto connectionHandler = oatpp::web::server::AsyncHttpConnectionHandler::createShared(router, executor);
|
||||
|
||||
auto server = std::make_shared<oatpp::network::Server>(connectionProvider, connectionHandler);
|
||||
|
||||
std::thread t([server, connectionHandler, executor] {
|
||||
server->run();
|
||||
OATPP_LOGD("TEST_ASYNC", "server stopped");
|
||||
connectionHandler->stop();
|
||||
OATPP_LOGD("TEST_ASYNC", "connectionHandler stopped");
|
||||
executor->waitTasksFinished();
|
||||
executor->stop();
|
||||
executor->join();
|
||||
OATPP_LOGD("TEST_ASYNC", "executor stopped");
|
||||
});
|
||||
t.detach();
|
||||
|
||||
return server;
|
||||
|
||||
}
|
||||
|
||||
void runClient(const std::shared_ptr<oatpp::network::ClientConnectionProvider>& connectionProvider) {
|
||||
|
||||
oatpp::web::client::HttpRequestExecutor executor(connectionProvider);
|
||||
|
||||
auto response = executor.execute("GET", "/stream", oatpp::web::protocol::http::Headers({}), nullptr, nullptr);
|
||||
|
||||
OATPP_ASSERT(response->getStatusCode() == 200);
|
||||
auto data = response->readBodyToString();
|
||||
|
||||
OATPP_ASSERT(data)
|
||||
OATPP_LOGD("TEST", "data->size() == %d", data->size())
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void ServerStopTest::onRun() {
|
||||
|
||||
std::shared_ptr<oatpp::network::ServerConnectionProvider> serverConnectionProvider;
|
||||
std::shared_ptr<oatpp::network::ClientConnectionProvider> clientConnectionProvider;
|
||||
|
||||
if(m_port == 0) {
|
||||
auto interface = oatpp::network::virtual_::Interface::obtainShared("virtualhost");
|
||||
serverConnectionProvider = oatpp::network::virtual_::server::ConnectionProvider::createShared(interface);
|
||||
clientConnectionProvider = oatpp::network::virtual_::client::ConnectionProvider::createShared(interface);
|
||||
} else {
|
||||
serverConnectionProvider = oatpp::network::tcp::server::ConnectionProvider::createShared({"localhost", 8000});
|
||||
clientConnectionProvider = oatpp::network::tcp::client::ConnectionProvider::createShared({"localhost", 8000});
|
||||
}
|
||||
|
||||
{
|
||||
OATPP_LOGD(TAG, "Run Simple API test on host=%s, port=%s",
|
||||
serverConnectionProvider->getProperty("host").toString()->c_str(),
|
||||
serverConnectionProvider->getProperty("port").toString()->c_str())
|
||||
|
||||
auto server = runServer(serverConnectionProvider);
|
||||
std::list<std::thread> threads;
|
||||
|
||||
for(v_int32 i = 0; i < 100; i ++) {
|
||||
threads.emplace_back(std::thread([clientConnectionProvider]{
|
||||
runClient(clientConnectionProvider);
|
||||
}));
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::seconds(10));
|
||||
server->stop();
|
||||
|
||||
for(auto& t : threads) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
/* wait connection handler to stop */
|
||||
std::this_thread::sleep_for(std::chrono::seconds(5));
|
||||
OATPP_LOGD(TAG, "DONE");
|
||||
}
|
||||
|
||||
{
|
||||
OATPP_LOGD(TAG, "Run Async API test on host=%s, port=%s",
|
||||
serverConnectionProvider->getProperty("host").toString()->c_str(),
|
||||
serverConnectionProvider->getProperty("port").toString()->c_str())
|
||||
|
||||
auto server = runAsyncServer(serverConnectionProvider);
|
||||
std::list<std::thread> threads;
|
||||
|
||||
for(v_int32 i = 0; i < 100; i ++) {
|
||||
threads.emplace_back(std::thread([clientConnectionProvider]{
|
||||
runClient(clientConnectionProvider);
|
||||
}));
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::seconds(10));
|
||||
server->stop();
|
||||
|
||||
for(auto& t : threads) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
/* wait connection handler to stop */
|
||||
std::this_thread::sleep_for(std::chrono::seconds(5));
|
||||
OATPP_LOGD(TAG, "DONE");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}}}}
|
48
test/oatpp/web/server/ServerStopTest.hpp
Normal file
48
test/oatpp/web/server/ServerStopTest.hpp
Normal file
@ -0,0 +1,48 @@
|
||||
/***************************************************************************
|
||||
*
|
||||
* Project _____ __ ____ _ _
|
||||
* ( _ ) /__\ (_ _)_| |_ _| |_
|
||||
* )(_)( /(__)\ )( (_ _)(_ _)
|
||||
* (_____)(__)(__)(__) |_| |_|
|
||||
*
|
||||
*
|
||||
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
***************************************************************************/
|
||||
|
||||
#ifndef oatpp_test_web_server_ServerStopTest_hpp
|
||||
#define oatpp_test_web_server_ServerStopTest_hpp
|
||||
|
||||
#include "oatpp-test/UnitTest.hpp"
|
||||
|
||||
namespace oatpp { namespace test { namespace web { namespace server {
|
||||
|
||||
class ServerStopTest : public UnitTest {
|
||||
private:
|
||||
v_uint16 m_port;
|
||||
public:
|
||||
|
||||
ServerStopTest(v_uint16 port)
|
||||
: UnitTest("TEST[web::server::ServerStopTest]")
|
||||
, m_port(port)
|
||||
{}
|
||||
|
||||
void onRun() override;
|
||||
|
||||
};
|
||||
|
||||
}}}}
|
||||
|
||||
#endif /* oatpp_test_web_server_ServerStopTest_hpp */
|
Loading…
x
Reference in New Issue
Block a user