AsyncWebSocket API

This commit is contained in:
lganzzzo 2019-01-16 01:58:27 +02:00
parent 34ee031a83
commit 25e3af3572
9 changed files with 427 additions and 31 deletions

View File

@ -131,6 +131,9 @@ private:
{
AbstractCoroutine* savedCP = _CP;
_CP = _CP->m_parent;
_FP = nullptr;
/* Please note that savedCP->m_parentReturnAction should not be "REPEAT nor WAIT_RETRY" */
/* as funtion pointer (FP) is invalidated */
Action a = takeAction(savedCP->m_parentReturnAction);
savedCP->m_parentReturnAction.m_coroutine = nullptr;
savedCP->free();

View File

@ -261,7 +261,7 @@ oatpp::async::Action ChunkedBuffer::flushToStreamAsync(oatpp::async::AbstractCor
}
Action writeCurrData() {
return oatpp::data::stream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction);
return oatpp::data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction);
}
};

View File

@ -220,10 +220,10 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
}
Action doWrite() {
return oatpp::data::stream::writeDataAsyncInline(m_toStream.get(),
m_writeBufferPtr,
m_bytesLeft,
yieldTo(&TransferCoroutine::act));
return oatpp::data::stream::writeExactSizeDataAsyncInline(m_toStream.get(),
m_writeBufferPtr,
m_bytesLeft,
yieldTo(&TransferCoroutine::act));
}
Action handleError(const oatpp::async::Error& error) override {
@ -241,10 +241,10 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
}
oatpp::async::Action writeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
const void*& data,
os::io::Library::v_size& size,
const oatpp::async::Action& nextAction) {
oatpp::async::Action writeSomeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
const void*& data,
os::io::Library::v_size& size,
const oatpp::async::Action& nextAction) {
auto res = stream->write(data, size);
if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
return oatpp::async::Action::_WAIT_RETRY;
@ -252,11 +252,36 @@ oatpp::async::Action writeDataAsyncInline(oatpp::data::stream::OutputStream* str
return oatpp::async::Action::_REPEAT;
} else if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE) {
return oatpp::async::Action::_ABORT;
} else if( res < 0) {
} else if(res < 0) {
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA));
} else if(res < size) {
data = &((p_char8) data)[res];
size = size - res;
}
data = &((p_char8) data)[res];
size = size - res;
if(res < size && res > 0) {
return oatpp::async::Action::_REPEAT;
}
return nextAction;
}
oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
const void*& data,
os::io::Library::v_size& size,
const oatpp::async::Action& nextAction) {
auto res = stream->write(data, size);
if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
return oatpp::async::Action::_WAIT_RETRY;
} else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) {
return oatpp::async::Action::_REPEAT;
} else if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE) {
return oatpp::async::Action::_ABORT;
} else if(res < 0) {
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA));
} else if(res == 0) {
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA));
}
data = &((p_char8) data)[res];
size = size - res;
if(res < size) {
return oatpp::async::Action::_REPEAT;
}
return nextAction;
@ -273,11 +298,9 @@ oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* s
return oatpp::async::Action::_REPEAT;
} else if( res < 0) {
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA));
} else if(res < bytesLeftToRead) {
data = &((p_char8) data)[res];
bytesLeftToRead -= res;
return nextAction;
}
}
// res == 0 is not an error here
data = &((p_char8) data)[res];
bytesLeftToRead -= res;
return nextAction;
}
@ -295,12 +318,14 @@ oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStre
return oatpp::async::Action::_ABORT;
} else if( res < 0) {
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA));
} else if(res < bytesLeftToRead) {
data = &((p_char8) data)[res];
bytesLeftToRead -= res;
} else if(res == 0) { // Connection is probably closed or eof is reached
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA));
}
data = &((p_char8) data)[res];
bytesLeftToRead -= res;
if(res < bytesLeftToRead) {
return oatpp::async::Action::_REPEAT;
}
bytesLeftToRead -= res;
return nextAction;
}

View File

