generalization of ConnectionProvider. ConnectionProvider for virtual_::client/virtual_::server

This commit is contained in:
lganzzzo 2018-11-15 22:39:18 +02:00
parent e55755a77a
commit 4a107a2bef
11 changed files with 325 additions and 50 deletions

View File

@ -25,5 +25,24 @@
#include "./ConnectionProvider.hpp"
namespace oatpp { namespace network {
const char* const ConnectionProvider::PROPERTY_HOST = "host";
const char* const ConnectionProvider::PROPERTY_PORT = "port";
void ConnectionProvider::setProperty(const oatpp::String& key, const oatpp::String& value) {
m_properties[key] = value;
}
const std::unordered_map<oatpp::String, oatpp::String>& ConnectionProvider::getProperties() {
return m_properties;
}
oatpp::String ConnectionProvider::getProperty(const oatpp::String& key) {
auto it = m_properties.find(key);
if(it == m_properties.end()) {
return nullptr;
}
return it->second;
}
}}

View File

@ -27,55 +27,60 @@
#include "oatpp/core/data/stream/Stream.hpp"
#include "oatpp/core/async/Coroutine.hpp"
#include <unordered_map>
namespace oatpp { namespace network {
/**
* Abstract ConnectionProvider.
* It may be anything that returns oatpp::data::stream::IOStream
* User of ConnectionProvider should care about IOStream only.
* All other properties are optional
*/
class ConnectionProvider {
public:
static const char* const PROPERTY_HOST;
static const char* const PROPERTY_PORT;
public:
typedef oatpp::data::stream::IOStream IOStream;
typedef oatpp::async::Action Action;
typedef oatpp::async::Action (oatpp::async::AbstractCoroutine::*AsyncCallback)(const std::shared_ptr<IOStream>&);
private:
std::unordered_map<oatpp::String, oatpp::String> m_properties;
protected:
/**
* Set optional property
*/
void setProperty(const oatpp::String& key, const oatpp::String& value);
public:
virtual ~ConnectionProvider() {}
virtual std::shared_ptr<IOStream> getConnection() = 0;
virtual Action getConnectionAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
AsyncCallback callback) = 0;
/**
* Some optional properties that user might want to know.
* All properties are optional and user should not rely on this
*/
const std::unordered_map<oatpp::String, oatpp::String>& getProperties();
/**
* Get optional property
*/
oatpp::String getProperty(const oatpp::String& key);
};
/**
* No properties here. It is just a logical division
*/
class ServerConnectionProvider : public ConnectionProvider {
protected:
v_word16 m_port;
public:
ServerConnectionProvider(v_word16 port)
: m_port(port)
{}
v_word16 getPort(){
return m_port;
}
};
/**
* No properties here. It is just a logical division
*/
class ClientConnectionProvider : public ConnectionProvider {
protected:
oatpp::String m_host;
v_word16 m_port;
public:
ClientConnectionProvider(const oatpp::String& host, v_word16 port)
: m_host(host)
, m_port(port)
{}
oatpp::String getHost() {
return m_host;
}
v_word16 getPort(){
return m_port;
}
};
}}

View File

