Async processor optimization

This commit is contained in:
lganzzzo 2018-10-02 03:15:15 +03:00
parent af7dbbb0ab
commit 897938bc28
8 changed files with 43 additions and 48 deletions

View File

@ -57,24 +57,18 @@ bool Processor::checkWaitingQueue() {
return hasActions;
}
bool Processor::considerCheckWaitingQueue(bool immediate) {
bool Processor::considerContinueImmediately() {
if(immediate) {
++ m_sleepCountdown;
if(m_sleepCountdown > 1000) {
std::this_thread::yield();
return checkWaitingQueue();
}
} else {
m_sleepCountdown = 0;
if(m_checkWaitingQueueCountdown < 10){
++ m_checkWaitingQueueCountdown;
return true;
}
m_checkWaitingQueueCountdown = 0;
bool hasAction = checkWaitingQueue();
if(hasAction) {
m_inactivityTick = 0;
} else if(m_inactivityTick == 0) {
m_inactivityTick = oatpp::base::Environment::getMicroTickCount();
} else if(oatpp::base::Environment::getMicroTickCount() - m_inactivityTick > 1000 * 100 /* 100 millis */) {
return m_activeQueue.first != nullptr;
}
checkWaitingQueue();
return true;
}
@ -107,7 +101,7 @@ bool Processor::iterate(v_int32 numIterations) {
}
}
return (considerCheckWaitingQueue(m_activeQueue.first == nullptr));
return considerContinueImmediately();
}

View File

@ -34,14 +34,13 @@ class Processor {
private:
bool checkWaitingQueue();
bool considerCheckWaitingQueue(bool immediate);
bool considerContinueImmediately();
private:
oatpp::collection::FastQueue<AbstractCoroutine> m_activeQueue;
oatpp::collection::FastQueue<AbstractCoroutine> m_waitingQueue;
private:
v_int32 m_sleepCountdown = 0;
v_int32 m_checkWaitingQueueCountdown = 0;
v_int64 m_inactivityTick = 0;
public:
void addCoroutine(AbstractCoroutine* coroutine);

View File

@ -204,4 +204,10 @@ void* Environment::getComponent(const std::string& typeName, const std::string&
return componentIt->second;
}
v_int64 Environment::getMicroTickCount(){
std::chrono::microseconds ms = std::chrono::duration_cast<std::chrono::microseconds>
(std::chrono::system_clock::now().time_since_epoch());
return ms.count();
}
}}

View File

@ -154,6 +154,8 @@ public:
static void* getComponent(const std::string& typeName);
static void* getComponent(const std::string& typeName, const std::string& componentName);
static v_int64 getMicroTickCount();
};
#ifndef OATPP_DISABLE_LOGV

View File