@ -162,10 +162,15 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
* Async write data withot starting new Coroutine.
* Should be called from a separate Coroutine method
*/
oatpp::async::Action writeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
const void*& data,
os::io::Library::v_size& size,
const oatpp::async::Action& nextAction);
oatpp::async::Action writeSomeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
const void*& data,
os::io::Library::v_size& size,
const oatpp::async::Action& nextAction);
oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
const void*& data,
os::io::Library::v_size& size,
const oatpp::async::Action& nextAction);
oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream,
void*& data,

View File

@ -26,13 +26,11 @@
#define oatpp_web_client_WebSocketConnector_hpp
#include "./HttpRequestExecutor.hpp"
#include "oatpp/web/protocol/websocket/WebSocket.hpp"
namespace oatpp { namespace web { namespace client {
class WebSocketConnector {
public:
typedef protocol::websocket::WebSocket WebSocket;
typedef oatpp::data::stream::IOStream Connection;
typedef oatpp::async::Action Action;
typedef Action (oatpp::async::AbstractCoroutine::*AsyncCallback)(const std::shared_ptr<Connection>&);

View File

@ -74,7 +74,7 @@ public:
{}
Action act() override {
return oatpp::data::stream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, finish());
return oatpp::data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, finish());
}
};

View File

@ -145,7 +145,7 @@ public:
}
Action writeCurrData() {
return oatpp::data::stream::writeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction);
return oatpp::data::stream::writeExactSizeDataAsyncInline(m_stream.get(), m_currData, m_currDataSize, m_nextAction);
}
};

View File

