Stream Context. Input/Output stream return Context by reference.

This commit is contained in:
lganzzzo 2019-12-10 16:22:52 +02:00
parent b456f043cc
commit 8fd96027e5
23 changed files with 97 additions and 67 deletions

View File

@ -29,6 +29,8 @@ namespace oatpp { namespace data{ namespace stream {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// BufferOutputStream
data::stream::DefaultInitializedContext BufferOutputStream::DEFAULT_CONTEXT(data::stream::StreamType::STREAM_INFINITE);
BufferOutputStream::BufferOutputStream(v_buff_size initialCapacity, v_buff_size growBytes)
: m_data(new v_char8[initialCapacity])
, m_capacity(initialCapacity)
@ -60,8 +62,8 @@ IOMode BufferOutputStream::getOutputStreamIOMode() {
return m_ioMode;
}
Context* BufferOutputStream::getOutputStreamContext() {
return nullptr;
Context& BufferOutputStream::getOutputStreamContext() {
return DEFAULT_CONTEXT;
}
void BufferOutputStream::reserveBytesUpfront(v_buff_size count) {
@ -159,6 +161,8 @@ oatpp::async::CoroutineStarter BufferOutputStream::flushToStreamAsync(const std:
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// BufferInputStream
data::stream::DefaultInitializedContext BufferInputStream::DEFAULT_CONTEXT(data::stream::StreamType::STREAM_FINITE);
BufferInputStream::BufferInputStream(const std::shared_ptr<base::StrBuffer>& memoryHandle, p_char8 data, v_buff_size size)
: m_memoryHandle(memoryHandle)
, m_data(data)
@ -219,8 +223,8 @@ IOMode BufferInputStream::getInputStreamIOMode() {
return m_ioMode;
}
Context* BufferInputStream::getInputStreamContext() {
return nullptr;
Context& BufferInputStream::getInputStreamContext() {
return DEFAULT_CONTEXT;
}
std::shared_ptr<base::StrBuffer> BufferInputStream::getDataMemoryHandle() {

View File

@ -33,6 +33,8 @@ namespace oatpp { namespace data{ namespace stream {
* BufferOutputStream
*/
class BufferOutputStream : public ConsistentOutputStream {
public:
static data::stream::DefaultInitializedContext DEFAULT_CONTEXT;
private:
p_char8 m_data;
v_buff_size m_capacity;
@ -76,7 +78,7 @@ public:
* Get stream context.
* @return
*/
Context* getOutputStreamContext() override;
Context& getOutputStreamContext() override;
/**
* Reserve bytes for future writes.
@ -143,6 +145,8 @@ public:
* BufferInputStream
*/
class BufferInputStream : public InputStream {
public:
static data::stream::DefaultInitializedContext DEFAULT_CONTEXT;
private:
std::shared_ptr<base::StrBuffer> m_memoryHandle;
p_char8 m_data;
@ -213,7 +217,7 @@ public:
* Get stream context.
* @return
*/
Context* getInputStreamContext() override;
Context& getInputStreamContext() override;
/**
* Get data memory handle.

View File

@ -25,7 +25,9 @@
#include "ChunkedBuffer.hpp"
namespace oatpp { namespace data{ namespace stream {
data::stream::DefaultInitializedContext ChunkedBuffer::DEFAULT_CONTEXT(data::stream::StreamType::STREAM_INFINITE);
const char* ChunkedBuffer::ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA = "ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA";
const char* const ChunkedBuffer::CHUNK_POOL_NAME = "ChunkedBuffer_Chunk_Pool";
@ -156,8 +158,8 @@ IOMode ChunkedBuffer::getOutputStreamIOMode() {
return m_ioMode;
}
Context* ChunkedBuffer::getOutputStreamContext() {
return nullptr;
Context& ChunkedBuffer::getOutputStreamContext() {
return DEFAULT_CONTEXT;
}
data::v_io_size ChunkedBuffer::readSubstring(void *buffer,

View File

@ -36,6 +36,8 @@ namespace oatpp { namespace data{ namespace stream {
* Buffer wich can grow by chunks and implements &id:oatpp::data::stream::ConsistentOutputStream; interface.
*/
class ChunkedBuffer : public oatpp::base::Countable, public ConsistentOutputStream, public std::enable_shared_from_this<ChunkedBuffer> {
public:
static data::stream::DefaultInitializedContext DEFAULT_CONTEXT;
public:
static const char* ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA;
public:
@ -179,9 +181,9 @@ public:
/**
* Get stream context.
* @return - `nullptr.`
* @return - &id:oatpp::data::stream::Context;.
*/
Context* getOutputStreamContext() override;
Context& getOutputStreamContext() override;
/**
* Read part of ChunkedBuffer to buffer.

View File

@ -85,8 +85,8 @@ IOMode FileInputStream::getInputStreamIOMode() {
return m_ioMode;
}
Context* FileInputStream::getInputStreamContext() {
return &DEFAULT_CONTEXT;
Context& FileInputStream::getInputStreamContext() {
return DEFAULT_CONTEXT;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -148,8 +148,8 @@ IOMode FileOutputStream::getOutputStreamIOMode() {
return m_ioMode;
}
Context* FileOutputStream::getOutputStreamContext() {
return &DEFAULT_CONTEXT;
Context& FileOutputStream::getOutputStreamContext() {
return DEFAULT_CONTEXT;
}
}}}

View File

@ -101,7 +101,7 @@ public:
* Get stream context.
* @return
*/
Context* getInputStreamContext() override;
Context& getInputStreamContext() override;
};
@ -176,7 +176,7 @@ public:
* Get stream context.
* @return
*/
Context* getOutputStreamContext() override;
Context& getOutputStreamContext() override;
};

View File

@ -75,14 +75,14 @@ StreamType DefaultInitializedContext::getStreamType() const {
void IOStream::initContexts() {
auto* inStreamContext = getInputStreamContext();
if (inStreamContext && !inStreamContext->isInitialized()) {
inStreamContext->init();
auto& inStreamContext = getInputStreamContext();
if (!inStreamContext.isInitialized()) {
inStreamContext.init();
}
auto* outStreamContext = getOutputStreamContext();
if(outStreamContext && outStreamContext != inStreamContext && !outStreamContext->isInitialized()) {
outStreamContext->init();
auto& outStreamContext = getOutputStreamContext();
if(outStreamContext != inStreamContext && !outStreamContext.isInitialized()) {
outStreamContext.init();
}
}
@ -94,14 +94,14 @@ async::CoroutineStarter IOStream::initContextsAsync() {
async::CoroutineStarter starter(nullptr);
auto* inStreamContext = getInputStreamContext();
if (inStreamContext && !inStreamContext->isInitialized()) {
starter.next(inStreamContext->initAsync());
auto& inStreamContext = getInputStreamContext();
if (!inStreamContext.isInitialized()) {
starter.next(inStreamContext.initAsync());
}
auto* outStreamContext = getOutputStreamContext();
if(outStreamContext && outStreamContext != inStreamContext && !outStreamContext->isInitialized()) {
starter.next(outStreamContext->initAsync());
auto& outStreamContext = getOutputStreamContext();
if(outStreamContext != inStreamContext && !outStreamContext.isInitialized()) {
starter.next(outStreamContext.initAsync());
}
return starter;

View File

@ -110,6 +110,14 @@ public:
*/
const Properties& getProperties() const;
inline bool operator == (const Context& other){
return this == &other;
}
inline bool operator != (const Context& other){
return this != &other;
}
};
/**
@ -222,7 +230,7 @@ public:
* Get stream context. Can be `null`.
* @return - pointer to &l:Context; or `nullptr`.
*/
virtual Context* getOutputStreamContext() = 0;
virtual Context& getOutputStreamContext() = 0;
/**
* Same as `write((p_char8)data, std::strlen(data));`.
@ -298,7 +306,7 @@ public:
* Get stream context. Can be `null`.
* @return - pointer to &l:Context; or `nullptr`.
*/
virtual Context* getInputStreamContext() = 0;
virtual Context& getInputStreamContext() = 0;
};
@ -364,7 +372,7 @@ public:
return m_outputStream->getOutputStreamIOMode();
}
Context* getOutputStreamContext() override {
Context& getOutputStreamContext() override {
return m_outputStream->getOutputStreamContext();
}
@ -376,7 +384,7 @@ public:
return m_inputStream->getInputStreamIOMode();
}
Context* getInputStreamContext() override {
Context& getInputStreamContext() override {
return m_inputStream->getInputStreamContext();
}

View File

@ -53,7 +53,7 @@ oatpp::data::stream::IOMode OutputStreamBufferedProxy::getOutputStreamIOMode() {
return m_outputStream->getOutputStreamIOMode();
}
Context* OutputStreamBufferedProxy::getOutputStreamContext() {
Context& OutputStreamBufferedProxy::getOutputStreamContext() {
return m_outputStream->getOutputStreamContext();
}
@ -112,7 +112,7 @@ oatpp::data::stream::IOMode InputStreamBufferedProxy::getInputStreamIOMode() {
return m_inputStream->getInputStreamIOMode();
}
Context* InputStreamBufferedProxy::getInputStreamContext() {
Context& InputStreamBufferedProxy::getInputStreamContext() {
return m_inputStream->getInputStreamContext();
}

View File

@ -73,7 +73,7 @@ public:
* Get context of the underlying stream.
* @return
*/
Context* getOutputStreamContext() override;
Context& getOutputStreamContext() override;
data::v_io_size flush();
oatpp::async::CoroutineStarter flushAsync();
@ -143,7 +143,7 @@ public:
* Get context of the underlying stream.
* @return
*/
Context* getInputStreamContext() override;
Context& getInputStreamContext() override;
void setBufferPosition(data::v_io_size readPosition, data::v_io_size writePosition, bool canRead) {
m_buffer.setBufferPosition(readPosition, writePosition, canRead);

View File

@ -270,8 +270,8 @@ oatpp::data::stream::IOMode Connection::getOutputStreamIOMode() {
return getStreamIOMode();
}
oatpp::data::stream::Context* Connection::getOutputStreamContext() {
return &DEFAULT_CONTEXT;
oatpp::data::stream::Context& Connection::getOutputStreamContext() {
return DEFAULT_CONTEXT;
}
void Connection::setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
@ -282,8 +282,8 @@ oatpp::data::stream::IOMode Connection::getInputStreamIOMode() {
return getStreamIOMode();
}
oatpp::data::stream::Context* Connection::getInputStreamContext() {
return &DEFAULT_CONTEXT;
oatpp::data::stream::Context& Connection::getInputStreamContext() {
return DEFAULT_CONTEXT;
}
void Connection::close(){

View File

@ -106,7 +106,7 @@ public:
* Get output stream context.
* @return - &id:oatpp::data::stream::Context;.
*/
oatpp::data::stream::Context* getOutputStreamContext() override;
oatpp::data::stream::Context& getOutputStreamContext() override;
/**
* Set InputStream I/O mode.
@ -124,7 +124,7 @@ public:
* Get input stream context. <br>
* @return - &id:oatpp::data::stream::Context;.
*/
oatpp::data::stream::Context* getInputStreamContext() override;
oatpp::data::stream::Context& getInputStreamContext() override;
/**
* Close socket handle.

View File

@ -96,11 +96,11 @@ oatpp::data::stream::IOMode ConnectionPool::ConnectionWrapper::getInputStreamIOM
return m_connection->getInputStreamIOMode();
}
oatpp::data::stream::Context* ConnectionPool::ConnectionWrapper::getOutputStreamContext() {
oatpp::data::stream::Context& ConnectionPool::ConnectionWrapper::getOutputStreamContext() {
return m_connection->getOutputStreamContext();
}
oatpp::data::stream::Context* ConnectionPool::ConnectionWrapper::getInputStreamContext() {
oatpp::data::stream::Context& ConnectionPool::ConnectionWrapper::getInputStreamContext() {
return m_connection->getInputStreamContext();
}

View File

@ -101,8 +101,8 @@ public:
void setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) override;
oatpp::data::stream::IOMode getInputStreamIOMode() override;
oatpp::data::stream::Context* getOutputStreamContext() override;
oatpp::data::stream::Context* getInputStreamContext() override;
oatpp::data::stream::Context& getOutputStreamContext() override;
oatpp::data::stream::Context& getInputStreamContext() override;
/**
* Mark that this connection cannot be reused in the pool any more.

View File

@ -54,12 +54,12 @@ SimpleTCPConnectionProvider::ExtendedConnection::ExtendedConnection(data::v_io_h
, m_context(data::stream::StreamType::STREAM_INFINITE, std::forward<data::stream::Context::Properties>(properties))
{}
oatpp::data::stream::Context* SimpleTCPConnectionProvider::ExtendedConnection::getOutputStreamContext() {
return &m_context;
oatpp::data::stream::Context& SimpleTCPConnectionProvider::ExtendedConnection::getOutputStreamContext() {
return m_context;
}
oatpp::data::stream::Context* SimpleTCPConnectionProvider::ExtendedConnection::getInputStreamContext() {
return &m_context;
oatpp::data::stream::Context& SimpleTCPConnectionProvider::ExtendedConnection::getInputStreamContext() {
return m_context;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

View File

@ -63,13 +63,13 @@ public:
* Get output stream context.
* @return - &id:oatpp::data::stream::Context;.
*/
oatpp::data::stream::Context* getOutputStreamContext() override;
oatpp::data::stream::Context& getOutputStreamContext() override;
/**
* Get input stream context. <br>
* @return - &id:oatpp::data::stream::Context;.
*/
oatpp::data::stream::Context* getInputStreamContext() override;
oatpp::data::stream::Context& getInputStreamContext() override;
};

View File

@ -26,6 +26,8 @@
namespace oatpp { namespace network { namespace virtual_ {
data::stream::DefaultInitializedContext Pipe::Reader::DEFAULT_CONTEXT(data::stream::StreamType::STREAM_INFINITE);
void Pipe::Reader::setInputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
m_ioMode = ioMode;
}
@ -103,8 +105,8 @@ oatpp::async::Action Pipe::Reader::suggestInputStreamAction(data::v_io_size ioRe
}
oatpp::data::stream::Context* Pipe::Reader::getInputStreamContext() {
return nullptr;
oatpp::data::stream::Context& Pipe::Reader::getInputStreamContext() {
return DEFAULT_CONTEXT;
}
void Pipe::Reader::notifyWaitList() {
@ -113,6 +115,8 @@ void Pipe::Reader::notifyWaitList() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
data::stream::DefaultInitializedContext Pipe::Writer::DEFAULT_CONTEXT(data::stream::StreamType::STREAM_INFINITE);
void Pipe::Writer::setOutputStreamIOMode(oatpp::data::stream::IOMode ioMode) {
m_ioMode = ioMode;
}
@ -121,8 +125,8 @@ oatpp::data::stream::IOMode Pipe::Writer::getOutputStreamIOMode() {
return m_ioMode;
}
oatpp::data::stream::Context* Pipe::Writer::getOutputStreamContext() {
return nullptr;
oatpp::data::stream::Context& Pipe::Writer::getOutputStreamContext() {
return DEFAULT_CONTEXT;
}
void Pipe::Writer::setMaxAvailableToWrite(data::v_io_size maxAvailableToWrite) {

View File

@ -52,7 +52,10 @@ public:
*/
class Reader : public oatpp::data::stream::InputStream {
friend Pipe;
public:
static data::stream::DefaultInitializedContext DEFAULT_CONTEXT;
private:
class WaitListListener : public oatpp::async::CoroutineWaitList::Listener {
private:
Pipe* m_pipe;
@ -70,6 +73,7 @@ public:
}
};
private:
Pipe* m_pipe;
oatpp::data::stream::IOMode m_ioMode;
@ -136,7 +140,7 @@ public:
* Get stream context.
* @return
*/
oatpp::data::stream::Context* getInputStreamContext() override;
oatpp::data::stream::Context& getInputStreamContext() override;
/**
* Notify coroutine wait-list
@ -151,6 +155,8 @@ public:
*/
class Writer : public oatpp::data::stream::OutputStream {
friend Pipe;
public:
static data::stream::DefaultInitializedContext DEFAULT_CONTEXT;
private:
class WaitListListener : public oatpp::async::CoroutineWaitList::Listener {
private:
@ -232,7 +238,7 @@ public:
* Get stream context.
* @return
*/
oatpp::data::stream::Context* getOutputStreamContext() override;
oatpp::data::stream::Context& getOutputStreamContext() override;
/**
* Notify coroutine wait-list

View File

@ -76,12 +76,12 @@ oatpp::data::stream::IOMode Socket::getInputStreamIOMode() {
return m_pipeIn->getReader()->getInputStreamIOMode();
}
oatpp::data::stream::Context* Socket::getOutputStreamContext() {
oatpp::data::stream::Context& Socket::getOutputStreamContext() {
return m_pipeOut->getWriter()->getOutputStreamContext();
}
oatpp::data::stream::Context* Socket::getInputStreamContext() {
oatpp::data::stream::Context& Socket::getInputStreamContext() {
return m_pipeIn->getReader()->getInputStreamContext();
}

View File

@ -128,13 +128,13 @@ public:
* Get output stream context.
* @return
*/
oatpp::data::stream::Context* getOutputStreamContext() override;
oatpp::data::stream::Context& getOutputStreamContext() override;
/**
* Get input stream context.
* @return
*/
oatpp::data::stream::Context* getInputStreamContext() override;
oatpp::data::stream::Context& getInputStreamContext() override;
/**
* Close socket pipes.

View File

@ -153,7 +153,7 @@ Beautifier::IOMode Beautifier::getOutputStreamIOMode() {
return m_outputStream->getOutputStreamIOMode();
}
Beautifier::Context* Beautifier::getOutputStreamContext() {
Beautifier::Context& Beautifier::getOutputStreamContext() {
return m_outputStream->getOutputStreamContext();
}

View File

@ -79,7 +79,7 @@ public:
* Get stream context.
* @return
*/
Context* getOutputStreamContext() override;
Context& getOutputStreamContext() override;
};

View File

@ -60,7 +60,7 @@ public:
throw std::runtime_error("It's a stub!");
}
oatpp::data::stream::Context* getOutputStreamContext() override {
oatpp::data::stream::Context& getOutputStreamContext() override {
throw std::runtime_error("It's a stub!");
}
@ -72,7 +72,7 @@ public:
throw std::runtime_error("It's a stub!");
}
oatpp::data::stream::Context* getInputStreamContext() override {
oatpp::data::stream::Context& getInputStreamContext() override {
throw std::runtime_error("It's a stub!");
}