async processor POC

This commit is contained in:
lganzzzo 2018-03-18 19:00:46 +02:00
parent e54b64343f
commit cc1f7f4541
19 changed files with 810 additions and 234 deletions

View File

@ -0,0 +1,9 @@
//
// Processor.cpp
// crud
//
// Created by Leonid on 3/17/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#include "Processor.hpp"

View File

@ -0,0 +1,231 @@
//
// Processor.hpp
// crud
//
// Created by Leonid on 3/17/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#ifndef oatpp_async_Processor_hpp
#define oatpp_async_Processor_hpp
#include "./Routine.hpp"
#include "../concurrency/SpinLock.hpp"
namespace oatpp { namespace async {
class Processor {
public:
Processor& getThreadLocalProcessor(){
static thread_local Processor processor;
return processor;
}
private:
class Queue {
public:
class Entry {
public:
Entry(Routine* pRoutine, Entry* pNext)
: routine(pRoutine)
, next(pNext)
{}
Routine* routine;
Entry* next;
};
private:
oatpp::concurrency::SpinLock::Atom m_atom;
Entry* m_first;
Entry* m_last;
public:
Queue()
: m_atom(false)
, m_first(nullptr)
, m_last(nullptr)
{}
Entry* peekFront() {
return m_first;
}
Entry* popFront() {
oatpp::concurrency::SpinLock lock(m_atom);
auto result = m_first;
if(m_first != nullptr) {
m_first = m_first->next;
if(m_first == nullptr) {
m_last = nullptr;
}
}
return result;
}
void pushFront(Routine* routine) {
pushFront(new Entry(routine, nullptr));
}
void pushFront(Entry* entry){
oatpp::concurrency::SpinLock lock(m_atom);
entry->next = m_first;
m_first = entry;
if(m_last == nullptr) {
m_last = entry;
}
}
void pushBack(Routine* routine) {
pushBack(new Entry(routine, nullptr));
}
void pushBack(Entry* entry) {
oatpp::concurrency::SpinLock lock(m_atom);
entry->next = nullptr;
if(m_last != nullptr) {
m_last->next = entry;
m_last = entry;
} else {
m_first = entry;
m_last = entry;
}
}
};
private:
void abortCurrentRoutine(){
auto entry = m_queue.popFront();
auto curr = entry->routine;
while(curr != nullptr) {
auto parent = curr->m_parent;
delete curr;
curr = parent;
}
delete entry;
}
void returnFromCurrentRoutine(){
//OATPP_LOGD("R", "_return");
auto entry = m_queue.popFront();
auto routine = entry->routine->m_parent;
delete entry->routine;
if(routine != nullptr) {
entry->routine = routine;
routine->blocks.popNoData();
m_queue.pushBack(entry);
} else {
delete entry;
}
}
void doAction(Action& a){
if(a.getType() == Action::TYPE_RETRY) {
m_queue.pushBack(m_queue.popFront());
return;
} else if(a.getType() == Action::TYPE_RETURN) {
auto entry = m_queue.popFront();
auto routine = entry->routine->m_parent;
delete entry->routine;
delete entry;
if(routine != nullptr) {
m_queue.pushBack(routine);
}
return;
} else if(a.getType() == Action::TYPE_ABORT){
abortCurrentRoutine();
return;
} else if(a.getType() == Action::TYPE_ROUTINE) {
auto entry = m_queue.popFront();
if(!a.m_routine->blocks.isEmpty()){
Routine* r = a.m_routine;
a.m_routine = nullptr;
r->m_parent = entry->routine;
entry->routine = r;
} else {
entry->routine->blocks.popNoData();
}
m_queue.pushBack(entry);
return;
}
throw std::runtime_error("Invalid action type");
}
void propagateError(Error& error){
Routine* curr = m_queue.peekFront()->routine;
while (curr != nullptr) {
if(!curr->blocks.isEmpty()) {
auto block = curr->blocks.peek();
if(block.errorHandler != nullptr) {
try {
auto action = block.errorHandler(error);
if(action.isErrorAction()) {
error = action.getError();
} else {
doAction(action);
return;
}
} catch(...) {
error = {"Unknown", true};
}
}
}
auto parent = curr->m_parent;
auto entry = m_queue.popFront();
delete curr;
delete entry;
curr = parent;
if(curr != nullptr) {
m_queue.pushFront(curr);
}
}
}
private:
Queue m_queue;
public:
void addRoutine(const Routine::Builder& routine){
m_queue.pushBack(routine.m_routine);
routine.m_routine = nullptr;
}
bool iterate() {
auto entry = m_queue.peekFront();
if(entry != nullptr) {
auto r = entry->routine;
if(r->blocks.isEmpty()){
returnFromCurrentRoutine();
return true;
}
auto& block = r->blocks.peek();
try{
Action action = block.function();
if(action.isErrorAction()){
propagateError(action.getError());
} else {
doAction(action);
}
} catch(...) {
Error error {"Unknown", true };
propagateError(error);
}
return true;
}
return false;
}
};
}}
#endif /* Processor_hpp */

