Merge branch 'v1.3.0' into types_refactoring

This commit is contained in:
lganzzzo 2021-03-25 04:26:43 +02:00
commit 403835eeb7
6 changed files with 141 additions and 27 deletions

View File

@ -30,6 +30,8 @@ AsyncHttpConnectionHandler::AsyncHttpConnectionHandler(const std::shared_ptr<Htt
v_int32 threadCount)
: m_executor(std::make_shared<oatpp::async::Executor>(threadCount))
, m_components(components)
, m_spawns(0)
, m_continue(true)
{
m_executor->detach();
}
@ -38,6 +40,8 @@ AsyncHttpConnectionHandler::AsyncHttpConnectionHandler(const std::shared_ptr<Htt
const std::shared_ptr<oatpp::async::Executor>& executor)
: m_executor(executor)
, m_components(components)
, m_spawns(0)
, m_continue(true)
{}
std::shared_ptr<AsyncHttpConnectionHandler> AsyncHttpConnectionHandler::createShared(const std::shared_ptr<HttpRouter>& router, v_int32 threadCount){
@ -69,15 +73,23 @@ void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr<IOStream
(void)params;
connection->setOutputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS);
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS);
if (m_continue.load()) {
m_executor->execute<HttpProcessor::Coroutine>(m_components, connection);
connection->setOutputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS);
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::ASYNCHRONOUS);
m_executor->execute<HttpProcessor::Coroutine>(m_components, connection, &m_spawns);
}
}
void AsyncHttpConnectionHandler::stop() {
// DO NOTHING
/* Wait until all connection-threads are done */
m_continue.store(false);
while(m_spawns.load() != 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
}}}

View File

