WebSocket client functionality

This commit is contained in:
lganzzzo 2019-01-11 14:59:32 +02:00
parent 20a080a1dc
commit 2bc7e882c6
3 changed files with 77 additions and 34 deletions

View File

@ -119,7 +119,7 @@ std::shared_ptr<WebSocket> Connector::clientConnect(const Headers& clientHandsha
auto clientWebsocketAccept = oatpp::encoding::Base64::encode(sha1.finalBinary());
if(clientWebsocketAccept == websocketAccept) {
return std::make_shared<WebSocket>(serverResponse->getConnection());
return std::make_shared<WebSocket>(serverResponse->getConnection(), true);
}
}

View File

@ -26,6 +26,15 @@
namespace oatpp { namespace web { namespace protocol { namespace websocket {
thread_local std::mt19937 WebSocket::RANDOM_GENERATOR (std::random_device{}());
thread_local std::uniform_int_distribution<size_t> WebSocket::RANDOM_DISTRIBUTION (0, 255);
void WebSocket::generateMaskForFrame(Frame::Header& frameHeader) const {
for(v_int32 i = 0; i < 4; i ++) {
frameHeader.mask[i] = RANDOM_DISTRIBUTION(RANDOM_GENERATOR);
}
}
bool WebSocket::checkForContinuation(const Frame::Header& frameHeader) {
if(m_lastOpcode == Frame::OPCODE_TEXT || m_lastOpcode == Frame::OPCODE_BINARY) {
return false;
@ -145,14 +154,22 @@ void WebSocket::readPayload(const Frame::Header& frameHeader, oatpp::data::strea
if(res > 0) {
v_char8 decoded[res];
for(v_int32 i = 0; i < res; i ++) {
decoded[i] = buffer[i] ^ frameHeader.mask[(i + progress) % 4];
}
if(shortMessageStream) {
shortMessageStream->write(decoded, res);
} else if(m_listener) {
m_listener->readMessage(*this, decoded, res);
if(frameHeader.hasMask) {
v_char8 decoded[res];
for(v_int32 i = 0; i < res; i ++) {
decoded[i] = buffer[i] ^ frameHeader.mask[(i + progress) % 4];
}
if(shortMessageStream) {
shortMessageStream->write(decoded, res);
} else if(m_listener) {
m_listener->readMessage(*this, decoded, res);
}
} else {
if(shortMessageStream) {
shortMessageStream->write(buffer, res);
} else if(m_listener) {
m_listener->readMessage(*this, buffer, res);
}
}
progress += res;
@ -261,6 +278,8 @@ void WebSocket::listen() {
readFrameHeader(frameHeader);
handleFrame(frameHeader);
} while(frameHeader.opcode != Frame::OPCODE_CLOSE && m_listening);
} catch(const std::runtime_error& error) {
OATPP_LOGD("[oatpp::web::protocol::websocket::WebSocket::listen()]", "Unhandled error occurred. Message='%s'", error.what());
} catch(...) {
OATPP_LOGD("[oatpp::web::protocol::websocket::WebSocket::listen()]", "Unhandled error occurred");
}
@ -271,30 +290,41 @@ void WebSocket::stopListening() const {
m_listening = false;
}
void WebSocket::sendFrame(bool fin, v_word8 opcode, v_int64 messageSize) const {
void WebSocket::sendFrameHeader(Frame::Header& frameHeader, bool fin, v_word8 opcode, v_int64 messageSize) const {
frameHeader.fin = fin;
frameHeader.rsv1 = false;
frameHeader.rsv2 = false;
frameHeader.rsv3 = false;
frameHeader.opcode = opcode;
frameHeader.hasMask = m_maskOutgoingMessages;
frameHeader.payloadLength = messageSize;
Frame::Header frame;
frame.fin = fin;
frame.rsv1 = false;
frame.rsv2 = false;
frame.rsv3 = false;
frame.opcode = opcode;
frame.hasMask = false;
frame.payloadLength = messageSize;
writeFrameHeader(frame);
if(frameHeader.hasMask) {
generateMaskForFrame(frameHeader);
}
writeFrameHeader(frameHeader);
}
bool WebSocket::sendOneFrameMessage(v_word8 opcode, const oatpp::String& message) const {
bool WebSocket::sendOneFrame(bool fin, v_word8 opcode, const oatpp::String& message) const {
Frame::Header frameHeader;
if(message && message->getSize() > 0) {
sendFrame(true, opcode, message->getSize());
auto res = oatpp::data::stream::writeExactSizeData(m_connection.get(), message->getData(), message->getSize());
sendFrameHeader(frameHeader, fin, opcode, message->getSize());
oatpp::os::io::Library::v_size res;
if(frameHeader.hasMask) {
v_char8 encoded[message->getSize()];
for(v_int32 i = 0; i < message->getSize(); i ++) {
encoded[i] = message->getData()[i] ^ frameHeader.mask[i % 4];
}
res = oatpp::data::stream::writeExactSizeData(m_connection.get(), encoded, message->getSize());
} else {
res = oatpp::data::stream::writeExactSizeData(m_connection.get(), message->getData(), message->getSize());
}
if(res != message->getSize()) {
return false;
}
} else {
sendFrame(true, opcode, 0);
sendFrameHeader(frameHeader, fin, opcode, 0);
}
return true;
}
@ -309,7 +339,7 @@ void WebSocket::sendClose(v_word16 code, const oatpp::String& message) const {
buffer.write(message->getData(), message->getSize());
}
if(!sendOneFrameMessage(Frame::OPCODE_CLOSE, buffer.toString())) {
if(!sendOneFrame(true, Frame::OPCODE_CLOSE, buffer.toString())) {
stopListening();
throw std::runtime_error("[oatpp::web::protocol::websocket::WebSocket::sendClose(...)]: Unknown error while writing to socket.");
}
@ -317,35 +347,35 @@ void WebSocket::sendClose(v_word16 code, const oatpp::String& message) const {
}
void WebSocket::sendClose() const {
if(!sendOneFrameMessage(Frame::OPCODE_CLOSE, nullptr)) {
if(!sendOneFrame(true, Frame::OPCODE_CLOSE, nullptr)) {
stopListening();
throw std::runtime_error("[oatpp::web::protocol::websocket::WebSocket::sendClose()]: Unknown error while writing to socket.");
}
}
void WebSocket::sendPing(const oatpp::String& message) const {
if(!sendOneFrameMessage(Frame::OPCODE_PING, message)) {
if(!sendOneFrame(true, Frame::OPCODE_PING, message)) {
stopListening();
throw std::runtime_error("[oatpp::web::protocol::websocket::WebSocket::sendPing()]: Unknown error while writing to socket.");
}
}
void WebSocket::sendPong(const oatpp::String& message) const {
if(!sendOneFrameMessage(Frame::OPCODE_PONG, message)) {
if(!sendOneFrame(true, Frame::OPCODE_PONG, message)) {
stopListening();
throw std::runtime_error("[oatpp::web::protocol::websocket::WebSocket::sendPong()]: Unknown error while writing to socket.");
}
}
void WebSocket::sendOneFrameText(const oatpp::String& message) const {
if(!sendOneFrameMessage(Frame::OPCODE_TEXT, message)) {
if(!sendOneFrame(true, Frame::OPCODE_TEXT, message)) {
stopListening();
throw std::runtime_error("[oatpp::web::protocol::websocket::WebSocket::sendOneFrameText()]: Unknown error while writing to socket.");
}
}
void WebSocket::sendOneFrameBinary(const oatpp::String& message) const {
if(!sendOneFrameMessage(Frame::OPCODE_BINARY, message)) {
if(!sendOneFrame(true, Frame::OPCODE_BINARY, message)) {
stopListening();
throw std::runtime_error("[oatpp::web::protocol::websocket::WebSocket::sendOneFrameBinary()]: Unknown error while writing to socket.");
}

View File

@ -29,9 +29,15 @@
#include "oatpp/core/data/stream/ChunkedBuffer.hpp"
#include <random>
namespace oatpp { namespace web { namespace protocol { namespace websocket {
class WebSocket {
private:
/* Random used to generate message masks */
static thread_local std::mt19937 RANDOM_GENERATOR;
static thread_local std::uniform_int_distribution<size_t> RANDOM_DISTRIBUTION;
public:
class Listener {
@ -66,6 +72,8 @@ public:
private:
void generateMaskForFrame(Frame::Header& frameHeader) const;
bool checkForContinuation(const Frame::Header& frameHeader);
void readFrameHeader(Frame::Header& frameHeader) const;
void handleFrame(const Frame::Header& frameHeader);
@ -78,13 +86,18 @@ private:
private:
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
bool m_maskOutgoingMessages;
std::shared_ptr<Listener> m_listener;
v_int32 m_lastOpcode;
mutable bool m_listening;
public:
WebSocket(const std::shared_ptr<oatpp::data::stream::IOStream>& connection)
/**
* maskOutgoingMessages for servers should be false. For clients should be true
*/
WebSocket(const std::shared_ptr<oatpp::data::stream::IOStream>& connection, bool maskOutgoingMessages)
: m_connection(connection)
, m_maskOutgoingMessages(maskOutgoingMessages)
, m_listener(nullptr)
, m_lastOpcode(-1)
, m_listening(false)
@ -127,14 +140,14 @@ public:
* Use this method if you know what you are doing.
* Send default frame to peer with fin, opcode and messageSize set
*/
void sendFrame(bool fin, v_word8 opcode, v_int64 messageSize) const;
void sendFrameHeader(Frame::Header& frameHeader, bool fin, v_word8 opcode, v_int64 messageSize) const;
/**
* Send one frame message with custom opcode
* Send one frame message with custom fin and opcode
* return true on success, false on error.
* if false returned socket should be closed manually
*/
bool sendOneFrameMessage(v_word8 opcode, const oatpp::String& message) const;
bool sendOneFrame(bool fin, v_word8 opcode, const oatpp::String& message) const;
/**
* throws on error and closes socket