From 2592f79934521f49d3cab487e08543f3d5fca002 Mon Sep 17 00:00:00 2001 From: lganzzzo Date: Sun, 20 Oct 2019 05:27:22 +0300 Subject: [PATCH] Tests. ConnectionPoolTest. --- src/oatpp/core/async/Processor.cpp | 20 ++- src/oatpp/network/ConnectionPool.cpp | 210 ++++++++++++++++++---- src/oatpp/network/ConnectionPool.hpp | 31 +++- test/CMakeLists.txt | 5 +- test/oatpp/AllTestsMain.cpp | 4 + test/oatpp/network/ConnectionPoolTest.cpp | 179 ++++++++++++++++++ test/oatpp/network/ConnectionPoolTest.hpp | 43 +++++ 7 files changed, 447 insertions(+), 45 deletions(-) create mode 100644 test/oatpp/network/ConnectionPoolTest.cpp create mode 100644 test/oatpp/network/ConnectionPoolTest.hpp diff --git a/src/oatpp/core/async/Processor.cpp b/src/oatpp/core/async/Processor.cpp index 6fe750a8..ab9471dc 100644 --- a/src/oatpp/core/async/Processor.cpp +++ b/src/oatpp/core/async/Processor.cpp @@ -176,15 +176,25 @@ void Processor::pushQueues() { if (m_pushList.count < MAX_BATCH_SIZE && m_queue.first != nullptr) { std::unique_lock lock(m_taskLock, std::try_to_lock); if (lock.owns_lock()) { - while(m_pushList.first != nullptr) { - addCoroutine(m_pushList.popFront()); + oatpp::collection::FastQueue tmpList; + oatpp::collection::FastQueue::moveAll(m_pushList, tmpList); + lock.unlock(); + while(tmpList.first != nullptr) { + addCoroutine(tmpList.popFront()); } } } else { - std::lock_guard lock(m_taskLock); - while(m_pushList.first != nullptr) { - addCoroutine(m_pushList.popFront()); + oatpp::collection::FastQueue tmpList; + + { + std::lock_guard lock(m_taskLock); + oatpp::collection::FastQueue::moveAll(m_pushList, tmpList); } + + while(tmpList.first != nullptr) { + addCoroutine(tmpList.popFront()); + } + } } diff --git a/src/oatpp/network/ConnectionPool.cpp b/src/oatpp/network/ConnectionPool.cpp index 376c5ab0..31f1e03e 100644 --- a/src/oatpp/network/ConnectionPool.cpp +++ b/src/oatpp/network/ConnectionPool.cpp @@ -24,38 +24,29 @@ #include "ConnectionPool.hpp" +#include +#include + namespace oatpp { namespace network { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // ConnectionPool::Pool -void ConnectionPool::pushConnection(Pool* pool, const std::shared_ptr& connection, v_int32 inc) { +void ConnectionPool::Pool::onNewItem(oatpp::async::CoroutineWaitList& list) { - { - std::lock_guard lock(pool->lock); - - if (inc >= 0) { - pool->connections.push_back({connection, oatpp::base::Environment::getMicroTickCount()}); - } else { - pool->size --; - } - - if (inc > 0) { - pool->size ++; - } + std::lock_guard lockGuard(lock); + if(!isOpen) { + list.notifyAll(); + return; } - pool->condition.notify_one(); + if(size < maxConnections || connections.size() > 0) { + list.notifyFirst(); + } } -std::shared_ptr ConnectionPool::popConnection_NON_BLOCKING(Pool* pool) { - auto result = pool->connections.front(); - pool->connections.pop_front(); - return result.connection; -} - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // ConnectionPool::ConnectionWrapper @@ -109,33 +100,192 @@ void ConnectionPool::ConnectionWrapper::invalidate() { m_recycleConnection = false; } +bool ConnectionPool::ConnectionWrapper::isValid() { + return m_connection && m_recycleConnection; +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // ConnectionPool +ConnectionPool::ConnectionPool(const std::shared_ptr& connectionProvider, + v_int64 maxConnections, + v_int64 maxConnectionTTL) + : m_pool(std::make_shared(maxConnections, maxConnectionTTL)) + , m_connectionProvider(connectionProvider) +{ + + std::thread poolCleanupTask(cleanupTask, m_pool); + poolCleanupTask.detach(); + +} + +void ConnectionPool::cleanupTask(std::shared_ptr pool) { + + while(pool->isOpen) { + + { + + std::lock_guard lock(pool->lock); + auto ticks = oatpp::base::Environment::getMicroTickCount(); + + std::list::iterator i = pool->connections.begin(); + while (i != pool->connections.end()) { + + if(ticks - i->timestamp > pool->maxConnectionTTL) { + i = pool->connections.erase(i); + pool->size --; + } else { + i ++; + } + + } + + } + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + } + +} + +void ConnectionPool::pushConnection(Pool* pool, const std::shared_ptr& connection, v_int32 inc) { + + { + std::lock_guard lock(pool->lock); + + if(!pool->isOpen) { + pool->size --; + return; + } + + if (inc >= 0) { + pool->connections.push_back({connection, oatpp::base::Environment::getMicroTickCount()}); + } else { + pool->size --; + } + + if (inc > 0) { + pool->size ++; + } + + } + + pool->condition.notify_one(); + pool->waitList.notifyFirst(); + +} + +std::shared_ptr ConnectionPool::popConnection_NON_BLOCKING(Pool* pool) { + auto result = pool->connections.front(); + pool->connections.pop_front(); + return result.connection; +} + std::shared_ptr ConnectionPool::getConnection() { - std::unique_lock lock(m_pool->lock); - while (m_pool->size >= m_maxConnections && m_pool->connections.size() == 0) { - m_pool->condition.wait(lock); + { + + std::unique_lock lock(m_pool->lock); + + while (m_pool->isOpen && m_pool->size >= m_pool->maxConnections && m_pool->connections.size() == 0) { + m_pool->condition.wait(lock); + } + + if(!m_pool->isOpen) { + return nullptr; + } + + if (m_pool->connections.size() > 0) { + return std::make_shared(popConnection_NON_BLOCKING(m_pool.get()), m_pool); + } else { + m_pool->size ++; + } + } - if(m_pool->connections.size() == 0) { - m_pool->size ++; - return std::make_shared(m_connectionProvider->getConnection(), m_pool); - } - - return std::make_shared(popConnection_NON_BLOCKING(m_pool.get()), m_pool); + return std::make_shared(m_connectionProvider->getConnection(), m_pool); } oatpp::async::CoroutineStarterForResult&> ConnectionPool::getConnectionAsync() { - + class ConnectionCoroutine : public oatpp::async::CoroutineWithResult&> { + private: + std::shared_ptr m_pool; + std::shared_ptr m_connectionProvider; + bool m_wasInc; + public: + + ConnectionCoroutine(const std::shared_ptr& pool, + const std::shared_ptr& connectionProvider) + : m_pool(pool) + , m_connectionProvider(connectionProvider) + , m_wasInc(false) + {} + + Action act() override { + + { + /* Careful!!! Using non-async lock */ + std::unique_lock lock(m_pool->lock); + + if (m_pool->isOpen && m_pool->size >= m_pool->maxConnections && m_pool->connections.size() == 0) { + lock.unlock(); + return Action::createWaitListAction(&m_pool->waitList); + } + + if(!m_pool->isOpen) { + lock.unlock(); + return _return(nullptr); + } + + if (m_pool->connections.size() > 0) { + auto connection = std::make_shared(popConnection_NON_BLOCKING(m_pool.get()), m_pool); + lock.unlock(); + return _return(connection); + } else { + m_pool->size ++; + m_wasInc = true; + } + + } + + return m_connectionProvider->getConnectionAsync().callbackTo(&ConnectionCoroutine::onConnection); + + } + + Action onConnection(const std::shared_ptr& connection) { + return _return(std::make_shared(connection, m_pool)); + } + + Action handleError(oatpp::async::Error* error) override { + if(m_wasInc) { + /* Careful!!! Using non-async lock */ + std::lock_guard lock(m_pool->lock); + m_pool->size --; + } + return error; + } + + }; + + return ConnectionCoroutine::startForResult(m_pool, m_connectionProvider); } void ConnectionPool::close() { + { + std::lock_guard lock(m_pool->lock); + m_pool->isOpen = false; + auto size = m_pool->connections.size(); + m_pool->connections.clear(); + m_pool->size -= size; + } + + m_pool->condition.notify_all(); + m_pool->waitList.notifyAll(); + } }} diff --git a/src/oatpp/network/ConnectionPool.hpp b/src/oatpp/network/ConnectionPool.hpp index d335e6b8..cef7666d 100644 --- a/src/oatpp/network/ConnectionPool.hpp +++ b/src/oatpp/network/ConnectionPool.hpp @@ -28,6 +28,7 @@ #include "ConnectionProvider.hpp" #include "oatpp/core/data/stream/Stream.hpp" +#include "oatpp/core/async/CoroutineWaitList.hpp" #include #include @@ -46,13 +47,27 @@ private: private: - class Pool { + class Pool : private oatpp::async::CoroutineWaitList::Listener { friend ConnectionPool; private: + void onNewItem(oatpp::async::CoroutineWaitList& list) override; + private: + oatpp::async::CoroutineWaitList waitList; std::condition_variable condition; std::mutex lock; std::list connections; v_int64 size = 0; + v_int64 maxConnections; + v_int64 maxConnectionTTL; + private: + std::atomic isOpen; + public: + Pool(v_int64 pMaxConnections, v_int64 pMmaxConnectionTTL) + : maxConnections(pMaxConnections) + , maxConnectionTTL(pMmaxConnectionTTL) + , isOpen(true) + {} + }; public: @@ -80,6 +95,7 @@ public: oatpp::data::stream::IOMode getInputStreamIOMode() override; void invalidate(); + bool isValid(); }; @@ -101,11 +117,13 @@ private: */ static std::shared_ptr popConnection_NON_BLOCKING(Pool* pool); +public: + + static void cleanupTask(std::shared_ptr pool); + private: std::shared_ptr m_pool; std::shared_ptr m_connectionProvider; - v_int64 m_maxConnections; - v_int64 m_maxConnectionTTL; public: /** @@ -114,12 +132,7 @@ public: */ ConnectionPool(const std::shared_ptr& connectionProvider, v_int64 maxConnections, - v_int64 maxConnectionTTL) - : m_pool(std::make_shared()) - , m_connectionProvider(connectionProvider) - , m_maxConnections(maxConnections) - , m_maxConnectionTTL(maxConnectionTTL) - {} + v_int64 maxConnectionTTL); /** * Get connection. diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index c679575b..39ac8591 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -29,6 +29,8 @@ add_executable(oatppAllTests oatpp/encoding/Base64Test.hpp oatpp/encoding/UnicodeTest.cpp oatpp/encoding/UnicodeTest.hpp + oatpp/network/ConnectionPoolTest.cpp + oatpp/network/ConnectionPoolTest.hpp oatpp/network/UrlTest.cpp oatpp/network/UrlTest.hpp oatpp/network/virtual_/InterfaceTest.cpp @@ -64,7 +66,8 @@ add_executable(oatppAllTests oatpp/web/app/ControllerAsync.hpp oatpp/web/app/DTOs.hpp oatpp/web/app/ControllerWithInterceptors.hpp - oatpp/web/app/ControllerWithInterceptorsAsync.hpp) + oatpp/web/app/ControllerWithInterceptorsAsync.hpp +) target_link_libraries(oatppAllTests PRIVATE oatpp PRIVATE oatpp-test) diff --git a/test/oatpp/AllTestsMain.cpp b/test/oatpp/AllTestsMain.cpp index 7a0efa42..799a6210 100644 --- a/test/oatpp/AllTestsMain.cpp +++ b/test/oatpp/AllTestsMain.cpp @@ -15,6 +15,7 @@ #include "oatpp/network/virtual_/PipeTest.hpp" #include "oatpp/network/virtual_/InterfaceTest.hpp" #include "oatpp/network/UrlTest.hpp" +#include "oatpp/network/ConnectionPoolTest.hpp" #include "oatpp/core/data/stream/BufferStreamTest.hpp" #include "oatpp/core/data/stream/ChunkedBufferTest.hpp" @@ -86,6 +87,9 @@ void runTests() { OATPP_RUN_TEST(oatpp::test::encoding::UnicodeTest); OATPP_RUN_TEST(oatpp::test::network::UrlTest); + + OATPP_RUN_TEST(oatpp::test::network::ConnectionPoolTest); + OATPP_RUN_TEST(oatpp::test::network::virtual_::PipeTest); OATPP_RUN_TEST(oatpp::test::network::virtual_::InterfaceTest); diff --git a/test/oatpp/network/ConnectionPoolTest.cpp b/test/oatpp/network/ConnectionPoolTest.cpp new file mode 100644 index 00000000..1791dccc --- /dev/null +++ b/test/oatpp/network/ConnectionPoolTest.cpp @@ -0,0 +1,179 @@ +/*************************************************************************** + * + * Project _____ __ ____ _ _ + * ( _ ) /__\ (_ _)_| |_ _| |_ + * )(_)( /(__)\ )( (_ _)(_ _) + * (_____)(__)(__)(__) |_| |_| + * + * + * Copyright 2018-present, Leonid Stryzhevskyi + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************************/ + +#include "ConnectionPoolTest.hpp" + +#include "oatpp/network/ConnectionPool.hpp" +#include "oatpp/core/async/Executor.hpp" + +namespace oatpp { namespace test { namespace network { + +namespace { + +typedef oatpp::network::ConnectionPool ConnectionPool; + +class StubStream : public oatpp::data::stream::IOStream, public oatpp::base::Countable { +public: + + data::v_io_size write(const void *buff, data::v_io_size count) override { + throw std::runtime_error("It's a stub!"); + } + + data::v_io_size read(void *buff, data::v_io_size count) override { + throw std::runtime_error("It's a stub!"); + } + + oatpp::async::Action suggestOutputStreamAction(data::v_io_size ioResult) override { + throw std::runtime_error("It's a stub!"); + } + + oatpp::async::Action suggestInputStreamAction(data::v_io_size ioResult) override { + throw std::runtime_error("It's a stub!"); + } + + void setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) override { + throw std::runtime_error("It's a stub!"); + } + + oatpp::data::stream::IOMode getOutputStreamIOMode() override { + throw std::runtime_error("It's a stub!"); + } + + void setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) override { + throw std::runtime_error("It's a stub!"); + } + + oatpp::data::stream::IOMode getInputStreamIOMode() override { + throw std::runtime_error("It's a stub!"); + } + +}; + +class StubStreamProvider : public oatpp::network::ConnectionProvider { +public: + + StubStreamProvider() + : counter(0) + {} + + std::atomic counter; + + virtual std::shared_ptr getConnection() { + ++ counter; + return std::make_shared(); + } + + virtual oatpp::async::CoroutineStarterForResult&> getConnectionAsync() { + + class ConnectionCoroutine : public oatpp::async::CoroutineWithResult&> { + public: + + Action act() override { + return _return(std::make_shared()); + } + + }; + + ++ counter; + return ConnectionCoroutine::startForResult(); + + } + + virtual void close() { + // DO NOTHING + } + +}; + +class ClientCoroutine : public oatpp::async::Coroutine { +private: + std::shared_ptr m_pool; + std::shared_ptr m_connection; + v_int32 m_repeats; +public: + + ClientCoroutine(const std::shared_ptr& pool) + : m_pool(pool) + , m_repeats(0) + {} + + Action act() override { + return m_pool->getConnectionAsync().callbackTo(&ClientCoroutine::onConnection); + } + + Action onConnection(const std::shared_ptr& connection) { + m_connection = connection; + return yieldTo(&ClientCoroutine::useConnection); + } + + Action useConnection() { + if(m_repeats < 1) { + m_repeats ++; + return waitRepeat(std::chrono::milliseconds(100)); + } + return finish(); + } + +}; + +void clientMethod(std::shared_ptr pool) { + auto connection = pool->getConnection(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); +} + +} + +void ConnectionPoolTest::onRun() { + + oatpp::async::Executor executor(1, 1, 1); + + auto connectionProvider = std::make_shared(); + auto pool = std::make_shared(connectionProvider, 10 /* maxConnections */, 10 * 1000 * 1000 /* 10s maxConnectionTTL */); + + std::list threads; + + for(v_int32 i = 0; i < 100; i ++ ) { + + threads.push_back(std::thread(clientMethod, pool)); + executor.execute(pool); + + } + + for(std::thread& thread : threads) { + thread.join(); + } + + executor.waitTasksFinished(); + + OATPP_LOGD(TAG, "connections_counter=%d", connectionProvider->counter.load()); + OATPP_ASSERT(connectionProvider->counter <= 10); + + pool->close(); + + executor.stop(); + executor.join(); + +} + +}}} diff --git a/test/oatpp/network/ConnectionPoolTest.hpp b/test/oatpp/network/ConnectionPoolTest.hpp new file mode 100644 index 00000000..b02a8199 --- /dev/null +++ b/test/oatpp/network/ConnectionPoolTest.hpp @@ -0,0 +1,43 @@ +/*************************************************************************** + * + * Project _____ __ ____ _ _ + * ( _ ) /__\ (_ _)_| |_ _| |_ + * )(_)( /(__)\ )( (_ _)(_ _) + * (_____)(__)(__)(__) |_| |_| + * + * + * Copyright 2018-present, Leonid Stryzhevskyi + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************************/ + +#ifndef oatpp_test_network_ConnectionPoolTest_hpp +#define oatpp_test_network_ConnectionPoolTest_hpp + +#include "oatpp-test/UnitTest.hpp" + +namespace oatpp { namespace test { namespace network { + +class ConnectionPoolTest : public UnitTest { +public: + + ConnectionPoolTest():UnitTest("TEST[network::ConnectionPoolTest]"){} + void onRun() override; + +}; + +}}} + + +#endif // oatpp_test_network_ConnectionPoolTest_hpp