View File

@ -0,0 +1,19 @@
//
// Block.cpp
// crud
//
// Created by Leonid on 3/17/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#include "Routine.hpp"
namespace oatpp { namespace async {
const v_int32 Action::TYPE_RETRY = 0;
const v_int32 Action::TYPE_RETURN = 1;
const v_int32 Action::TYPE_ABORT = 2;
const v_int32 Action::TYPE_ERROR = 3;
const v_int32 Action::TYPE_ROUTINE = 4;
}}

164
core/src/async/Routine.hpp Normal file
View File

@ -0,0 +1,164 @@
//
// Block.hpp
// crud
//
// Created by Leonid on 3/17/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#ifndef oatpp_async_Block_hpp
#define oatpp_async_Block_hpp
#include "Stack.hpp"
#include "../base/Environment.hpp"
#include <functional>
namespace oatpp { namespace async {
class Action; // FWD
struct Error {
public:
const char* error;
bool isExceptionThrown;
};
struct Block {
public:
typedef std::function<Action ()> Function;
typedef std::function<Action (const Error&)> ErrorHandler;
public:
Function function;
ErrorHandler errorHandler;
};
class Routine {
friend Processor;
private:
Routine* m_parent;
public:
Routine()
: m_parent(nullptr)
{}
Stack<Block> blocks;
public:
class Builder {
friend Action;
friend Processor;
private:
mutable Routine* m_routine;
public:
Builder()
: m_routine(new Routine())
{}
Builder(const Builder& other)
: m_routine(other.m_routine)
{
other.m_routine = nullptr;
}
~Builder() {
if(m_routine != nullptr) {
delete m_routine;
}
}
Builder& _then(const Block& block){
m_routine->blocks.pushBack(block);
return *this;
}
};
public:
static Builder _do(const Block& block){
Builder builder;
builder._then(block);
return builder;
}
};
class Action {
friend Processor;
public:
static const v_int32 TYPE_RETRY;
static const v_int32 TYPE_RETURN;
static const v_int32 TYPE_ABORT;
static const v_int32 TYPE_ERROR;
static const v_int32 TYPE_ROUTINE;
public:
static Action& _retry() {
static Action a(TYPE_RETRY);
return a;
}
static Action& _return(){
static Action a(TYPE_RETURN);
return a;
}
static Action& _abort(){
static Action a(TYPE_ABORT);
return a;
}
private:
v_int32 m_type;
Error m_error;
Routine* m_routine;
private:
Action(v_int32 type)
: m_type(type)
{}
public:
Action(const Error& error)
: m_type (TYPE_ERROR)
, m_error(error)
, m_routine(nullptr)
{}
Action(const Routine::Builder& routine)
: m_type(TYPE_ROUTINE)
, m_error({nullptr, false})
, m_routine(routine.m_routine)
{
routine.m_routine = nullptr;
}
Action(std::nullptr_t nullp)
: m_type(TYPE_ROUTINE)
, m_error({nullptr, false})
, m_routine(new Routine())
{}
~Action() {
if(m_routine != nullptr){
delete m_routine;
}
}
Error& getError(){
return m_error;
}
bool isErrorAction(){
return m_type == TYPE_ERROR;
}
v_int32 getType(){
return m_type;
}
};
}}
#endif /* oatpp_async_Block_hpp */

