diff --git a/core/base/Config.hpp b/core/base/Config.hpp index b7a115c9..00055288 100644 --- a/core/base/Config.hpp +++ b/core/base/Config.hpp @@ -37,6 +37,20 @@ */ //#define OATPP_DISABLE_ENV_OBJECT_COUNTERS +/** + * Define this to disable memory-pool allocations. + * This will make oatpp::base::memory::MemoryPool, method obtain and free call new and delete directly + */ +//#define OATPP_DISABLE_POOL_ALLOCATIONS + +/** + * Number of shards of ThreadDistributedMemoryPool (Default pool for many oatpp objects) + * Higher number reduces threads racing for resources on each shard. + */ +#ifndef OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT + #define OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT 10 +#endif + /** * DISABLE logs level V */ diff --git a/core/base/StrBuffer.cpp b/core/base/StrBuffer.cpp index 19ec4ee0..126e1c02 100644 --- a/core/base/StrBuffer.cpp +++ b/core/base/StrBuffer.cpp @@ -44,11 +44,18 @@ void StrBuffer::setAndCopy(const void* data, const void* originData, v_int32 siz std::shared_ptr StrBuffer::allocShared(const void* data, v_int32 size, bool copyAsOwnData) { if(copyAsOwnData) { + //OATPP_LOGD("StrBuffer", "allocating. size=%d, str='%s'", size, data); memory::AllocationExtras extras(size + 1); - const auto& ptr = memory::allocateSharedWithExtras(extras); + std::shared_ptr ptr; + if(size > 100) { + ptr = memory::allocateSharedWithExtras(extras); + } else { + ptr = poolAllocateSharedWithExtras(extras, getSmallStringPool()); + } ptr->setAndCopy(extras.extraPtr, data, size); return ptr; } + //OATPP_LOGD("StrBuffer", "making. size=%d, str='%s'", size, data); return std::make_shared(data, size, copyAsOwnData); } diff --git a/core/base/StrBuffer.hpp b/core/base/StrBuffer.hpp index f5375ba8..8c0c6508 100644 --- a/core/base/StrBuffer.hpp +++ b/core/base/StrBuffer.hpp @@ -33,8 +33,178 @@ namespace oatpp { namespace base { class StrBuffer : public oatpp::base::Controllable { +public: + + class SmStringPool { + private: + + class EntryHeader { + public: + + EntryHeader(SmStringPool* pPool, EntryHeader* pNext) + : pool(pPool) + , next(pNext) + {} + + SmStringPool* pool; + EntryHeader* next; + + }; + + private: + + void allocChunk() { + v_int32 entryBlockSize = sizeof(EntryHeader) + m_entrySize; + v_int32 chunkMemSize = entryBlockSize * m_chunkSize; + p_char8 mem = new v_char8[chunkMemSize]; + m_chunks.push_back(mem); + for(v_int32 i = 0; i < m_chunkSize; i++){ + EntryHeader* entry = new (mem + i * entryBlockSize) EntryHeader(this, m_rootEntry); + m_rootEntry = entry; + } + } + + private: + std::string m_name; + v_int32 m_entrySize; + v_int32 m_chunkSize; + std::list m_chunks; + EntryHeader* m_rootEntry; + oatpp::concurrency::SpinLock::Atom m_atom; + v_int32 m_objectsCount; + public: + + SmStringPool(const std::string& name, v_int32 entrySize, v_int32 chunkSize) + : m_name(name) + , m_entrySize(entrySize) + , m_chunkSize(chunkSize) + , m_rootEntry(nullptr) + , m_atom(false) + , m_objectsCount(0) + { + allocChunk(); + } + + ~SmStringPool() { + auto it = m_chunks.begin(); + while (it != m_chunks.end()) { + p_char8 chunk = *it; + delete [] chunk; + it++; + } + } + + void* obtain() { +#ifdef OATPP_DISABLE_POOL_ALLOCATIONS + return new v_char8[m_entrySize]; +#else + //oatpp::concurrency::SpinLock lock(m_atom); + if(m_rootEntry != nullptr) { + auto entry = m_rootEntry; + m_rootEntry = m_rootEntry->next; + ++ m_objectsCount; + return ((p_char8) entry) + sizeof(EntryHeader); + } else { + allocChunk(); + if(m_rootEntry == nullptr) { + throw std::runtime_error("oatpp::base::memory::MemoryPool: Unable to allocate entry"); + } + auto entry = m_rootEntry; + m_rootEntry = m_rootEntry->next; + ++ m_objectsCount; + return ((p_char8) entry) + sizeof(EntryHeader); + } +#endif + } + + void freeByEntryHeader(EntryHeader* entry) { + oatpp::concurrency::SpinLock lock(m_atom); + entry->next = m_rootEntry; + m_rootEntry = entry; + -- m_objectsCount; + } + + static void free(void* entry) { +#ifdef OATPP_DISABLE_POOL_ALLOCATIONS + delete [] ((p_char8) entry); +#else + EntryHeader* header = (EntryHeader*)(((p_char8) entry) - sizeof (EntryHeader)); + header->pool->freeByEntryHeader(header); +#endif + } + + std::string getName(){ + return m_name; + } + + v_int32 getEntrySize(){ + return m_entrySize; + } + + v_int64 getSize(){ + return m_chunks.size() * m_chunkSize; + } + + v_int32 getObjectsCount(){ + return m_objectsCount; + } + + }; + + typedef oatpp::base::memory::AllocationExtras AllocationExtras; + + template + class SmStringPoolSharedObjectAllocator { + public: + typedef T value_type; + public: + AllocationExtras& m_info; + P* m_pool; + public: + + SmStringPoolSharedObjectAllocator(AllocationExtras& info, P* pool) + : m_info(info) + , m_pool(pool) + {}; + + template + SmStringPoolSharedObjectAllocator(const SmStringPoolSharedObjectAllocator& other) + : m_info(other.m_info) + , m_pool(other.m_pool) + {}; + + T* allocate(std::size_t n) { + void* mem; + //OATPP_LOGD("Allocator", "Pool. Size=%d", sizeWanted); + mem = m_pool->obtain(); + m_info.baseSize = sizeof(T); + m_info.extraPtr = &((p_char8) mem)[sizeof(T)]; + return static_cast(mem); + } + + void deallocate(T* ptr, size_t n) { + //OATPP_LOGD("Allocator", "Free Pool"); + //oatpp::base::memory::MemoryPool::free(ptr); + SmStringPool::free(ptr); + } + + }; + + template + static std::shared_ptr poolAllocateSharedWithExtras(AllocationExtras& extras, P* pool, Args... args){ + typedef SmStringPoolSharedObjectAllocator _Allocator; + _Allocator allocator(extras, pool); + return std::allocate_shared(allocator, args...); + } + public: OBJECT_POOL_THREAD_LOCAL(StrBuffer_Pool, StrBuffer, 32) +private: + static SmStringPool* getSmallStringPool() { + static thread_local SmStringPool pool("Small_String_Pool", 200, 32); + //static oatpp::base::memory::ThreadDistributedMemoryPool pool("Small_String_Pool", 300, 128); + return &pool; + } private: p_char8 m_data; v_int32 m_size; diff --git a/core/base/memory/MemoryPool.cpp b/core/base/memory/MemoryPool.cpp index 12da04d0..5e32e37d 100644 --- a/core/base/memory/MemoryPool.cpp +++ b/core/base/memory/MemoryPool.cpp @@ -32,7 +32,7 @@ std::unordered_map MemoryPool::POOLS; std::atomic MemoryPool::poolIdCounter(0); ThreadDistributedMemoryPool::ThreadDistributedMemoryPool(const std::string& name, v_int32 entrySize, v_int32 chunkSize) - : m_shardsCount(10) + : m_shardsCount(OATPP_THREAD_DISTRIBUTED_MEM_POOL_SHARDS_COUNT) , m_shards(new MemoryPool*[m_shardsCount]) { for(v_int32 i = 0; i < m_shardsCount; i++){ diff --git a/core/base/memory/MemoryPool.hpp b/core/base/memory/MemoryPool.hpp index c4c67f01..5713b6f1 100644 --- a/core/base/memory/MemoryPool.hpp +++ b/core/base/memory/MemoryPool.hpp @@ -34,8 +34,6 @@ #include //#define OATPP_DISABLE_POOL_ALLOCATIONS -//#ifndef OATPP_MEMORY_POOL_SHARDING - namespace oatpp { namespace base { namespace memory { class MemoryPool { diff --git a/web/protocol/http/outgoing/BufferBody.hpp b/web/protocol/http/outgoing/BufferBody.hpp index f488ccfb..f947dc8f 100644 --- a/web/protocol/http/outgoing/BufferBody.hpp +++ b/web/protocol/http/outgoing/BufferBody.hpp @@ -48,7 +48,7 @@ public: } void declareHeaders(const std::shared_ptr& headers) noexcept override { - headers->put(oatpp::web::protocol::http::Header::CONTENT_LENGTH, + headers->put(oatpp::String(oatpp::web::protocol::http::Header::CONTENT_LENGTH, false), oatpp::utils::conversion::int32ToStr(m_buffer->getSize())); } diff --git a/web/server/HttpProcessor.cpp b/web/server/HttpProcessor.cpp index 32bcca61..52f959ab 100644 --- a/web/server/HttpProcessor.cpp +++ b/web/server/HttpProcessor.cpp @@ -33,10 +33,10 @@ bool HttpProcessor::considerConnectionKeepAlive(const std::shared_ptr& response){ if(request) { - auto& inKeepAlive = request->headers->get(protocol::http::Header::CONNECTION, nullptr); + auto& inKeepAlive = request->headers->get(oatpp::String(protocol::http::Header::CONNECTION, false), nullptr); if(inKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(inKeepAlive.get(), protocol::http::Header::Value::CONNECTION_KEEP_ALIVE)) { - if(response->headers->putIfNotExists(protocol::http::Header::CONNECTION, inKeepAlive)){ + if(response->headers->putIfNotExists(oatpp::String(protocol::http::Header::CONNECTION, false), inKeepAlive)){ return true; } else { auto& outKeepAlive = response->headers->get(protocol::http::Header::CONNECTION, nullptr); @@ -45,8 +45,8 @@ bool HttpProcessor::considerConnectionKeepAlive(const std::shared_ptrheaders->putIfNotExists(protocol::http::Header::CONNECTION, protocol::http::Header::Value::CONNECTION_CLOSE)) { - auto& outKeepAlive = response->headers->get(protocol::http::Header::CONNECTION, nullptr); + if(!response->headers->putIfNotExists(oatpp::String(protocol::http::Header::CONNECTION, false), oatpp::String(protocol::http::Header::Value::CONNECTION_CLOSE, false))) { + auto& outKeepAlive = response->headers->get(oatpp::String(protocol::http::Header::CONNECTION, false), nullptr); return (outKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(outKeepAlive.get(), protocol::http::Header::Value::CONNECTION_KEEP_ALIVE)); } @@ -111,8 +111,8 @@ HttpProcessor::processRequest(HttpRouter* router, return errorHandler->handleError(protocol::http::Status::CODE_500, "Unknown error"); } - response->headers->putIfNotExists(protocol::http::Header::SERVER, - protocol::http::Header::Value::SERVER); + response->headers->putIfNotExists(oatpp::String(protocol::http::Header::SERVER, false), + oatpp::String(protocol::http::Header::Value::SERVER, false)); keepAlive = HttpProcessor::considerConnectionKeepAlive(request, response); return response;