mirror of
https://github.com/oatpp/oatpp.git
synced 2025-03-19 18:10:23 +08:00
Introduce stream I/O Mode
This commit is contained in:
parent
b14313ddbd
commit
d8200cd28b
@ -143,10 +143,19 @@ data::v_io_size ChunkedBuffer::write(const void *data, data::v_io_size count){
|
||||
return count;
|
||||
|
||||
}
|
||||
|
||||
void ChunkedBuffer::setOutputStreamIOMode(IOMode ioMode) {
|
||||
m_ioMode = ioMode;
|
||||
}
|
||||
|
||||
IOMode ChunkedBuffer::getOutputStreamIOMode() {
|
||||
return m_ioMode;
|
||||
}
|
||||
|
||||
data::v_io_size ChunkedBuffer::readSubstring(void *buffer,
|
||||
data::v_io_size pos,
|
||||
data::v_io_size count) {
|
||||
data::v_io_size pos,
|
||||
data::v_io_size count)
|
||||
{
|
||||
|
||||
if(pos < 0 || pos >= m_size){
|
||||
return 0;
|
||||
|
@ -106,6 +106,7 @@ private:
|
||||
data::v_io_size m_chunkPos;
|
||||
ChunkEntry* m_firstEntry;
|
||||
ChunkEntry* m_lastEntry;
|
||||
IOMode m_ioMode;
|
||||
|
||||
private:
|
||||
|
||||
@ -166,6 +167,18 @@ public:
|
||||
*/
|
||||
data::v_io_size write(const void *data, data::v_io_size count) override;
|
||||
|
||||
/**
|
||||
* Set stream I/O mode.
|
||||
* @param ioMode
|
||||
*/
|
||||
void setOutputStreamIOMode(IOMode ioMode) override;
|
||||
|
||||
/**
|
||||
* Set stream I/O mode.
|
||||
* @return
|
||||
*/
|
||||
IOMode getOutputStreamIOMode() override;
|
||||
|
||||
/**
|
||||
* Read part of ChunkedBuffer to buffer.
|
||||
* @param buffer - buffer to write data to.
|
||||
|
@ -32,6 +32,11 @@
|
||||
|
||||
namespace oatpp { namespace data{ namespace stream {
|
||||
|
||||
enum IOMode : v_int32 {
|
||||
BLOCKING = 0,
|
||||
NON_BLOCKING = 1
|
||||
};
|
||||
|
||||
/**
|
||||
* Output Stream.
|
||||
*/
|
||||
@ -52,6 +57,18 @@ public:
|
||||
*/
|
||||
virtual data::v_io_size write(const void *data, data::v_io_size count) = 0;
|
||||
|
||||
/**
|
||||
* Set stream I/O mode.
|
||||
* @throws
|
||||
*/
|
||||
virtual void setOutputStreamIOMode(IOMode ioMode) = 0;
|
||||
|
||||
/**
|
||||
* Get stream I/O mode.
|
||||
* @return
|
||||
*/
|
||||
virtual IOMode getOutputStreamIOMode() = 0;
|
||||
|
||||
/**
|
||||
* Same as `write((p_char8)data, std::strlen(data));`.
|
||||
* @param data - data to write.
|
||||
@ -100,6 +117,19 @@ public:
|
||||
* @return - actual number of bytes read.
|
||||
*/
|
||||
virtual data::v_io_size read(void *data, data::v_io_size count) = 0;
|
||||
|
||||
/**
|
||||
* Set stream I/O mode.
|
||||
* @throws
|
||||
*/
|
||||
virtual void setInputStreamIOMode(IOMode ioMode) = 0;
|
||||
|
||||
/**
|
||||
* Get stream I/O mode.
|
||||
* @return
|
||||
*/
|
||||
virtual IOMode getInputStreamIOMode() = 0;
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
@ -126,7 +156,7 @@ public:
|
||||
public:
|
||||
|
||||
static std::shared_ptr<CompoundIOStream> createShared(const std::shared_ptr<OutputStream>& outputStream,
|
||||
const std::shared_ptr<InputStream>& inputStream){
|
||||
const std::shared_ptr<InputStream>& inputStream){
|
||||
return Shared_CompoundIOStream_Pool::allocateShared(outputStream, inputStream);
|
||||
}
|
||||
|
||||
@ -137,6 +167,22 @@ public:
|
||||
data::v_io_size read(void *data, data::v_io_size count) override {
|
||||
return m_inputStream->read(data, count);
|
||||
}
|
||||
|
||||
void setOutputStreamIOMode(IOMode ioMode) override {
|
||||
m_outputStream->setOutputStreamIOMode(ioMode);
|
||||
}
|
||||
|
||||
IOMode getOutputStreamIOMode() override {
|
||||
return m_outputStream->getOutputStreamIOMode();
|
||||
}
|
||||
|
||||
void setInputStreamIOMode(IOMode ioMode) override {
|
||||
m_inputStream->setInputStreamIOMode(ioMode);
|
||||
}
|
||||
|
||||
IOMode getInputStreamIOMode() override {
|
||||
return m_inputStream->getInputStreamIOMode();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
@ -180,7 +226,7 @@ public:
|
||||
* @return - actual number of bytes written. &id:oatpp::data::v_io_size;. <br>
|
||||
*/
|
||||
data::v_io_size writeAsString(bool value);
|
||||
|
||||
|
||||
};
|
||||
|
||||
ConsistentOutputStream& operator << (ConsistentOutputStream& s, const oatpp::String& str);
|
||||
|
@ -38,6 +38,14 @@ data::v_io_size OutputStreamBufferedProxy::write(const void *data, data::v_io_si
|
||||
}
|
||||
}
|
||||
|
||||
void OutputStreamBufferedProxy::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
|
||||
m_outputStream->setOutputStreamIOMode(ioMode);
|
||||
}
|
||||
|
||||
oatpp::data::stream::IOMode OutputStreamBufferedProxy::getOutputStreamIOMode() {
|
||||
return m_outputStream->getOutputStreamIOMode();
|
||||
}
|
||||
|
||||
data::v_io_size OutputStreamBufferedProxy::flush() {
|
||||
return m_buffer->flushToStream(*m_outputStream);
|
||||
}
|
||||
@ -59,5 +67,13 @@ data::v_io_size InputStreamBufferedProxy::read(void *data, data::v_io_size count
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void InputStreamBufferedProxy::setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
|
||||
m_inputStream->setInputStreamIOMode(ioMode);
|
||||
}
|
||||
|
||||
oatpp::data::stream::IOMode InputStreamBufferedProxy::getInputStreamIOMode() {
|
||||
return m_inputStream->getInputStreamIOMode();
|
||||
}
|
||||
|
||||
}}}
|
||||
|
@ -73,6 +73,19 @@ public:
|
||||
}
|
||||
|
||||
data::v_io_size write(const void *data, data::v_io_size count) override;
|
||||
|
||||
/**
|
||||
* Set OutputStream I/O mode.
|
||||
* @param ioMode
|
||||
*/
|
||||
void setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) override;
|
||||
|
||||
/**
|
||||
* Set OutputStream I/O mode.
|
||||
* @return
|
||||
*/
|
||||
oatpp::data::stream::IOMode getOutputStreamIOMode() override;
|
||||
|
||||
data::v_io_size flush();
|
||||
oatpp::async::CoroutineStarter flushAsync();
|
||||
|
||||
@ -160,6 +173,18 @@ public:
|
||||
|
||||
data::v_io_size read(void *data, data::v_io_size count) override;
|
||||
|
||||
/**
|
||||
* Set InputStream I/O mode.
|
||||
* @param ioMode
|
||||
*/
|
||||
void setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) override;
|
||||
|
||||
/**
|
||||
* Get InputStream I/O mode.
|
||||
* @return
|
||||
*/
|
||||
oatpp::data::stream::IOMode getInputStreamIOMode() override;
|
||||
|
||||
void setBufferPosition(data::v_io_size readPosition, data::v_io_size writePosition, bool canRead) {
|
||||
m_buffer.setBufferPosition(readPosition, writePosition, canRead);
|
||||
}
|
||||
|
@ -28,6 +28,7 @@
|
||||
#include <sys/socket.h>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <fcntl.h>
|
||||
|
||||
namespace oatpp { namespace network {
|
||||
|
||||
@ -83,6 +84,63 @@ data::v_io_size Connection::read(void *buff, data::v_io_size count){
|
||||
return result;
|
||||
}
|
||||
|
||||
void Connection::setStreamIOMode(oatpp::data::stream::IOMode ioMode) {
|
||||
|
||||
auto flags = fcntl(m_handle, F_GETFL);
|
||||
if (flags < 0) {
|
||||
throw std::runtime_error("[oatpp::network::Connection::setStreamIOMode()]: Error. Can't get socket flags.");
|
||||
}
|
||||
|
||||
switch(ioMode) {
|
||||
|
||||
case oatpp::data::stream::IOMode::BLOCKING:
|
||||
flags = flags & (~O_NONBLOCK);
|
||||
if (fcntl(m_handle, F_SETFL, flags) < 0) {
|
||||
throw std::runtime_error("[oatpp::network::Connection::setStreamIOMode()]: Error. Can't set stream I/O mode to IOMode::BLOCKING.");
|
||||
}
|
||||
break;
|
||||
|
||||
case oatpp::data::stream::IOMode::NON_BLOCKING:
|
||||
flags = (flags | O_NONBLOCK);
|
||||
if (fcntl(m_handle, F_SETFL, flags) < 0) {
|
||||
throw std::runtime_error("[oatpp::network::Connection::setStreamIOMode()]: Error. Can't set stream I/O mode to IOMode::NON_BLOCKING.");
|
||||
}
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
oatpp::data::stream::IOMode Connection::getStreamIOMode() {
|
||||
|
||||
auto flags = fcntl(m_handle, F_GETFL);
|
||||
if (flags < 0) {
|
||||
throw std::runtime_error("[oatpp::network::Connection::getStreamIOMode()]: Error. Can't get socket flags.");
|
||||
}
|
||||
|
||||
if((flags & O_NONBLOCK) > 0) {
|
||||
return oatpp::data::stream::IOMode::NON_BLOCKING;
|
||||
}
|
||||
|
||||
return oatpp::data::stream::IOMode::BLOCKING;
|
||||
|
||||
}
|
||||
|
||||
void Connection::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
|
||||
setStreamIOMode(ioMode);
|
||||
}
|
||||
|
||||
oatpp::data::stream::IOMode Connection::getOutputStreamIOMode() {
|
||||
return getStreamIOMode();
|
||||
}
|
||||
|
||||
void Connection::setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
|
||||
setStreamIOMode(ioMode);
|
||||
}
|
||||
|
||||
oatpp::data::stream::IOMode Connection::getInputStreamIOMode() {
|
||||
return getStreamIOMode();
|
||||
}
|
||||
|
||||
void Connection::close(){
|
||||
::close(m_handle);
|
||||
}
|
||||
|
@ -39,6 +39,9 @@ public:
|
||||
SHARED_OBJECT_POOL(Shared_Connection_Pool, Connection, 32);
|
||||
private:
|
||||
data::v_io_handle m_handle;
|
||||
private:
|
||||
void setStreamIOMode(oatpp::data::stream::IOMode ioMode);
|
||||
oatpp::data::stream::IOMode getStreamIOMode();
|
||||
public:
|
||||
/**
|
||||
* Constructor.
|
||||
@ -78,6 +81,30 @@ public:
|
||||
*/
|
||||
data::v_io_size read(void *buff, data::v_io_size count) override;
|
||||
|
||||
/**
|
||||
* Set OutputStream I/O mode.
|
||||
* @param ioMode
|
||||
*/
|
||||
void setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) override;
|
||||
|
||||
/**
|
||||
* Set OutputStream I/O mode.
|
||||
* @return
|
||||
*/
|
||||
oatpp::data::stream::IOMode getOutputStreamIOMode() override;
|
||||
|
||||
/**
|
||||
* Set InputStream I/O mode.
|
||||
* @param ioMode
|
||||
*/
|
||||
void setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) override;
|
||||
|
||||
/**
|
||||
* Get InputStream I/O mode.
|
||||
* @return
|
||||
*/
|
||||
oatpp::data::stream::IOMode getInputStreamIOMode() override;
|
||||
|
||||
/**
|
||||
* Close socket handle.
|
||||
*/
|
||||
|
@ -37,9 +37,8 @@
|
||||
|
||||
namespace oatpp { namespace network { namespace server {
|
||||
|
||||
SimpleTCPConnectionProvider::SimpleTCPConnectionProvider(v_word16 port, bool nonBlocking)
|
||||
SimpleTCPConnectionProvider::SimpleTCPConnectionProvider(v_word16 port)
|
||||
: m_port(port)
|
||||
, m_nonBlocking(nonBlocking)
|
||||
, m_closed(false)
|
||||
{
|
||||
m_serverHandle = instantiateServer();
|
||||
@ -125,13 +124,6 @@ std::shared_ptr<oatpp::data::stream::IOStream> SimpleTCPConnectionProvider::getC
|
||||
}
|
||||
#endif
|
||||
|
||||
int flags = 0;
|
||||
if(m_nonBlocking) {
|
||||
flags |= O_NONBLOCK;
|
||||
}
|
||||
|
||||
fcntl(handle, F_SETFL, flags);
|
||||
|
||||
return Connection::createShared(handle);
|
||||
|
||||
}
|
||||
|
@ -38,29 +38,28 @@ namespace oatpp { namespace network { namespace server {
|
||||
class SimpleTCPConnectionProvider : public base::Countable, public ServerConnectionProvider {
|
||||
private:
|
||||
v_word16 m_port;
|
||||
bool m_nonBlocking;
|
||||
bool m_closed;
|
||||
oatpp::data::v_io_handle m_serverHandle;
|
||||
private:
|
||||
oatpp::data::v_io_handle instantiateServer();
|
||||
public:
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param port - port to listen for incoming connections.
|
||||
* @param nonBlocking - set `true` to provide non-blocking &id:oatpp::data::stream::IOStream; for connection.
|
||||
* `false` for blocking &id:oatpp::data::stream::IOStream;. Default `false`.
|
||||
* @param port
|
||||
*/
|
||||
SimpleTCPConnectionProvider(v_word16 port, bool nonBlocking = false);
|
||||
SimpleTCPConnectionProvider(v_word16 port);
|
||||
public:
|
||||
|
||||
/**
|
||||
* Create shared SimpleTCPConnectionProvider.
|
||||
* @param port - port to listen for incoming connections.
|
||||
* @param nonBlocking - set `true` to provide non-blocking &id:oatpp::data::stream::IOStream; for connection.
|
||||
* `false` for blocking &id:oatpp::data::stream::IOStream;. Default `false`.
|
||||
* @param port
|
||||
* @return - `std::shared_ptr` to SimpleTCPConnectionProvider.
|
||||
*/
|
||||
static std::shared_ptr<SimpleTCPConnectionProvider> createShared(v_word16 port, bool nonBlocking = false){
|
||||
return std::make_shared<SimpleTCPConnectionProvider>(port, nonBlocking);
|
||||
static std::shared_ptr<SimpleTCPConnectionProvider> createShared(v_word16 port){
|
||||
return std::make_shared<SimpleTCPConnectionProvider>(port);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -25,7 +25,15 @@
|
||||
#include "Pipe.hpp"
|
||||
|
||||
namespace oatpp { namespace network { namespace virtual_ {
|
||||
|
||||
|
||||
void Pipe::Reader::setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
|
||||
m_ioMode = ioMode;
|
||||
}
|
||||
|
||||
oatpp::data::stream::IOMode Pipe::Reader::getInputStreamIOMode() {
|
||||
return m_ioMode;
|
||||
}
|
||||
|
||||
void Pipe::Reader::setMaxAvailableToRead(data::v_io_size maxAvailableToRead) {
|
||||
m_maxAvailableToRead = maxAvailableToRead;
|
||||
}
|
||||
@ -39,7 +47,7 @@ data::v_io_size Pipe::Reader::read(void *data, data::v_io_size count) {
|
||||
Pipe& pipe = *m_pipe;
|
||||
oatpp::data::v_io_size result;
|
||||
|
||||
if(m_nonBlocking) {
|
||||
if(m_ioMode == oatpp::data::stream::IOMode::NON_BLOCKING) {
|
||||
std::unique_lock<std::mutex> lock(pipe.m_mutex, std::try_to_lock);
|
||||
if(lock.owns_lock()) {
|
||||
if (pipe.m_fifo.availableToRead() > 0) {
|
||||
@ -70,6 +78,15 @@ data::v_io_size Pipe::Reader::read(void *data, data::v_io_size count) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
void Pipe::Writer::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
|
||||
m_ioMode = ioMode;
|
||||
}
|
||||
|
||||
oatpp::data::stream::IOMode Pipe::Writer::getOutputStreamIOMode() {
|
||||
return m_ioMode;
|
||||
}
|
||||
|
||||
void Pipe::Writer::setMaxAvailableToWrite(data::v_io_size maxAvailableToWrite) {
|
||||
m_maxAvailableToWrtie = maxAvailableToWrite;
|
||||
}
|
||||
@ -83,7 +100,7 @@ data::v_io_size Pipe::Writer::write(const void *data, data::v_io_size count) {
|
||||
Pipe& pipe = *m_pipe;
|
||||
oatpp::data::v_io_size result;
|
||||
|
||||
if(m_nonBlocking) {
|
||||
if(m_ioMode == oatpp::data::stream::IOMode::NON_BLOCKING) {
|
||||
std::unique_lock<std::mutex> lock(pipe.m_mutex, std::try_to_lock);
|
||||
if(lock.owns_lock()) {
|
||||
if (pipe.m_fifo.availableToWrite() > 0) {
|
||||
|
@ -52,7 +52,7 @@ public:
|
||||
friend Pipe;
|
||||
private:
|
||||
Pipe* m_pipe;
|
||||
bool m_nonBlocking;
|
||||
oatpp::data::stream::IOMode m_ioMode;
|
||||
|
||||
/*
|
||||
* this one used for testing purposes only
|
||||
@ -60,22 +60,14 @@ public:
|
||||
data::v_io_size m_maxAvailableToRead;
|
||||
protected:
|
||||
|
||||
Reader(Pipe* pipe, bool nonBlocking = false)
|
||||
Reader(Pipe* pipe, oatpp::data::stream::IOMode ioMode = oatpp::data::stream::IOMode::BLOCKING)
|
||||
: m_pipe(pipe)
|
||||
, m_nonBlocking(nonBlocking)
|
||||
, m_ioMode(ioMode)
|
||||
, m_maxAvailableToRead(-1)
|
||||
{}
|
||||
|
||||
public:
|
||||
|
||||
/**
|
||||
* Set `true` to make non-blocking reads using &l:Pipe::Reader::read ();.
|
||||
* @param nonBlocking - `true` for nonblocking read.
|
||||
*/
|
||||
void setNonBlocking(bool nonBlocking) {
|
||||
m_nonBlocking = nonBlocking;
|
||||
}
|
||||
|
||||
/**
|
||||
* Limit the available amount of bytes to read from pipe.<br>
|
||||
* This method is used for testing purposes only.<br>
|
||||
@ -92,6 +84,18 @@ public:
|
||||
* @return - &id:oatpp::data::v_io_size;.
|
||||
*/
|
||||
data::v_io_size read(void *data, data::v_io_size count) override;
|
||||
|
||||
/**
|
||||
* Set InputStream I/O mode.
|
||||
* @param ioMode
|
||||
*/
|
||||
void setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) override;
|
||||
|
||||
/**
|
||||
* Get InputStream I/O mode.
|
||||
* @return
|
||||
*/
|
||||
oatpp::data::stream::IOMode getInputStreamIOMode() override;
|
||||
|
||||
};
|
||||
|
||||
@ -103,7 +107,7 @@ public:
|
||||
friend Pipe;
|
||||
private:
|
||||
Pipe* m_pipe;
|
||||
bool m_nonBlocking;
|
||||
oatpp::data::stream::IOMode m_ioMode;
|
||||
|
||||
/*
|
||||
* this one used for testing purposes only
|
||||
@ -111,22 +115,14 @@ public:
|
||||
data::v_io_size m_maxAvailableToWrtie;
|
||||
protected:
|
||||
|
||||
Writer(Pipe* pipe, bool nonBlocking = false)
|
||||
Writer(Pipe* pipe, oatpp::data::stream::IOMode ioMode = oatpp::data::stream::IOMode::BLOCKING)
|
||||
: m_pipe(pipe)
|
||||
, m_nonBlocking(nonBlocking)
|
||||
, m_ioMode(ioMode)
|
||||
, m_maxAvailableToWrtie(-1)
|
||||
{}
|
||||
|
||||
public:
|
||||
|
||||
/**
|
||||
* Set `true` to make non-blocking writes using &l:Pipe::Writer::write ();.
|
||||
* @param nonBlocking - `true` for nonblocking write.
|
||||
*/
|
||||
void setNonBlocking(bool nonBlocking) {
|
||||
m_nonBlocking = nonBlocking;
|
||||
}
|
||||
|
||||
/**
|
||||
* Limit the available space for data writes in pipe.<br>
|
||||
* This method is used for testing purposes only.<br>
|
||||
@ -143,6 +139,18 @@ public:
|
||||
* @return - &id:oatpp::data::v_io_size;.
|
||||
*/
|
||||
data::v_io_size write(const void *data, data::v_io_size count) override;
|
||||
|
||||
/**
|
||||
* Set OutputStream I/O mode.
|
||||
* @param ioMode
|
||||
*/
|
||||
void setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) override;
|
||||
|
||||
/**
|
||||
* Set OutputStream I/O mode.
|
||||
* @return
|
||||
*/
|
||||
oatpp::data::stream::IOMode getOutputStreamIOMode() override;
|
||||
|
||||
};
|
||||
|
||||
|
@ -52,9 +52,20 @@ data::v_io_size Socket::write(const void *data, data::v_io_size count) {
|
||||
return m_pipeOut->getWriter()->write(data, count);
|
||||
}
|
||||
|
||||
void Socket::setNonBlocking(bool nonBlocking) {
|
||||
m_pipeIn->getReader()->setNonBlocking(nonBlocking);
|
||||
m_pipeOut->getWriter()->setNonBlocking(nonBlocking);
|
||||
void Socket::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
|
||||
m_pipeOut->getWriter()->setOutputStreamIOMode(ioMode);
|
||||
}
|
||||
|
||||
oatpp::data::stream::IOMode Socket::getOutputStreamIOMode() {
|
||||
return m_pipeOut->getWriter()->getOutputStreamIOMode();
|
||||
}
|
||||
|
||||
void Socket::setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
|
||||
m_pipeIn->getReader()->setInputStreamIOMode(ioMode);
|
||||
}
|
||||
|
||||
oatpp::data::stream::IOMode Socket::getInputStreamIOMode() {
|
||||
return m_pipeIn->getReader()->getInputStreamIOMode();
|
||||
}
|
||||
|
||||
void Socket::close() {
|
||||
|
@ -85,10 +85,28 @@ public:
|
||||
data::v_io_size write(const void *data, data::v_io_size count) override;
|
||||
|
||||
/**
|
||||
* Set socket for nonblocking I/O.
|
||||
* @param nonBlocking - `true` for nonblocking.
|
||||
* Set OutputStream I/O mode.
|
||||
* @param ioMode
|
||||
*/
|
||||
void setNonBlocking(bool nonBlocking);
|
||||
void setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) override;
|
||||
|
||||
/**
|
||||
* Set OutputStream I/O mode.
|
||||
* @return
|
||||
*/
|
||||
oatpp::data::stream::IOMode getOutputStreamIOMode() override;
|
||||
|
||||
/**
|
||||
* Set InputStream I/O mode.
|
||||
* @param ioMode
|
||||
*/
|
||||
void setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) override;
|
||||
|
||||
/**
|
||||
* Get InputStream I/O mode.
|
||||
* @return
|
||||
*/
|
||||
oatpp::data::stream::IOMode getInputStreamIOMode() override;
|
||||
|
||||
/**
|
||||
* Close socket pipes.
|
||||
|
@ -46,7 +46,8 @@ void ConnectionProvider::close() {
|
||||
std::shared_ptr<ConnectionProvider::IOStream> ConnectionProvider::getConnection() {
|
||||
auto submission = m_interface->connect();
|
||||
auto socket = submission->getSocket();
|
||||
socket->setNonBlocking(false);
|
||||
socket->setOutputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
|
||||
socket->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
|
||||
socket->setMaxAvailableToReadWrtie(m_maxAvailableToRead, m_maxAvailableToWrite);
|
||||
return socket;
|
||||
}
|
||||
@ -84,7 +85,8 @@ oatpp::async::CoroutineStarterForResult<const std::shared_ptr<oatpp::data::strea
|
||||
auto socket = m_submission->getSocketNonBlocking();
|
||||
|
||||
if(socket) {
|
||||
socket->setNonBlocking(true);
|
||||
socket->setOutputStreamIOMode(oatpp::data::stream::IOMode::NON_BLOCKING);
|
||||
socket->setInputStreamIOMode(oatpp::data::stream::IOMode::NON_BLOCKING);
|
||||
socket->setMaxAvailableToReadWrtie(m_maxAvailableToRead, m_maxAvailableToWrite);
|
||||
return _return(socket);
|
||||
}
|
||||
|
@ -26,9 +26,8 @@
|
||||
|
||||
namespace oatpp { namespace network { namespace virtual_ { namespace server {
|
||||
|
||||
ConnectionProvider::ConnectionProvider(const std::shared_ptr<virtual_::Interface>& interface, bool nonBlocking)
|
||||
ConnectionProvider::ConnectionProvider(const std::shared_ptr<virtual_::Interface>& interface)
|
||||
: m_interface(interface)
|
||||
, m_nonBlocking(nonBlocking)
|
||||
, m_open(true)
|
||||
, m_maxAvailableToRead(-1)
|
||||
, m_maxAvailableToWrite(-1)
|
||||
@ -37,8 +36,8 @@ ConnectionProvider::ConnectionProvider(const std::shared_ptr<virtual_::Interface
|
||||
setProperty(PROPERTY_PORT, "0");
|
||||
}
|
||||
|
||||
std::shared_ptr<ConnectionProvider> ConnectionProvider::createShared(const std::shared_ptr<virtual_::Interface>& interface, bool nonBlocking) {
|
||||
return std::make_shared<ConnectionProvider>(interface, nonBlocking);
|
||||
std::shared_ptr<ConnectionProvider> ConnectionProvider::createShared(const std::shared_ptr<virtual_::Interface>& interface) {
|
||||
return std::make_shared<ConnectionProvider>(interface);
|
||||
}
|
||||
|
||||
void ConnectionProvider::setSocketMaxAvailableToReadWrtie(data::v_io_size maxToRead, data::v_io_size maxToWrite) {
|
||||
@ -54,7 +53,6 @@ void ConnectionProvider::close() {
|
||||
std::shared_ptr<ConnectionProvider::IOStream> ConnectionProvider::getConnection() {
|
||||
auto socket = m_interface->accept(m_open);
|
||||
if(socket) {
|
||||
socket->setNonBlocking(m_nonBlocking);
|
||||
socket->setMaxAvailableToReadWrtie(m_maxAvailableToRead, m_maxAvailableToWrite);
|
||||
}
|
||||
return socket;
|
||||
|
@ -38,7 +38,6 @@ namespace oatpp { namespace network { namespace virtual_ { namespace server {
|
||||
class ConnectionProvider : public oatpp::network::ServerConnectionProvider {
|
||||
private:
|
||||
std::shared_ptr<virtual_::Interface> m_interface;
|
||||
bool m_nonBlocking;
|
||||
bool m_open;
|
||||
data::v_io_size m_maxAvailableToRead;
|
||||
data::v_io_size m_maxAvailableToWrite;
|
||||
@ -47,17 +46,15 @@ public:
|
||||
/**
|
||||
* Constructor.
|
||||
* @param interface - &id:oatpp::network::virtual_::Interface;.
|
||||
* @param nonBlocking - `true` to set non blocking regime for provided connections.
|
||||
*/
|
||||
ConnectionProvider(const std::shared_ptr<virtual_::Interface>& interface, bool nonBlocking = false);
|
||||
ConnectionProvider(const std::shared_ptr<virtual_::Interface>& interface);
|
||||
|
||||
/**
|
||||
* Create shared ConnectionProvider.
|
||||
* @param interface - &id:oatpp::network::virtual_::Interface;.
|
||||
* @param nonBlocking - `true` to set non blocking regime for provided connections.
|
||||
* @return - `std::shared_ptr` to ConnectionProvider.
|
||||
*/
|
||||
static std::shared_ptr<ConnectionProvider> createShared(const std::shared_ptr<virtual_::Interface>& interface, bool nonBlocking = false);
|
||||
static std::shared_ptr<ConnectionProvider> createShared(const std::shared_ptr<virtual_::Interface>& interface);
|
||||
|
||||
/**
|
||||
* Limit the available amount of bytes to read from socket and limit the available amount of bytes to write to socket. <br>
|
||||
|
@ -66,6 +66,9 @@ void AsyncHttpConnectionHandler::addRequestInterceptor(const std::shared_ptr<han
|
||||
}
|
||||
|
||||
void AsyncHttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection){
|
||||
|
||||
connection->setOutputStreamIOMode(oatpp::data::stream::IOMode::NON_BLOCKING);
|
||||
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::NON_BLOCKING);
|
||||
|
||||
auto ioBuffer = oatpp::data::buffer::IOBuffer::createShared();
|
||||
auto outStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, ioBuffer);
|
||||
|
@ -111,7 +111,10 @@ void HttpConnectionHandler::addRequestInterceptor(const std::shared_ptr<handler:
|
||||
}
|
||||
|
||||
void HttpConnectionHandler::handleConnection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection){
|
||||
|
||||
|
||||
connection->setOutputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
|
||||
connection->setInputStreamIOMode(oatpp::data::stream::IOMode::BLOCKING);
|
||||
|
||||
/* Create working thread */
|
||||
std::thread thread(&Task::run, Task(m_router.get(), connection, m_bodyDecoder, m_errorHandler, &m_requestInterceptors));
|
||||
|
||||
|
@ -37,6 +37,8 @@
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
|
||||
#include "oatpp/core/data/stream/Stream.hpp"
|
||||
|
||||
#ifdef OATPP_ENABLE_ALL_TESTS_MAIN
|
||||
namespace {
|
||||
|
||||
@ -52,10 +54,16 @@ public:
|
||||
|
||||
};
|
||||
|
||||
|
||||
|
||||
void runTests() {
|
||||
|
||||
oatpp::base::Environment::printCompilationConfig();
|
||||
|
||||
OATPP_LOGD("test", "osSize=%d", sizeof(oatpp::data::stream::OutputStream));
|
||||
OATPP_LOGD("test", "isSize=%d", sizeof(oatpp::data::stream::InputStream));
|
||||
|
||||
/*
|
||||
OATPP_RUN_TEST(oatpp::test::base::RegRuleTest);
|
||||
OATPP_RUN_TEST(oatpp::test::base::CommandLineArgumentsTest);
|
||||
|
||||
@ -81,6 +89,7 @@ void runTests() {
|
||||
OATPP_RUN_TEST(oatpp::test::network::virtual_::InterfaceTest);
|
||||
|
||||
OATPP_RUN_TEST(oatpp::test::web::server::api::ApiControllerTest);
|
||||
*/
|
||||
OATPP_RUN_TEST(oatpp::test::web::FullTest);
|
||||
|
||||
OATPP_RUN_TEST(oatpp::test::web::FullAsyncTest);
|
||||
|
@ -67,10 +67,10 @@ public:
|
||||
|
||||
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::network::ServerConnectionProvider>, serverConnectionProvider)([this] {
|
||||
#ifdef OATPP_TEST_USE_PORT
|
||||
return oatpp::network::server::SimpleTCPConnectionProvider::createShared(OATPP_TEST_USE_PORT, true /* nonBlocking */);
|
||||
return oatpp::network::server::SimpleTCPConnectionProvider::createShared(OATPP_TEST_USE_PORT);
|
||||
#else
|
||||
OATPP_COMPONENT(std::shared_ptr<oatpp::network::virtual_::Interface>, interface);
|
||||
return oatpp::network::virtual_::server::ConnectionProvider::createShared(interface, true /* nonBlocking */);
|
||||
return oatpp::network::virtual_::server::ConnectionProvider::createShared(interface);
|
||||
#endif
|
||||
}());
|
||||
|
||||
|
@ -61,10 +61,10 @@ public:
|
||||
|
||||
OATPP_CREATE_COMPONENT(std::shared_ptr<oatpp::network::ServerConnectionProvider>, serverConnectionProvider)([this] {
|
||||
#ifdef OATPP_TEST_USE_PORT
|
||||
return oatpp::network::server::SimpleTCPConnectionProvider::createShared(OATPP_TEST_USE_PORT, true /* nonBlocking */);
|
||||
return oatpp::network::server::SimpleTCPConnectionProvider::createShared(OATPP_TEST_USE_PORT);
|
||||
#else
|
||||
OATPP_COMPONENT(std::shared_ptr<oatpp::network::virtual_::Interface>, interface);
|
||||
return oatpp::network::virtual_::server::ConnectionProvider::createShared(interface, true /* nonBlocking */);
|
||||
return oatpp::network::virtual_::server::ConnectionProvider::createShared(interface);
|
||||
#endif
|
||||
}());
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user