9
core/src/async/Stack.cpp Normal file
View File

@ -0,0 +1,9 @@
//
// Stack.cpp
// crud
//
// Created by Leonid on 3/17/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#include "Stack.hpp"

123
core/src/async/Stack.hpp Normal file
View File

@ -0,0 +1,123 @@
//
// Stack.hpp
// crud
//
// Created by Leonid on 3/17/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#ifndef oatpp_async_Stack_hpp
#define oatpp_async_Stack_hpp
#include <stdexcept>
namespace oatpp { namespace async {
class Processor;
template<class T>
class Stack {
friend Processor;
private:
class Entry {
public:
Entry(const T& pData, Entry* pNext)
: data(pData)
, next(pNext)
{}
T data;
Entry* next;
};
private:
void null(){
m_root = nullptr;
m_last = nullptr;
}
private:
Entry* m_root;
Entry* m_last;
public:
Stack()
: m_root(nullptr)
, m_last(nullptr)
{}
~Stack(){
clear();
}
void push(const T& data){
m_root = new Entry(data, m_root);
if(m_last == nullptr) {
m_last = m_root;
}
}
void pushBack(const T& data){
Entry* entry = new Entry(data, nullptr);
if(m_last != nullptr) {
m_last->next = entry;
m_last = entry;
} else {
m_root = entry;
m_last = entry;
}
}
T& peek() const {
if(m_root != nullptr) {
return m_root->data;
} else {
throw std::runtime_error("Peek on empty stack");
}
}
T pop() {
if(m_root != nullptr) {
Entry* resultEntry = m_root;
m_root = m_root->next;
if(m_root == nullptr){
m_last = nullptr;
}
T result = resultEntry->data;
delete resultEntry;
return result;
} else {
throw std::runtime_error("Pop on empty stack");
}
}
void popNoData() {
if(m_root != nullptr) {
Entry* resultEntry = m_root;
m_root = m_root->next;
if(m_root == nullptr){
m_last = nullptr;
}
delete resultEntry;
} else {
throw std::runtime_error("Pop on empty stack");
}
}
void clear(){
Entry* curr = m_root;
while (curr != nullptr) {
Entry* next = curr->next;
delete curr;
curr = next;
}
m_root = nullptr;
m_last = nullptr;
}
bool isEmpty() const {
return m_root == nullptr;
}
};
}}
#endif /* Stack_hpp */

View File