@ -26,7 +26,7 @@
#include "oatpp/network/Connection.hpp"
#include "oatpp/core/data/stream/ChunkedBuffer.hpp"
#include "oatpp/core/utils/ConversionUtils.hpp"
#include "oatpp/test/Checker.hpp"
#include <fcntl.h>
@ -35,6 +35,14 @@
#include <sys/socket.h>
namespace oatpp { namespace network { namespace client {
SimpleTCPConnectionProvider::SimpleTCPConnectionProvider(const oatpp::String& host, v_word16 port)
: m_host(host)
, m_port(port)
{
setProperty(PROPERTY_HOST, m_host);
setProperty(PROPERTY_PORT, oatpp::utils::conversion::int32ToStr(port));
}
std::shared_ptr<oatpp::data::stream::IOStream> SimpleTCPConnectionProvider::getConnection(){

View File

@ -33,20 +33,28 @@
namespace oatpp { namespace network { namespace client {
class SimpleTCPConnectionProvider : public base::Controllable, public ClientConnectionProvider {
protected:
oatpp::String m_host;
v_word16 m_port;
public:
SimpleTCPConnectionProvider(const oatpp::String& host, v_int32 port)
: ClientConnectionProvider(host, port)
{}
SimpleTCPConnectionProvider(const oatpp::String& host, v_word16 port);
public:
static std::shared_ptr<SimpleTCPConnectionProvider>
createShared(const oatpp::String& host, v_int32 port){
static std::shared_ptr<SimpleTCPConnectionProvider> createShared(const oatpp::String& host, v_word16 port){
return std::make_shared<SimpleTCPConnectionProvider>(host, port);
}
std::shared_ptr<IOStream> getConnection() override;
Action getConnectionAsync(oatpp::async::AbstractCoroutine* parentCoroutine, AsyncCallback callback) override;
oatpp::String getHost() {
return m_host;
}
v_word16 getPort(){
return m_port;
}
};
}}}

View File

@ -35,9 +35,17 @@
#include <sys/socket.h>
#include <netinet/tcp.h>
namespace oatpp { namespace network { namespace server {
SimpleTCPConnectionProvider::SimpleTCPConnectionProvider(v_word16 port, bool nonBlocking)
: m_port(port)
, m_nonBlocking(nonBlocking)
{
m_serverHandle = instantiateServer();
setProperty(PROPERTY_HOST, "localhost");
setProperty(PROPERTY_PORT, oatpp::utils::conversion::int32ToStr(port));
}
oatpp::os::io::Library::v_handle SimpleTCPConnectionProvider::instantiateServer(){
oatpp::os::io::Library::v_handle serverHandle;

View File

@ -35,17 +35,13 @@ namespace oatpp { namespace network { namespace server {
class SimpleTCPConnectionProvider : public base::Controllable, public ServerConnectionProvider {
private:
oatpp::os::io::Library::v_handle m_serverHandle;
v_word16 m_port;
bool m_nonBlocking;
oatpp::os::io::Library::v_handle m_serverHandle;
private:
oatpp::os::io::Library::v_handle instantiateServer();
public:
SimpleTCPConnectionProvider(v_word16 port, bool nonBlocking = false)
: ServerConnectionProvider(port)
, m_nonBlocking(nonBlocking)
{
m_serverHandle = instantiateServer();
}
SimpleTCPConnectionProvider(v_word16 port, bool nonBlocking = false);
public:
static std::shared_ptr<SimpleTCPConnectionProvider> createShared(v_word16 port, bool nonBlocking = false){
@ -65,8 +61,14 @@ public:
* For Asynchronous IO in oatpp it is considered to be a good practice
* to accept connections in a seperate thread with the blocking accept()
* and then process connections in Asynchronous manner with non-blocking read/write
*
* It may be implemented later
*/
throw std::runtime_error("oatpp::network::server::SimpleTCPConnectionProvider::getConnectionAsync not implemented.");
throw std::runtime_error("[oatpp::network::server::SimpleTCPConnectionProvider::getConnectionAsync()] not implemented.");
}
v_word16 getPort(){
return m_port;
}
};

View File

@ -0,0 +1,71 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "ConnectionProvider.hpp"
namespace oatpp { namespace network { namespace virtual_ { namespace client {
std::shared_ptr<ConnectionProvider::IOStream> ConnectionProvider::getConnection() {
auto submission = m_interface->connect();
auto socket = submission->getSocket();
socket->setNonBlocking(false);
return socket;
}
oatpp::async::Action ConnectionProvider::getConnectionAsync(oatpp::async::AbstractCoroutine* parentCoroutine, AsyncCallback callback) {
class ConnectCoroutine : public oatpp::async::CoroutineWithResult<ConnectCoroutine, std::shared_ptr<oatpp::data::stream::IOStream>> {
private:
std::shared_ptr<virtual_::Interface> m_interface;
std::shared_ptr<virtual_::Interface::ConnectionSubmission> m_submission;
public:
ConnectCoroutine(const std::shared_ptr<virtual_::Interface>& interface)
: m_interface(interface)
{}
Action act() override {
m_submission = m_interface->connectNonBlocking();
if(m_submission){
return yieldTo(&ConnectCoroutine::obtainSocket);
}
return waitRetry();
}
Action obtainSocket() {
auto socket = m_submission->getSocketNonBlocking();
if(socket) {
socket->setNonBlocking(true);
return _return(socket);
}
return waitRetry();
}
};
return parentCoroutine->startCoroutineForResult<ConnectCoroutine>(callback, m_interface);
}
}}}}

View File

@ -0,0 +1,54 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_network_virtual__client_ConnectionProvider_hpp
#define oatpp_network_virtual__client_ConnectionProvider_hpp
#include "oatpp/network/virtual_/Interface.hpp"
#include "oatpp/network/ConnectionProvider.hpp"
namespace oatpp { namespace network { namespace virtual_ { namespace client {
class ConnectionProvider : public oatpp::network::ClientConnectionProvider {
private:
std::shared_ptr<virtual_::Interface> m_interface;
public:
ConnectionProvider(const std::shared_ptr<virtual_::Interface>& interface)
: m_interface(interface)
{
setProperty(PROPERTY_HOST, m_interface->getName());
setProperty(PROPERTY_PORT, "0");
}
std::shared_ptr<IOStream> getConnection() override;
Action getConnectionAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
AsyncCallback callback) override;
};
}}}}
#endif /* oatpp_network_virtual__client_ConnectionProvider_hpp */

View File

@ -0,0 +1,35 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "ConnectionProvider.hpp"
namespace oatpp { namespace network { namespace virtual_ { namespace server {
std::shared_ptr<ConnectionProvider::IOStream> ConnectionProvider::getConnection() {
auto socket = m_interface->accept();
socket->setNonBlocking(false);
return socket;
}
}}}}

View File

@ -0,0 +1,64 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_network_virtual__server_ConnectionProvider_hpp
#define oatpp_network_virtual__server_ConnectionProvider_hpp
#include "oatpp/network/virtual_/Interface.hpp"
#include "oatpp/network/ConnectionProvider.hpp"
namespace oatpp { namespace network { namespace virtual_ { namespace server {
class ConnectionProvider : public oatpp::network::ServerConnectionProvider {
private:
std::shared_ptr<virtual_::Interface> m_interface;
public:
ConnectionProvider(const std::shared_ptr<virtual_::Interface>& interface)
: m_interface(interface)
{
setProperty(PROPERTY_HOST, m_interface->getName());
setProperty(PROPERTY_PORT, "0");
}
std::shared_ptr<IOStream> getConnection() override;
Action getConnectionAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
AsyncCallback callback) override {
/**
* No need to implement this.
* For Asynchronous IO in oatpp it is considered to be a good practice
* to accept connections in a seperate thread with the blocking accept()
* and then process connections in Asynchronous manner with non-blocking read/write
*
* It may be implemented later
*/
throw std::runtime_error("[oatpp::network::virtual_::server::ConnectionProvider::getConnectionAsync()] not implemented.");
}
};
}}}}
#endif /* oatpp_network_virtual__server_ConnectionProvider_hpp */

View File

@ -96,9 +96,10 @@ HttpRequestExecutor::execute(const String& method,
}
auto request = oatpp::web::protocol::http::outgoing::Request::createShared(method, path, headers, body);
request->headers->putIfNotExists(oatpp::web::protocol::http::Header::HOST, m_connectionProvider->getHost());
request->headers->putIfNotExists(oatpp::web::protocol::http::Header::CONNECTION,
oatpp::web::protocol::http::Header::Value::CONNECTION_KEEP_ALIVE);
request->headers->putIfNotExists(oatpp::String(oatpp::web::protocol::http::Header::HOST, false),
m_connectionProvider->getProperty(oatpp::String("host", false)));
request->headers->putIfNotExists(oatpp::String(oatpp::web::protocol::http::Header::CONNECTION, false),
oatpp::String(oatpp::web::protocol::http::Header::Value::CONNECTION_KEEP_ALIVE, false));
auto ioBuffer = oatpp::data::buffer::IOBuffer::createShared();
@ -197,7 +198,7 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor
Action onConnectionReady(const std::shared_ptr<oatpp::data::stream::IOStream>& connection) {
m_connection = connection;
auto request = oatpp::web::protocol::http::outgoing::Request::createShared(m_method, m_path, m_headers, m_body);
request->headers->putIfNotExists(String(Header::HOST, false), m_connectionProvider->getHost());
request->headers->putIfNotExists(String(Header::HOST, false), m_connectionProvider->getProperty(String("host", false)));
request->headers->putIfNotExists(String(Header::CONNECTION, false), String(Header::Value::CONNECTION_KEEP_ALIVE, false));
m_ioBuffer = oatpp::data::buffer::IOBuffer::createShared();
auto upStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, m_ioBuffer);