@ -37,8 +37,9 @@ namespace oatpp { namespace web { namespace server {
class AsyncHttpConnectionHandler : public base::Countable, public network::ConnectionHandler {
private:
std::shared_ptr<oatpp::async::Executor> m_executor;
private:
std::shared_ptr<HttpProcessor::Components> m_components;
std::atomic_long m_spawns;
std::atomic_bool m_continue;
public:
/**

View File

@ -39,6 +39,8 @@ namespace oatpp { namespace web { namespace server {
HttpConnectionHandler::HttpConnectionHandler(const std::shared_ptr<HttpProcessor::Components>& components)
: m_components(components)
, m_spawns(0)
, m_continue(true)
{}
std::shared_ptr<HttpConnectionHandler> HttpConnectionHandler::createShared(const std::shared_ptr<HttpRouter>& router){
@ -66,27 +68,37 @@ void HttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::data::
(void)params;
connection->setOutputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
if (m_continue.load()) {
/* Create working thread */
std::thread thread(&HttpProcessor::Task::run, HttpProcessor::Task(m_components, connection));
/* Get hardware concurrency -1 in order to have 1cpu free of workers. */
v_int32 concurrency = oatpp::concurrency::getHardwareConcurrency();
if(concurrency > 1) {
concurrency -= 1;
connection->setOutputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
/* Create working thread */
std::thread thread(&HttpProcessor::Task::run, std::move(HttpProcessor::Task(m_components, connection, &m_spawns)));
/* Get hardware concurrency -1 in order to have 1cpu free of workers. */
v_int32 concurrency = oatpp::concurrency::getHardwareConcurrency();
if (concurrency > 1) {
concurrency -= 1;
}
/* Set thread affinity group CPUs [0..cpu_count - 1]. Leave one cpu free of workers */
oatpp::concurrency::setThreadAffinityToCpuRange(thread.native_handle(),
0,
concurrency - 1 /* -1 because 0-based index */);
thread.detach();
}
/* Set thread affinity group CPUs [0..cpu_count - 1]. Leave one cpu free of workers */
oatpp::concurrency::setThreadAffinityToCpuRange(thread.native_handle(), 0, concurrency - 1 /* -1 because 0-based index */);
thread.detach();
}
void HttpConnectionHandler::stop() {
// DO NOTHING
m_continue.store(false);
/* Wait until all connection-threads are done */
while(m_spawns.load() != 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
}}}

View File

@ -37,6 +37,8 @@ namespace oatpp { namespace web { namespace server {
class HttpConnectionHandler : public base::Countable, public network::ConnectionHandler {
private:
std::shared_ptr<HttpProcessor::Components> m_components;
std::atomic_long m_spawns;
std::atomic_bool m_continue;
public:
/**

View File

@ -219,10 +219,48 @@ HttpProcessor::ConnectionState HttpProcessor::processNextRequest(ProcessingResou
// Task
HttpProcessor::Task::Task(const std::shared_ptr<Components>& components,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection)
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
std::atomic_long *taskCounter)
: m_components(components)
, m_connection(connection)
{}
, m_counter(taskCounter)
{
(*m_counter)++;
}
HttpProcessor::Task::Task(const HttpProcessor::Task &copy)
: m_components(copy.m_components)
, m_connection(copy.m_connection)
, m_counter(copy.m_counter)
{
(*m_counter)++;
}
HttpProcessor::Task::Task(HttpProcessor::Task &&move)
: m_components(std::move(move.m_components))
, m_connection(std::move(move.m_connection))
, m_counter(move.m_counter)
{
move.m_counter = nullptr;
}
HttpProcessor::Task &HttpProcessor::Task::operator=(const HttpProcessor::Task &t) {
if (this != &t) {
m_components = t.m_components;
m_connection = t.m_connection;
m_counter = t.m_counter;
(*m_counter)++;
}
return *this;
}
HttpProcessor::Task &HttpProcessor::Task::operator=(HttpProcessor::Task &&t) {
m_components = std::move(t.m_components);
m_connection = std::move(t.m_connection);
m_counter = t.m_counter;
t.m_counter = nullptr;
return *this;
}
void HttpProcessor::Task::run(){
@ -245,12 +283,18 @@ void HttpProcessor::Task::run(){
}
}
HttpProcessor::Task::~Task() {
if (m_counter != nullptr) {
(*m_counter)--;
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// HttpProcessor::Coroutine
HttpProcessor::Coroutine::Coroutine(const std::shared_ptr<Components>& components,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection)
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
std::atomic_long *taskCounter)
: m_components(components)
, m_connection(connection)
, m_headersInBuffer(components->config->headersInBufferInitial)
@ -258,7 +302,14 @@ HttpProcessor::Coroutine::Coroutine(const std::shared_ptr<Components>& component
, m_headersOutBuffer(std::make_shared<oatpp::data::stream::BufferOutputStream>(components->config->headersOutBufferInitial))
, m_inStream(data::stream::InputStreamBufferedProxy::createShared(m_connection, std::make_shared<std::string>(data::buffer::IOBuffer::BUFFER_SIZE, 0)))
, m_connectionState(ConnectionState::ALIVE)
{}
, m_counter(taskCounter)
{
(*m_counter)++;
}
HttpProcessor::Coroutine::~Coroutine() {
(*m_counter)--;
}
HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::act() {
return m_connection->initContextsAsync().next(yieldTo(&HttpProcessor::Coroutine::parseHeaders));

View File

@ -190,6 +190,7 @@ public:
private:
std::shared_ptr<Components> m_components;
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
std::atomic_long *m_counter;
public:
/**
@ -198,7 +199,38 @@ public:
* @param connection - &id:oatpp::data::stream::IOStream;.
*/
Task(const std::shared_ptr<Components>& components,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection);
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
std::atomic_long *taskCounter);
/**
* Copy-Constructor to correctly count tasks.
*/
Task(const Task &copy);
/**
* Copy-Assignment to correctly count tasks.
* @param t - Task to copy
* @return
*/
Task &operator=(const Task &t);
/**
* Move-Constructor to correclty count tasks;
*/
Task(Task &&move);
/**
* Move-Assignment to correctly count tasks.
* @param t
* @return
*/
Task &operator=(Task &&t);
/**
* Destructor, needed for counting.
*/
~Task() override;
public:
/**
@ -226,6 +258,7 @@ public:
oatpp::web::server::HttpRouter::BranchRouter::Route m_currentRoute;
std::shared_ptr<protocol::http::incoming::Request> m_currentRequest;
std::shared_ptr<protocol::http::outgoing::Response> m_currentResponse;
std::atomic_long *m_counter;
public:
@ -235,8 +268,9 @@ public:
* @param connection - &id:oatpp::data::stream::IOStream;.
*/
Coroutine(const std::shared_ptr<Components>& components,
const std::shared_ptr<oatpp::data::stream::IOStream>& connection);
const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
std::atomic_long *taskCounter);
Action act() override;
Action parseHeaders();
@ -249,6 +283,8 @@ public:
Action onRequestDone();
Action handleError(Error* error) override;
~Coroutine() override;
};