@ -27,6 +27,10 @@
namespace oatpp { namespace data{ namespace stream {
os::io::Library::v_size IOStream::ERROR_NOTHING_TO_READ = -1001;
os::io::Library::v_size IOStream::ERROR_CLOSED = -1002;
os::io::Library::v_size IOStream::ERROR_TRY_AGAIN = -1003;
os::io::Library::v_size OutputStream::writeAsString(v_int32 value){
v_char8 a[100];
v_int32 size = utils::conversion::int32ToCharSequence(value, &a[0]);

View File

@ -67,6 +67,10 @@ public:
};
class IOStream : public InputStream, public OutputStream {
public:
static os::io::Library::v_size ERROR_NOTHING_TO_READ;
static os::io::Library::v_size ERROR_CLOSED;
static os::io::Library::v_size ERROR_TRY_AGAIN;
public:
typedef os::io::Library::v_size v_size;
};

View File

@ -8,8 +8,6 @@
#include "AsyncConnection.hpp"
#include "io/AsyncIOStream.hpp"
namespace oatpp { namespace network {
AsyncConnection::AsyncConnection(Library::v_handle handle)
@ -22,29 +20,33 @@ AsyncConnection::~AsyncConnection(){
AsyncConnection::Library::v_size AsyncConnection::write(const void *buff, Library::v_size count){
auto result = Library::handle_write(m_handle, buff, count); // Socket should be non blocking!!!
if(result == EAGAIN || result == EWOULDBLOCK){
return io::AsyncIOStream::ERROR_TRY_AGAIN;
} else if(result == -1) {
return io::AsyncIOStream::ERROR_NOTHING_TO_READ;
} else if(result == 0) {
return io::AsyncIOStream::ERROR_CLOSED;
if(result < 0) {
auto e = errno;
//OATPP_LOGD("write", "errno=%d", e);
if(e == EAGAIN || e == EWOULDBLOCK){
return ERROR_TRY_AGAIN;
}
}
return result;
}
AsyncConnection::Library::v_size AsyncConnection::read(void *buff, Library::v_size count){
//OATPP_LOGD("AsyncConnection", "read. handler=%d", m_handle);
auto result = Library::handle_read(m_handle, buff, count); // Socket should be non blocking!!!
if(result == EAGAIN || result == EWOULDBLOCK){
return io::AsyncIOStream::ERROR_TRY_AGAIN;
} else if(result == -1) {
return io::AsyncIOStream::ERROR_NOTHING_TO_READ;
} else if(result == 0) {
return io::AsyncIOStream::ERROR_CLOSED;
if(result < 0) {
auto e = errno;
//OATPP_LOGD("read", "errno=%d", e);
if(e == EAGAIN || e == EWOULDBLOCK){
return ERROR_TRY_AGAIN;
}
}
return result;
}
void AsyncConnection::close(){
//OATPP_LOGD("Connection", "close()");
Library::handle_close(m_handle);
}

View File

@ -11,6 +11,7 @@
#include "../../../oatpp-lib/core/src/base/memory/ObjectPool.hpp"
#include "../../../oatpp-lib/core/src/data/stream/Stream.hpp"
#include "../../../oatpp-lib/core/src/async/Routine.hpp"
#include "../../../oatpp-lib/core/src/os/io/Library.hpp"
namespace oatpp { namespace network {

View File

@ -1,17 +0,0 @@
//
// Connection.cpp
// crud
//
// Created by Leonid on 3/16/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#include "AsyncIOStream.hpp"
namespace oatpp { namespace network { namespace io {
const v_int32 AsyncIOStream::ERROR_TRY_AGAIN = -1001;
const v_int32 AsyncIOStream::ERROR_NOTHING_TO_READ = -1002;
const v_int32 AsyncIOStream::ERROR_CLOSED = -1003;
}}}

View File

@ -1,98 +0,0 @@
//
// AsyncIOStream.hpp
// crud
//
// Created by Leonid on 3/16/18.
// Copyright © 2018 oatpp. All rights reserved.
//
#ifndef oatpp_web_server_io_AsyncIOStream_hpp
#define oatpp_web_server_io_AsyncIOStream_hpp
#include "./Queue.hpp"
#include "../../../core/src/base/memory/ObjectPool.hpp"
#include "../../../core/src/data/stream/Stream.hpp"
#include "../../../core/src/base/Controllable.hpp"
namespace oatpp { namespace network { namespace io {
class AsyncIOStream : public oatpp::base::Controllable, public oatpp::data::stream::IOStream {
public:
typedef oatpp::os::io::Library Library;
public:
static const v_int32 ERROR_TRY_AGAIN;
static const v_int32 ERROR_NOTHING_TO_READ;
static const v_int32 ERROR_CLOSED;
public:
OBJECT_POOL(AsyncIOStream_Pool, AsyncIOStream, 32);
SHARED_OBJECT_POOL(Shared_AsyncIOStream_Pool, AsyncIOStream, 32);
private:
std::shared_ptr<oatpp::data::stream::IOStream> m_stream;
Queue& m_queue;
private:
void processRequest(IORequest& request) {
m_queue.pushBack(request);
while (request.type != IORequest::TYPE_DONE) {
Queue::Entry* entry = m_queue.popFront();
if(entry != nullptr) {
IORequest& request = entry->request;
if(request.type == IORequest::TYPE_READ) {
request.actualSize = request.stream->read(request.data, request.size);
} else if(request.type == IORequest::TYPE_WRITE) {
request.actualSize = request.stream->write(request.data, request.size);
} else {
throw std::runtime_error("Invalid state of IORequest");
}
if(request.actualSize != ERROR_TRY_AGAIN) {
request.type = IORequest::TYPE_DONE;
delete entry;
} else {
m_queue.pushBack(entry);
}
} else if(request.type == IORequest::TYPE_DONE) {
return;
} else {
throw std::runtime_error("Invalid state of Queue");
}
std::this_thread::yield();
}
}
public:
AsyncIOStream(const std::shared_ptr<oatpp::data::stream::IOStream>& stream,
Queue& queue = Queue::getInstance())
: m_stream(stream)
, m_queue(queue)
{}
public:
static std::shared_ptr<AsyncIOStream> createShared(const std::shared_ptr<oatpp::data::stream::IOStream>& stream,
Queue& queue = Queue::getInstance()){
return Shared_AsyncIOStream_Pool::allocateShared(stream, std::ref(queue));
}
Library::v_size write(const void *buff, Library::v_size count) override {
//auto result = m_stream->write(buff, count);
//if(result == ERROR_TRY_AGAIN) {
IORequest request(m_stream, const_cast<void*>(buff), count, IORequest::TYPE_WRITE);
processRequest(request);
return request.actualSize;
//}
//return result;
}
Library::v_size read(void *buff, Library::v_size count) override {
//auto result = m_stream->read(buff, count);
//if(result == ERROR_TRY_AGAIN) {
IORequest request(m_stream, buff, count, IORequest::TYPE_READ);
processRequest(request);
return request.actualSize;
//}
//return result;
}
};
}}}
#endif /* oatpp_web_server_io_AsyncIOStream_hpp */

View File

@ -55,8 +55,8 @@ public:
template<class Type>
static typename Type::PtrWrapper decodeToDTO(const std::shared_ptr<Protocol::Headers>& headers,
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper){
const std::shared_ptr<oatpp::data::stream::InputStream>& bodyStream,
const std::shared_ptr<oatpp::data::mapping::ObjectMapper>& objectMapper){
return objectMapper->readFromString<Type>(decodeToString(headers, bodyStream));
}

View File

@ -56,4 +56,36 @@ void Response::send(const std::shared_ptr<data::stream::OutputStream>& stream){
}
oatpp::async::Action Response::sendAsync(const std::shared_ptr<data::stream::OutputStream>& stream){
if(body){
body->declareHeaders(headers);
} else {
headers->put(Header::CONTENT_LENGTH, "0");
}
stream->write("HTTP/1.1 ", 9);
stream->writeAsString(status.code);
stream->write(" ", 1);
stream->OutputStream::write(status.description);
stream->write("\r\n", 2);
auto curr = headers->getFirstEntry();
while(curr != nullptr){
stream->write(curr->getKey()->getData(), curr->getKey()->getSize());
stream->write(": ", 2);
stream->write(curr->getValue()->getData(), curr->getValue()->getSize());
stream->write("\r\n", 2);
curr = curr->getNext();
}
stream->write("\r\n", 2);
if(body) {
body->writeToStream(stream);
}
return nullptr; // TODO make Async
}
}}}}}

View File

@ -27,6 +27,7 @@
#include "./Body.hpp"
#include "./../Http.hpp"
#include "../../../../../core/src/async/Routine.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
@ -56,6 +57,8 @@ public:
void send(const std::shared_ptr<data::stream::OutputStream>& stream);
oatpp::async::Action sendAsync(const std::shared_ptr<data::stream::OutputStream>& stream);
};
}}}}}