@ -26,13 +26,6 @@
namespace oatpp { namespace test {
v_int64 PerformanceChecker::getMicroTickCount(){
std::chrono::microseconds ms = std::chrono::duration_cast<std::chrono::microseconds>
(std::chrono::system_clock::now().time_since_epoch());
return ms.count();
}
ThreadLocalObjectsChecker::ThreadLocalObjectsChecker(const char* tag)
: m_tag(tag)
, m_objectsCount(oatpp::base::Environment::getThreadLocalObjectsCount())

View File

@ -30,24 +30,22 @@
namespace oatpp { namespace test {
class PerformanceChecker {
public:
static v_int64 getMicroTickCount();
private:
const char* m_tag;
v_int64 m_ticks;
public:
PerformanceChecker(const char* tag)
: m_tag(tag)
, m_ticks(getMicroTickCount())
, m_ticks(oatpp::base::Environment::getMicroTickCount())
{}
~PerformanceChecker(){
v_int64 elapsedTicks = getMicroTickCount() - m_ticks;
v_int64 elapsedTicks = oatpp::base::Environment::getMicroTickCount() - m_ticks;
OATPP_LOGD(m_tag, "%d(micro)", elapsedTicks);
}
v_int64 getElapsedTicks(){
return getMicroTickCount() - m_ticks;
return oatpp::base::Environment::getMicroTickCount() - m_ticks;
}
};

View File

@ -29,11 +29,11 @@ namespace oatpp { namespace web { namespace protocol { namespace http { namespac
bool CommunicationUtils::considerConnectionKeepAlive(const std::shared_ptr<protocol::http::incoming::Request>& request,
const std::shared_ptr<protocol::http::outgoing::Response>& response){
/* Set keep-alive to value specified in the client's request, if no Connection header present in response. */
/* Set keep-alive to value specified in response otherwise */
if(request) {
auto& inKeepAlive = request->headers->get(String(Header::CONNECTION, false), nullptr);
/* Set keep-alive to value specified in the client's request, if no Connection header present in response. */
/* Set keep-alive to value specified in response otherwise */
auto& inKeepAlive = request->headers->get(String(Header::CONNECTION, false), nullptr);
if(inKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(inKeepAlive.get(), Header::Value::CONNECTION_KEEP_ALIVE)) {
if(response->headers->putIfNotExists(String(Header::CONNECTION, false), inKeepAlive)){
return true;
@ -42,18 +42,19 @@ bool CommunicationUtils::considerConnectionKeepAlive(const std::shared_ptr<proto
return (outKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(outKeepAlive.get(), Header::Value::CONNECTION_KEEP_ALIVE));
}
}
}
/* If protocol == HTTP/1.1 */
/* Set HTTP/1.1 default Connection header value (Keep-Alive), if no Connection header present in response. */
/* Set keep-alive to value specified in response otherwise */
String& protocol = request->startingLine->protocol;
if(protocol && oatpp::base::StrBuffer::equalsCI_FAST(protocol.get(), "HTTP/1.1")) {
if(!response->headers->putIfNotExists(String(Header::CONNECTION, false), String(Header::Value::CONNECTION_KEEP_ALIVE, false))) {
auto& outKeepAlive = response->headers->get(String(Header::CONNECTION, false), nullptr);
return (outKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(outKeepAlive.get(), Header::Value::CONNECTION_KEEP_ALIVE));
/* If protocol == HTTP/1.1 */
/* Set HTTP/1.1 default Connection header value (Keep-Alive), if no Connection header present in response. */
/* Set keep-alive to value specified in response otherwise */
String& protocol = request->startingLine->protocol;
if(protocol && oatpp::base::StrBuffer::equalsCI_FAST(protocol.get(), "HTTP/1.1")) {
if(!response->headers->putIfNotExists(String(Header::CONNECTION, false), String(Header::Value::CONNECTION_KEEP_ALIVE, false))) {
auto& outKeepAlive = response->headers->get(String(Header::CONNECTION, false), nullptr);
return (outKeepAlive && oatpp::base::StrBuffer::equalsCI_FAST(outKeepAlive.get(), Header::Value::CONNECTION_KEEP_ALIVE));
}
return true;
}
return true;
}
/* If protocol != HTTP/1.1 */

View File

@ -69,20 +69,22 @@ void AsyncHttpConnectionHandler::Task::run(){
/* Load all waiting connections into processor */
consumeConnections(processor);
/* Process all, and check for incoming connections once in 100 iterations */
while (processor.iterate(100)) {
/* Process all, and check for incoming connections once in 1000 iterations */
while (processor.iterate(1000)) {
consumeConnections(processor);
}
std::unique_lock<std::mutex> lock(m_taskMutex);
if(processor.isEmpty()) {
/* No tasks in the processor. Wait for incoming connections */
//OATPP_LOGD("proc", "waiting for new connections");
while (m_connections.getFirstNode() == nullptr) {
m_taskCondition.wait(lock);
}
} else {
/* There is still something in slow queue. Wait and get back to processing */
/* Waiting for IO is not Applicable here as slow queue may contain NON-IO tasks */
//OATPP_LOGD("proc", "waiting slow queue");
m_taskCondition.wait_for(lock, std::chrono::milliseconds(10));
}