@ -24,6 +24,8 @@
#include "AsyncWebSocket.hpp"
#include "./Utils.hpp"
namespace oatpp { namespace web { namespace protocol { namespace websocket {
bool AsyncWebSocket::checkForContinuation(const Frame::Header& frameHeader) {
@ -127,6 +129,81 @@ oatpp::async::Action AsyncWebSocket::readFrameHeaderAsync(oatpp::async::Abstract
}
oatpp::async::Action AsyncWebSocket::writeFrameHeaderAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnReturn,
const std::shared_ptr<Frame::Header>& frameHeader)
{
class WriteFrameCoroutine : public oatpp::async::Coroutine<WriteFrameCoroutine> {
private:
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
std::shared_ptr<Frame::Header> m_frameHeader;
private:
v_int32 m_lenType;
v_word16 m_bb;
v_word16 m_messageLen2;
v_word32 m_messageLen3 [2];
v_word8 m_messageLengthScenario;
private:
const void* m_currData;
os::io::Library::v_size m_bytesToWrite;
public:
WriteFrameCoroutine(const std::shared_ptr<oatpp::data::stream::IOStream> connection,
const std::shared_ptr<Frame::Header>& frameHeader)
: m_connection(connection)
, m_frameHeader(frameHeader)
{
;
Frame::packHeaderBits(m_bb, *m_frameHeader, m_messageLengthScenario);
m_bb = htons(m_bb);
m_currData = &m_bb;
m_bytesToWrite = 2;
}
Action act() override {
return oatpp::data::stream::writeExactSizeDataAsyncInline(m_connection.get(), m_currData, m_bytesToWrite, yieldTo(&WriteFrameCoroutine::onBbWritten));
}
Action onBbWritten() {
if(m_messageLengthScenario == 2) {
m_messageLen2 = htons(m_frameHeader->payloadLength);
m_currData = &m_messageLen2;
m_bytesToWrite = 2;
return yieldTo(&WriteFrameCoroutine::writeMessageLen);
} else if(m_messageLengthScenario == 3) {
m_messageLen3[0] = htonl(m_frameHeader->payloadLength >> 32);
m_messageLen3[1] = htonl(m_frameHeader->payloadLength & 0xFFFFFFFF);
m_currData = m_messageLen3;
m_bytesToWrite = 8;
return yieldTo(&WriteFrameCoroutine::writeMessageLen);
}
return yieldTo(&WriteFrameCoroutine::onLenWritten);
}
Action writeMessageLen() {
return oatpp::data::stream::writeExactSizeDataAsyncInline(m_connection.get(), m_currData, m_bytesToWrite, yieldTo(&WriteFrameCoroutine::onLenWritten));
}
Action onLenWritten() {
if(m_frameHeader->hasMask) {
m_currData = m_frameHeader->mask;
m_bytesToWrite = 4;
return yieldTo(&WriteFrameCoroutine::writeMask);
}
return finish();
}
Action writeMask() {
return oatpp::data::stream::writeExactSizeDataAsyncInline(m_connection.get(), m_currData, m_bytesToWrite, finish());
}
};
return parentCoroutine->startCoroutine<WriteFrameCoroutine>(actionOnReturn, m_connection, frameHeader);
}
oatpp::async::Action AsyncWebSocket::readPayloadAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnReturn,
const std::shared_ptr<Frame::Header>& frameHeader,
@ -245,4 +322,261 @@ oatpp::async::Action AsyncWebSocket::readPayloadAsync(oatpp::async::AbstractCoro
}
oatpp::async::Action AsyncWebSocket::handleFrameAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnReturn,
const std::shared_ptr<Frame::Header>& frameHeader)
{
class HandleFrameCoroutine : public oatpp::async::Coroutine<HandleFrameCoroutine> {
private:
std::shared_ptr<AsyncWebSocket> m_socket;
std::shared_ptr<Frame::Header> m_frameHeader;
std::shared_ptr<Listener> m_listener;
private:
std::shared_ptr<oatpp::data::stream::ChunkedBuffer> m_shortMessageStream;
public:
HandleFrameCoroutine(const std::shared_ptr<AsyncWebSocket>& socket,
const std::shared_ptr<Frame::Header>& frameHeader)
: m_socket(socket)
, m_frameHeader(frameHeader)
, m_listener(socket->m_listener)
{}
Action act() override {
switch (m_frameHeader->opcode) {
case Frame::OPCODE_CONTINUATION:
if(m_socket->m_lastOpcode < 0) {
throw std::runtime_error("[oatpp::web::protocol::websocket::AsyncWebSocket::handleFrameAsync(){HandleFrameCoroutine}]: Invalid communication state.");
}
return m_socket->readPayloadAsync(this, finish(), m_frameHeader, nullptr);
case Frame::OPCODE_TEXT:
if(m_socket->checkForContinuation(*m_frameHeader)) {
return m_socket->readPayloadAsync(this, finish(), m_frameHeader, nullptr);
} else {
throw std::runtime_error("[oatpp::web::protocol::websocket::AsyncWebSocket::handleFrameAsync(){HandleFrameCoroutine}]: Invalid communication state. OPCODE_CONTINUATION expected");
}
case Frame::OPCODE_BINARY:
if(m_socket->checkForContinuation(*m_frameHeader)) {
return m_socket->readPayloadAsync(this, finish(), m_frameHeader, nullptr);
} else {
throw std::runtime_error("[oatpp::web::protocol::websocket::AsyncWebSocket::handleFrameAsync(){HandleFrameCoroutine}]: Invalid communication state. OPCODE_CONTINUATION expected");
}
case Frame::OPCODE_CLOSE:
m_shortMessageStream = oatpp::data::stream::ChunkedBuffer::createShared();
return m_socket->readPayloadAsync(this, yieldTo(&HandleFrameCoroutine::onClose), m_frameHeader, m_shortMessageStream);
case Frame::OPCODE_PING:
m_shortMessageStream = oatpp::data::stream::ChunkedBuffer::createShared();
return m_socket->readPayloadAsync(this, yieldTo(&HandleFrameCoroutine::onPing), m_frameHeader, m_shortMessageStream);
case Frame::OPCODE_PONG:
m_shortMessageStream = oatpp::data::stream::ChunkedBuffer::createShared();
return m_socket->readPayloadAsync(this, yieldTo(&HandleFrameCoroutine::onPong), m_frameHeader, m_shortMessageStream);
default:
throw std::runtime_error("[oatpp::web::protocol::websocket::AsyncWebSocket::handleFrameAsync(){HandleFrameCoroutine}]: Unknown frame");
break;
}
}
Action onClose() {
if(m_listener) {
v_word16 code = 0;
oatpp::String message;
if(m_shortMessageStream->getSize() >= 2) {
m_shortMessageStream->readSubstring(&code, 0, 2);
code = ntohs(code);
message = m_shortMessageStream->getSubstring(2, m_shortMessageStream->getSize() - 2);
}
if(!message) {
message = "";
}
return m_listener->onClose(this, finish(), m_socket, code, message);
}
return finish();
}
Action onPing() {
if(m_listener) {
return m_listener->onPing(this, finish(), m_socket, m_shortMessageStream->toString());
}
return finish();
}
Action onPong() {
if(m_listener) {
return m_listener->onPong(this, finish(), m_socket, m_shortMessageStream->toString());
}
return finish();
}
};
return parentCoroutine->startCoroutine<HandleFrameCoroutine>(actionOnReturn, getSharedPtr<AsyncWebSocket>(), frameHeader);
}
oatpp::async::Action AsyncWebSocket::listenAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const Action& actionOnReturn) {
class ListenCoroutine : public oatpp::async::Coroutine<ListenCoroutine> {
private:
std::shared_ptr<AsyncWebSocket> m_socket;
std::shared_ptr<Frame::Header> m_frameHeader;
public:
ListenCoroutine(const std::shared_ptr<AsyncWebSocket>& socket)
: m_socket(socket)
, m_frameHeader(std::make_shared<Frame::Header>())
{
m_frameHeader->opcode = -1;
}
Action act() override {
if(m_frameHeader->opcode != Frame::OPCODE_CLOSE ) {
return m_socket->readFrameHeaderAsync(this,
m_socket->handleFrameAsync(this,
yieldTo(&ListenCoroutine::act),
m_frameHeader),
m_frameHeader);
}
return finish();
}
Action handleError(const async::Error& error) override {
return finish();
}
};
return parentCoroutine->startCoroutine<ListenCoroutine>(actionOnReturn, getSharedPtr<AsyncWebSocket>());
}
oatpp::async::Action AsyncWebSocket::sendFrameHeaderAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnReturn,
const std::shared_ptr<Frame::Header>& frameHeader,
bool fin, v_word8 opcode, v_int64 messageSize)
{
frameHeader->fin = fin;
frameHeader->rsv1 = false;
frameHeader->rsv2 = false;
frameHeader->rsv3 = false;
frameHeader->opcode = opcode;
frameHeader->hasMask = m_maskOutgoingMessages;
frameHeader->payloadLength = messageSize;
if(frameHeader->hasMask) {
Utils::generateMaskForFrame(*frameHeader);
}
return writeFrameHeaderAsync(parentCoroutine, actionOnReturn, frameHeader);
}
oatpp::async::Action AsyncWebSocket::sendOneFrameAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnReturn,
bool fin, v_word8 opcode, const oatpp::String& message)
{
class SendFrameCoroutine : public oatpp::async::Coroutine<SendFrameCoroutine> {
private:
std::shared_ptr<AsyncWebSocket> m_socket;
bool m_fin;
v_word8 m_opcode;
oatpp::String m_message;
std::shared_ptr<Frame::Header> m_frameHeader;
private:
p_char8 m_encoded = nullptr;
private:
const void* m_currData;
os::io::Library::v_size m_bytesToWrite;
public:
SendFrameCoroutine(const std::shared_ptr<AsyncWebSocket>& socket,
bool fin, v_word8 opcode, const oatpp::String& message)
: m_socket(socket)
, m_fin(fin)
, m_opcode(opcode)
, m_message(message)
, m_frameHeader(std::make_shared<Frame::Header>())
{}
~SendFrameCoroutine() {
if(m_encoded != nullptr) {
delete [] m_encoded;
}
}
Action act() override {
if(m_message && m_message->getSize() > 0) {
return m_socket->sendFrameHeaderAsync(this, yieldTo(&SendFrameCoroutine::prepareWriteMessage), m_frameHeader, m_fin, m_opcode, m_message->getSize());
} else {
return m_socket->sendFrameHeaderAsync(this, finish(), m_frameHeader, m_fin, m_opcode, 0);
}
}
Action prepareWriteMessage() {
if(m_frameHeader->hasMask) {
m_encoded = new v_char8[m_message->getSize()];
for(v_int32 i = 0; i < m_message->getSize(); i ++) {
m_encoded[i] = m_message->getData()[i] ^ m_frameHeader->mask[i % 4];
}
m_currData = m_encoded;
m_bytesToWrite = m_message->getSize();
} else {
m_currData = m_message->getData();
m_bytesToWrite = m_message->getSize();
}
return yieldTo(&SendFrameCoroutine::writeMessage);
}
Action writeMessage() {
return oatpp::data::stream::writeExactSizeDataAsyncInline(m_socket->m_connection.get(), m_currData, m_bytesToWrite, finish());
}
};
return parentCoroutine->startCoroutine<SendFrameCoroutine>(actionOnReturn, getSharedPtr<AsyncWebSocket>(), fin, opcode, message);
}
oatpp::async::Action AsyncWebSocket::sendCloseAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const Action& actionOnReturn, v_word16 code, const oatpp::String& message) {
code = htons(code);
oatpp::data::stream::ChunkedBuffer buffer;
buffer.write(&code, 2);
if(message) {
buffer.write(message->getData(), message->getSize());
}
return sendOneFrameAsync(parentCoroutine, actionOnReturn, true, Frame::OPCODE_CLOSE, buffer.toString());
}
oatpp::async::Action AsyncWebSocket::sendCloseAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const Action& actionOnReturn) {
return sendOneFrameAsync(parentCoroutine, actionOnReturn, true, Frame::OPCODE_CLOSE, nullptr);
}
oatpp::async::Action AsyncWebSocket::sendPingAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const Action& actionOnReturn, const oatpp::String& message) {
return sendOneFrameAsync(parentCoroutine, actionOnReturn, true, Frame::OPCODE_PING, message);
}
oatpp::async::Action AsyncWebSocket::sendPongAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const Action& actionOnReturn, const oatpp::String& message) {
return sendOneFrameAsync(parentCoroutine, actionOnReturn, true, Frame::OPCODE_PONG, message);
}
oatpp::async::Action AsyncWebSocket::sendOneFrameTextAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const Action& actionOnReturn, const oatpp::String& message) {
return sendOneFrameAsync(parentCoroutine, actionOnReturn, true, Frame::OPCODE_TEXT, message);
}
oatpp::async::Action AsyncWebSocket::sendOneFrameBinaryAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const Action& actionOnReturn, const oatpp::String& message) {
return sendOneFrameAsync(parentCoroutine, actionOnReturn, true, Frame::OPCODE_BINARY, message);
}
}}}}