View File

@ -33,7 +33,6 @@
#include "./HttpError.hpp"
#include "../../../../oatpp-lib/network/src/io/AsyncIOStream.hpp"
#include "../../../../oatpp-lib/network/src/AsyncConnection.hpp"
#include "../../../../oatpp-lib/core/test/Checker.hpp"
@ -44,11 +43,15 @@ namespace oatpp { namespace web { namespace server {
void AsyncHttpConnectionHandler::Task::run(){
//Backlog& backlog = Backlog::getInstance();
while(true) {
while (m_processor.iterate()) {
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
/*
while(true) {
Backlog::Entry* entry = m_backLog.popFront();
Backlog::Entry* entry = backlog.popFront();
if(entry != nullptr) {
auto& state = entry->connectionState;
auto response = HttpProcessor::processRequest(m_router, state->connection, m_errorHandler,
@ -63,7 +66,7 @@ void AsyncHttpConnectionHandler::Task::run(){
}
if(state->keepAlive){
m_backLog.pushBack(entry);
backlog.pushBack(entry);
} else {
delete entry;
}
@ -72,26 +75,38 @@ void AsyncHttpConnectionHandler::Task::run(){
}
}
*/
}
void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection){
auto task = m_tasks[m_taskBalancer % m_threadCount];
auto asyncConnection = oatpp::network::io::AsyncIOStream::createShared(connection, task->getIOQueue());
auto ioBuffer = oatpp::data::buffer::IOBuffer::createShared();
auto state = ConnectionState::createShared();
state->connection = asyncConnection;
auto state = HttpProcessor::ConnectionState::createShared();
state->connection = connection;
state->ioBuffer = ioBuffer;
state->outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(asyncConnection, ioBuffer);
state->inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(asyncConnection, ioBuffer);
state->outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, ioBuffer);
state->inStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection, ioBuffer);
state->keepAlive = true;
//Backlog::getInstance().pushBack(state);
task->getBacklog().pushBack(state);
auto routine = oatpp::async::Routine::_do({
[this, state]{
return HttpProcessor::processRequestAsync(m_router.get(), m_errorHandler, state);
}, [] (const oatpp::async::Error& error) {
//OATPP_LOGD("AsyncHttpConnectionHandler", "received error");
if(error.error == HttpProcessor::RETURN_KEEP_ALIVE) {
return oatpp::async::Action::_retry();
}
return oatpp::async::Action(nullptr);
}
});
task->getProcessor().addRoutine(routine);
m_taskBalancer ++;
}
}}}

View File

@ -32,8 +32,6 @@
#include "../protocol/http/incoming/Request.hpp"
#include "../protocol/http/outgoing/Response.hpp"
#include "../../../../oatpp-lib/network/src/io/Queue.hpp"
#include "../../../../oatpp-lib/network/src/server/Server.hpp"
#include "../../../../oatpp-lib/network/src/Connection.hpp"
@ -43,95 +41,18 @@
#include "../../../../oatpp-lib/core/src/data/stream/StreamBufferedProxy.hpp"
#include "../../../../oatpp-lib/core/src/data/buffer/IOBuffer.hpp"
#include "../../../../oatpp-lib/core/src/async/Processor.hpp"
namespace oatpp { namespace web { namespace server {
class AsyncHttpConnectionHandler : public base::Controllable, public network::server::ConnectionHandler {
private:
class ConnectionState {
SHARED_OBJECT_POOL(ConnectionState_Pool, ConnectionState, 32)
public:
static std::shared_ptr<ConnectionState> createShared(){
return ConnectionState_Pool::allocateShared();
}
std::shared_ptr<oatpp::data::stream::IOStream> connection;
std::shared_ptr<oatpp::data::buffer::IOBuffer> ioBuffer;
std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy> outStream;
std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy> inStream;
bool keepAlive = true;
};
class Backlog {
public:
/*static Backlog& getInstance(){
static Backlog backlog;
return backlog;
}*/
public:
class Entry {
public:
OBJECT_POOL(AsyncHttpConnectionHandler_Backlog_Entry_Pool, Entry, 32)
public:
Entry(const std::shared_ptr<ConnectionState>& pConnectionState, Entry* pNext)
: connectionState(pConnectionState)
, next(pNext)
{}
std::shared_ptr<ConnectionState> connectionState;
Entry* next;
};
private:
oatpp::concurrency::SpinLock::Atom m_atom;
Entry* m_first;
Entry* m_last;
public:
Backlog()
: m_atom(false)
, m_first(nullptr)
, m_last(nullptr)
{}
Entry* popFront() {
oatpp::concurrency::SpinLock lock(m_atom);
auto result = m_first;
if(m_first != nullptr) {
m_first = m_first->next;
if(m_first == nullptr){
m_last = nullptr;
}
}
return result;
}
void pushBack(const std::shared_ptr<ConnectionState>& connectionState) {
pushBack(new Entry(connectionState, nullptr));
}
void pushBack(Entry* entry) {
oatpp::concurrency::SpinLock lock(m_atom);
entry->next = nullptr;
if(m_last != nullptr) {
m_last->next = entry;
m_last = entry;
} else {
m_first = entry;
m_last = entry;
}
}
};
private:
class Task : public base::Controllable, public concurrency::Runnable{
private:
HttpRouter* m_router;
std::shared_ptr<handler::ErrorHandler> m_errorHandler;
oatpp::network::io::Queue m_ioQueue;
Backlog m_backLog;
oatpp::async::Processor m_processor;
public:
Task(HttpRouter* router,
const std::shared_ptr<handler::ErrorHandler>& errorHandler)
@ -147,12 +68,8 @@ private:
void run() override;
oatpp::network::io::Queue& getIOQueue(){
return m_ioQueue;
}
Backlog& getBacklog(){
return m_backLog;
oatpp::async::Processor& getProcessor(){
return m_processor;
}
};

View File

@ -8,10 +8,11 @@
#include "HttpProcessor.hpp"
#include "./HttpError.hpp"
#include "../../../network/src/io/AsyncIOStream.hpp"
namespace oatpp { namespace web { namespace server {
const char* HttpProcessor::RETURN_KEEP_ALIVE = "RETURN_KEEP_ALIVE";
bool HttpProcessor::considerConnectionKeepAlive(const std::shared_ptr<protocol::http::incoming::Request>& request,
const std::shared_ptr<protocol::http::outgoing::Response>& response){
@ -89,12 +90,134 @@ HttpProcessor::processRequest(HttpRouter* router,
return errorHandler->handleError(protocol::http::Status::CODE_404, "Current url has no mapping");
}
} if(readCount == oatpp::network::io::AsyncIOStream::ERROR_NOTHING_TO_READ) {
} if(readCount == oatpp::data::stream::IOStream::ERROR_NOTHING_TO_READ) {
keepAlive = true;
}
return nullptr;
}
std::shared_ptr<protocol::http::outgoing::Response>
HttpProcessor::getResponse(HttpRouter* router,
oatpp::os::io::Library::v_size firstReadCount,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
const std::shared_ptr<ConnectionState>& connectionState) {
oatpp::parser::ParsingCaret caret((p_char8) connectionState->ioBuffer->getData(), connectionState->ioBuffer->getSize());
auto line = protocol::http::Protocol::parseRequestStartingLine(caret);
if(!line){
return errorHandler->handleError(protocol::http::Status::CODE_400, "Can't read starting line");
}
auto route = router->getRoute(line->method, line->path);
if(!route.isNull()) {
oatpp::web::protocol::http::Status error;
auto headers = protocol::http::Protocol::parseHeaders(caret, error);
if(error.code != 0){
return errorHandler->handleError(error, " Can't parse headers");
}
auto bodyStream = connectionState->inStream;
bodyStream->setBufferPosition(caret.getPosition(), (v_int32) firstReadCount);
auto request = protocol::http::incoming::Request::createShared(line, route.matchMap, headers, bodyStream);
std::shared_ptr<protocol::http::outgoing::Response> response;
try{
response = route.processUrl(request);
} catch (HttpError& error) {
return errorHandler->handleError(error.getStatus(), error.getMessage());
} catch (std::exception& error) {
return errorHandler->handleError(protocol::http::Status::CODE_500, error.what());
} catch (...) {
return errorHandler->handleError(protocol::http::Status::CODE_500, "Unknown error");
}
response->headers->putIfNotExists(protocol::http::Header::SERVER,
protocol::http::Header::Value::SERVER);
connectionState->keepAlive = HttpProcessor::considerConnectionKeepAlive(request, response);
return response;
} else {
return errorHandler->handleError(protocol::http::Status::CODE_404, "Current url has no mapping");
}
}
oatpp::async::Action
HttpProcessor::processRequestAsync(HttpRouter* router,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
const std::shared_ptr<ConnectionState>& connectionState)
{
struct LocaleState {
LocaleState(const std::shared_ptr<ConnectionState>& pConnectionState)
: connectionState(pConnectionState)
, ioBuffer(pConnectionState->ioBuffer->getData())
, ioBufferSize(pConnectionState->ioBuffer->getSize())
{}
const std::shared_ptr<ConnectionState>& connectionState;
void* ioBuffer;
v_int32 ioBufferSize;
oatpp::os::io::Library::v_size readCount;
};
auto state = std::make_shared<LocaleState>(connectionState);
return oatpp::async::Routine::_do({
[state] {
state->readCount = state->connectionState->connection->read(state->ioBuffer, state->ioBufferSize);
if(state->readCount > 0) {
//OATPP_LOGD("FR", "line=%s", (const char*) state->ioBuffer);
return oatpp::async::Action(nullptr);
} else if(state->readCount == oatpp::data::stream::IOStream::ERROR_TRY_AGAIN){
return oatpp::async::Action::_retry();
}
return oatpp::async::Action::_abort();
}, nullptr
})._then({
[state, router, errorHandler] {
auto response = getResponse(router, state->readCount, errorHandler, state->connectionState);
return oatpp::async::Routine::_do({
[state, response] {
state->connectionState->outStream->setBufferPosition(0, 0);
return response->sendAsync(state->connectionState->outStream);
}, nullptr
})._then({
[state] {
state->connectionState->outStream->flush();
return nullptr;
}, nullptr
});
}, nullptr
})._then({
[state] {
//OATPP_LOGD("Connection Processor", "Connection finished");
if(state->connectionState->keepAlive){
//OATPP_LOGD("CP", "try-keep-alive");
oatpp::async::Error error {RETURN_KEEP_ALIVE, false};
return oatpp::async::Action(error);
}
return oatpp::async::Action(nullptr);
}, nullptr
});
}
}}}

View File

@ -18,9 +18,39 @@
#include "../../../../oatpp-lib/core/src/data/stream/StreamBufferedProxy.hpp"
#include "../../../../oatpp-lib/core/src/async/Processor.hpp"
namespace oatpp { namespace web { namespace server {
class HttpProcessor {
public:
static const char* RETURN_KEEP_ALIVE;
public:
class ConnectionState {
public:
SHARED_OBJECT_POOL(ConnectionState_Pool, ConnectionState, 32)
public:
static std::shared_ptr<ConnectionState> createShared(){
return ConnectionState_Pool::allocateShared();
}
std::shared_ptr<oatpp::data::stream::IOStream> connection;
std::shared_ptr<oatpp::data::buffer::IOBuffer> ioBuffer;
std::shared_ptr<oatpp::data::stream::OutputStreamBufferedProxy> outStream;
std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy> inStream;
bool keepAlive = true;
};
private:
static std::shared_ptr<protocol::http::outgoing::Response>
getResponse(HttpRouter* router,
oatpp::os::io::Library::v_size firstReadCount,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
const std::shared_ptr<ConnectionState>& connectionState);
public:
static bool considerConnectionKeepAlive(const std::shared_ptr<protocol::http::incoming::Request>& request,
const std::shared_ptr<protocol::http::outgoing::Response>& response);
@ -34,6 +64,11 @@ public:
const std::shared_ptr<oatpp::data::stream::InputStreamBufferedProxy>& inStream,
bool& keepAlive);
static oatpp::async::Action
processRequestAsync(HttpRouter* router,
const std::shared_ptr<handler::ErrorHandler>& errorHandler,
const std::shared_ptr<ConnectionState>& state);
};
}}}