View File

@ -97,7 +97,9 @@ private:
const std::shared_ptr<Frame::Header>& frameHeader,
const std::shared_ptr<oatpp::data::stream::ChunkedBuffer>& shortMessageStream);
void handleFrame(const Frame::Header& frameHeader);
Action handleFrameAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnReturn,
const std::shared_ptr<Frame::Header>& frameHeader);
private:
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
@ -118,6 +120,10 @@ public:
, m_listening(false)
{}
static std::shared_ptr<AsyncWebSocket> createShared(const std::shared_ptr<oatpp::data::stream::IOStream>& connection, bool maskOutgoingMessages) {
return std::make_shared<AsyncWebSocket>(connection, maskOutgoingMessages);
}
std::shared_ptr<oatpp::data::stream::IOStream> getConnection() const {
return m_connection;
}
@ -126,7 +132,32 @@ public:
m_listener = listener;
}
Action listenAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const Action& actionOnReturn);
Action writeFrameHeaderAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnReturn,
const std::shared_ptr<Frame::Header>& frameHeader);
Action sendFrameHeaderAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnReturn,
const std::shared_ptr<Frame::Header>& frameHeader,
bool fin, v_word8 opcode, v_int64 messageSize);
Action sendOneFrameAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const Action& actionOnReturn,
bool fin, v_word8 opcode, const oatpp::String& message);
Action sendCloseAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const Action& actionOnReturn, v_word16 code, const oatpp::String& message);
Action sendCloseAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const Action& actionOnReturn);
Action sendPingAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const Action& actionOnReturn, const oatpp::String& message);
Action sendPongAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const Action& actionOnReturn, const oatpp::String& message);
Action sendOneFrameTextAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const Action& actionOnReturn, const oatpp::String& message);
Action sendOneFrameBinaryAsync(oatpp::async::AbstractCoroutine* parentCoroutine, const Action& actionOnReturn, const oatpp::String& message);
};