Merge pull request #36 from oatpp/io_generalization

Io generalization
This commit is contained in:
Leonid Stryzhevskyi 2019-02-07 08:36:05 +02:00 committed by GitHub
commit 50418a525c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 895 additions and 681 deletions

View File

@ -12,15 +12,19 @@ jobs:
workspace: workspace:
clean: all clean: all
steps: steps:
- script: |
mkdir build
- task: CMake@1 - task: CMake@1
- script: | - script: |
cmake -DCMAKE_BUILD_TYPE=Release -DBUILD_SHARED_LIBS=OFF -DOATPP_INSTALL=OFF -DOATPP_BUILD_TESTS=ON cmake -DCMAKE_BUILD_TYPE=Release -DBUILD_SHARED_LIBS=OFF -DOATPP_INSTALL=OFF -DOATPP_BUILD_TESTS=ON ..
make make
displayName: 'CMake' displayName: 'CMake'
workingDirectory: build
- script: | - script: |
make test ARGS="-V" make test ARGS="-V"
displayName: 'Test' displayName: 'Test'
workingDirectory: build
- job: macOS - job: macOS
displayName: 'Build - macOS-10.13' displayName: 'Build - macOS-10.13'
@ -30,12 +34,16 @@ jobs:
workspace: workspace:
clean: all clean: all
steps: steps:
- script: |
mkdir build
- task: CMake@1 - task: CMake@1
- script: | - script: |
cmake -DCMAKE_BUILD_TYPE=Release -DBUILD_SHARED_LIBS=OFF -DOATPP_INSTALL=OFF -DOATPP_BUILD_TESTS=ON cmake -DCMAKE_BUILD_TYPE=Release -DBUILD_SHARED_LIBS=OFF -DOATPP_INSTALL=OFF -DOATPP_BUILD_TESTS=ON ..
make make
displayName: 'CMake' displayName: 'CMake'
workingDirectory: build
- script: | - script: |
make test ARGS="-V" make test ARGS="-V"
displayName: 'Test' displayName: 'Test'
workingDirectory: build

View File

@ -46,6 +46,7 @@ add_library(oatpp
oatpp/core/concurrency/SpinLock.hpp oatpp/core/concurrency/SpinLock.hpp
oatpp/core/concurrency/Thread.cpp oatpp/core/concurrency/Thread.cpp
oatpp/core/concurrency/Thread.hpp oatpp/core/concurrency/Thread.hpp
oatpp/core/data/IODefinitions.hpp
oatpp/core/data/buffer/FIFOBuffer.cpp oatpp/core/data/buffer/FIFOBuffer.cpp
oatpp/core/data/buffer/FIFOBuffer.hpp oatpp/core/data/buffer/FIFOBuffer.hpp
oatpp/core/data/buffer/IOBuffer.cpp oatpp/core/data/buffer/IOBuffer.cpp
@ -75,8 +76,6 @@ add_library(oatpp
oatpp/core/macro/basic.hpp oatpp/core/macro/basic.hpp
oatpp/core/macro/codegen.hpp oatpp/core/macro/codegen.hpp
oatpp/core/macro/component.hpp oatpp/core/macro/component.hpp
oatpp/core/os/io/Library.cpp
oatpp/core/os/io/Library.hpp
oatpp/core/parser/ParsingCaret.cpp oatpp/core/parser/ParsingCaret.cpp
oatpp/core/parser/ParsingCaret.hpp oatpp/core/parser/ParsingCaret.hpp
oatpp/core/utils/ConversionUtils.cpp oatpp/core/utils/ConversionUtils.cpp

View File

@ -0,0 +1,86 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_data_IODefinitions_hpp
#define oatpp_data_IODefinitions_hpp
#include "oatpp/core/Types.hpp"
namespace oatpp { namespace data {
typedef int v_io_handle;
/**
* All I/O buffer operations (like read/write(buffer, size)) should return
* v_io_size.
*
* Possible return values:
* On Success - [1..max_int64]
* On Error - IOError values.
*
* All other values are considered to be a fatal error.
* application should be terminated.
*/
typedef v_int64 v_io_size;
/**
* Final set of possible I/O operation error values.
* I/O operation should not return any other error values.
*/
enum IOError : v_io_size {
/**
* In oatpp 0 is considered to be an Error as for I/O operation size
*
* As for argument value 0 should be handled separately of the main flow.
*
* As for return value 0 should not be returned.
* I/O method should return an error describing a reason why I/O is empty instead of a zero itself.
* if zero is returned, client should treat it like a bad api implementation and as an error in the flow.
*/
ZERO_VALUE = 0,
/**
* I/O operation is not possible any more
* Client should give up trying and free all related resources
*/
BROKEN_PIPE = -1001,
/**
* I/O operation was interrupted because of some reason
* Client may retry immediately
*/
RETRY = -1002,
/**
* I/O operation is not currently available due to some reason
* Client should wait then retry
*/
WAIT_RETRY = -1003
};
}}
#endif //oatpp_data_IODefinitions_hpp

View File

@ -25,35 +25,55 @@
#include "FIFOBuffer.hpp" #include "FIFOBuffer.hpp"
namespace oatpp { namespace data{ namespace buffer { namespace oatpp { namespace data{ namespace buffer {
os::io::Library::v_size FIFOBuffer::availableToRead() { FIFOBuffer::FIFOBuffer(void* buffer, v_io_size bufferSize,
oatpp::concurrency::SpinLock lock(m_atom); data::v_io_size readPosition, data::v_io_size writePosition,
bool canRead)
: m_buffer((p_char8)buffer)
, m_bufferSize(bufferSize)
, m_readPosition(readPosition)
, m_writePosition(writePosition)
, m_canRead(canRead)
{}
FIFOBuffer::FIFOBuffer(void* buffer, v_io_size bufferSize)
: FIFOBuffer(buffer, bufferSize, 0, 0, false)
{}
void FIFOBuffer::setBufferPosition(data::v_io_size readPosition, data::v_io_size writePosition, bool canRead) {
m_readPosition = readPosition;
m_writePosition = writePosition;
m_canRead = canRead;
}
data::v_io_size FIFOBuffer::availableToRead() const {
if(!m_canRead) { if(!m_canRead) {
return 0; return 0;
} }
if(m_readPosition < m_writePosition) { if(m_readPosition < m_writePosition) {
return m_writePosition - m_readPosition; return m_writePosition - m_readPosition;
} }
return (IOBuffer::BUFFER_SIZE - m_readPosition + m_writePosition); return (m_bufferSize - m_readPosition + m_writePosition);
} }
os::io::Library::v_size FIFOBuffer::availableToWrite() { data::v_io_size FIFOBuffer::availableToWrite() const {
oatpp::concurrency::SpinLock lock(m_atom);
if(m_canRead && m_writePosition == m_readPosition) { if(m_canRead && m_writePosition == m_readPosition) {
return 0; return 0;
} }
if(m_writePosition < m_readPosition) { if(m_writePosition < m_readPosition) {
return m_readPosition - m_writePosition; return m_readPosition - m_writePosition;
} }
return (IOBuffer::BUFFER_SIZE - m_writePosition + m_readPosition); return (m_bufferSize - m_writePosition + m_readPosition);
} }
os::io::Library::v_size FIFOBuffer::read(void *data, os::io::Library::v_size count) { data::v_io_size FIFOBuffer::getBufferSize() const {
return m_bufferSize;
oatpp::concurrency::SpinLock lock(m_atom); }
data::v_io_size FIFOBuffer::read(void *data, data::v_io_size count) {
if(!m_canRead) { if(!m_canRead) {
return 0; return data::IOError::WAIT_RETRY;
} }
if(count == 0) { if(count == 0) {
@ -67,7 +87,7 @@ os::io::Library::v_size FIFOBuffer::read(void *data, os::io::Library::v_size cou
if(size > count) { if(size > count) {
size = count; size = count;
} }
std::memcpy(data, &((p_char8)m_buffer.getData())[m_readPosition], size); std::memcpy(data, &m_buffer[m_readPosition], size);
m_readPosition += size; m_readPosition += size;
if(m_readPosition == m_writePosition) { if(m_readPosition == m_writePosition) {
m_canRead = false; m_canRead = false;
@ -75,21 +95,21 @@ os::io::Library::v_size FIFOBuffer::read(void *data, os::io::Library::v_size cou
return size; return size;
} }
auto size = IOBuffer::BUFFER_SIZE - m_readPosition; auto size = m_bufferSize - m_readPosition;
if(size > count){ if(size > count){
std::memcpy(data, &((p_char8)m_buffer.getData())[m_readPosition], count); std::memcpy(data, &m_buffer[m_readPosition], count);
m_readPosition += count; m_readPosition += count;
return count; return count;
} }
std::memcpy(data, &((p_char8)m_buffer.getData())[m_readPosition], size); std::memcpy(data, &m_buffer[m_readPosition], size);
auto size2 = m_writePosition; auto size2 = m_writePosition;
if(size2 > count - size) { if(size2 > count - size) {
size2 = count - size; size2 = count - size;
} }
std::memcpy(&((p_char8) data)[size], m_buffer.getData(), size2); std::memcpy(&((p_char8) data)[size], m_buffer, size2);
m_readPosition = size2; m_readPosition = size2;
if(m_readPosition == m_writePosition) { if(m_readPosition == m_writePosition) {
m_canRead = false; m_canRead = false;
@ -99,12 +119,10 @@ os::io::Library::v_size FIFOBuffer::read(void *data, os::io::Library::v_size cou
} }
os::io::Library::v_size FIFOBuffer::write(const void *data, os::io::Library::v_size count) { data::v_io_size FIFOBuffer::write(const void *data, data::v_io_size count) {
oatpp::concurrency::SpinLock lock(m_atom);
if(m_canRead && m_writePosition == m_readPosition) { if(m_canRead && m_writePosition == m_readPosition) {
return 0; return data::IOError::WAIT_RETRY;
} }
if(count == 0) { if(count == 0) {
@ -120,30 +138,267 @@ os::io::Library::v_size FIFOBuffer::write(const void *data, os::io::Library::v_s
if(size > count) { if(size > count) {
size = count; size = count;
} }
std::memcpy(&((p_char8)m_buffer.getData())[m_writePosition], data, size); std::memcpy(&m_buffer[m_writePosition], data, size);
m_writePosition += size; m_writePosition += size;
return size; return size;
} }
auto size = IOBuffer::BUFFER_SIZE - m_writePosition; auto size = m_bufferSize - m_writePosition;
if(size > count){ if(size > count){
std::memcpy(&((p_char8)m_buffer.getData())[m_writePosition], data, count); std::memcpy(&m_buffer[m_writePosition], data, count);
m_writePosition += count; m_writePosition += count;
return count; return count;
} }
std::memcpy(&((p_char8)m_buffer.getData())[m_writePosition], data, size); std::memcpy(&m_buffer[m_writePosition], data, size);
auto size2 = m_readPosition; auto size2 = m_readPosition;
if(size2 > count - size) { if(size2 > count - size) {
size2 = count - size; size2 = count - size;
} }
std::memcpy(m_buffer.getData(), &((p_char8) data)[size], size2); std::memcpy(m_buffer, &((p_char8) data)[size], size2);
m_writePosition = size2; m_writePosition = size2;
return (size + size2); return (size + size2);
} }
data::v_io_size FIFOBuffer::readAndWriteToStream(data::stream::OutputStream& stream, data::v_io_size count) {
if(!m_canRead) {
return data::IOError::WAIT_RETRY;
}
if(count == 0) {
return 0;
} else if(count < 0) {
throw std::runtime_error("[oatpp::data::buffer::FIFOBuffer::readAndWriteToStream(...)]: count < 0");
}
if(m_readPosition < m_writePosition) {
auto size = m_writePosition - m_readPosition;
if(size > count) {
size = count;
}
auto bytesWritten = stream.write(&m_buffer[m_readPosition], size);
if(bytesWritten > 0) {
m_readPosition += bytesWritten;
if (m_readPosition == m_writePosition) {
m_canRead = false;
}
}
return bytesWritten;
}
auto size = m_bufferSize - m_readPosition;
/* DO NOT call stream.write() twice if size > count !!! */
if(size > count){
size = count;
} else if(size == 0) {
auto bytesWritten = stream.write(m_buffer, m_writePosition);
if(bytesWritten > 0) {
m_readPosition = bytesWritten;
if (m_readPosition == m_writePosition) {
m_canRead = false;
}
}
return bytesWritten;
}
auto bytesWritten = stream.write(&m_buffer[m_readPosition], size);
if(bytesWritten > 0) {
m_readPosition += bytesWritten;
}
return bytesWritten;
}
data::v_io_size FIFOBuffer::readFromStreamAndWrite(data::stream::InputStream& stream, data::v_io_size count) {
if(m_canRead && m_writePosition == m_readPosition) {
return data::IOError::WAIT_RETRY;
}
if(count == 0) {
return 0;
} else if(count < 0) {
throw std::runtime_error("[oatpp::data::buffer::FIFOBuffer::readFromStreamAndWrite(...)]: count < 0");
}
if(m_writePosition < m_readPosition) {
auto size = m_readPosition - m_writePosition;
if(size > count) {
size = count;
}
auto bytesRead = stream.read(&m_buffer[m_writePosition], size);
if(bytesRead > 0) {
m_writePosition += bytesRead;
m_canRead = true;
}
return bytesRead;
}
auto size = m_bufferSize - m_writePosition;
/* DO NOT call stream.read() twice if size > count !!! */
if(size > count){
size = count;
} else if(size == 0) {
auto bytesRead = stream.read(m_buffer, m_readPosition);
if(bytesRead > 0) {
m_writePosition = bytesRead;
m_canRead = true;
}
return bytesRead;
}
auto bytesRead = stream.read(&m_buffer[m_writePosition], size);
if(bytesRead > 0) {
m_writePosition += bytesRead;
m_canRead = true;
}
return bytesRead;
}
data::v_io_size FIFOBuffer::flushToStream(data::stream::OutputStream& stream) {
if(!m_canRead) {
return 0;
}
data::v_io_size result;
if(m_readPosition < m_writePosition) {
result = data::stream::writeExactSizeData(&stream, &m_buffer[m_readPosition], m_writePosition - m_readPosition);
} else {
auto result = data::stream::writeExactSizeData(&stream, &m_buffer[m_readPosition], m_bufferSize - m_readPosition);
result += data::stream::writeExactSizeData(&stream, m_buffer, m_writePosition);
}
setBufferPosition(0, 0, false);
return result;
}
oatpp::async::Action FIFOBuffer::flushToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish,
data::stream::OutputStream& stream)
{
class FlushCoroutine : public oatpp::async::Coroutine<FlushCoroutine> {
private:
FIFOBuffer* m_fifo;
data::stream::OutputStream* m_stream;
private:
const void* m_data1;
data::v_io_size m_size1;
const void* m_data2;
data::v_io_size m_size2;
public:
FlushCoroutine(FIFOBuffer* fifo, data::stream::OutputStream* stream)
: m_fifo(fifo)
, m_stream(stream)
{}
Action act() override {
if(m_fifo->m_readPosition < m_fifo->m_writePosition) {
m_data1 = &m_fifo->m_buffer[m_fifo->m_readPosition];
m_size1 = m_fifo->m_writePosition - m_fifo->m_readPosition;
return yieldTo(&FlushCoroutine::fullFlush);
} else {
m_data1 = &m_fifo->m_buffer[m_fifo->m_readPosition];
m_size1 = m_fifo->m_bufferSize - m_fifo->m_readPosition;
m_data2 = m_fifo->m_buffer;
m_size2 = m_fifo->m_writePosition;
return yieldTo(&FlushCoroutine::partialFlush1);
}
}
Action fullFlush() {
return data::stream::writeExactSizeDataAsyncInline(m_stream, m_data1, m_size1, yieldTo(&FlushCoroutine::beforeFinish));
}
Action partialFlush1() {
return data::stream::writeExactSizeDataAsyncInline(m_stream, m_data1, m_size1, yieldTo(&FlushCoroutine::partialFlush2));
}
Action partialFlush2() {
return data::stream::writeExactSizeDataAsyncInline(m_stream, m_data2, m_size2, yieldTo(&FlushCoroutine::beforeFinish));
}
Action beforeFinish() {
m_fifo->setBufferPosition(0, 0, false);
return finish();
}
};
return parentCoroutine->startCoroutine<FlushCoroutine>(actionOnFinish, this, &stream);
}
//////////////////////////////////////////////////////////////////////////////////////////
// SynchronizedFIFOBuffer
SynchronizedFIFOBuffer::SynchronizedFIFOBuffer(void* buffer, v_io_size bufferSize,
data::v_io_size readPosition, data::v_io_size writePosition,
bool canRead)
: m_fifo(buffer, bufferSize, readPosition, writePosition, canRead)
, m_atom(false)
{}
SynchronizedFIFOBuffer::SynchronizedFIFOBuffer(void* buffer, v_io_size bufferSize)
: m_fifo(buffer, bufferSize)
, m_atom(false)
{}
void SynchronizedFIFOBuffer::setBufferPosition(data::v_io_size readPosition, data::v_io_size writePosition, bool canRead) {
oatpp::concurrency::SpinLock lock(m_atom);
m_fifo.setBufferPosition(readPosition, writePosition, canRead);
}
data::v_io_size SynchronizedFIFOBuffer::availableToRead() {
oatpp::concurrency::SpinLock lock(m_atom);
return m_fifo.availableToRead();
}
data::v_io_size SynchronizedFIFOBuffer::availableToWrite() {
oatpp::concurrency::SpinLock lock(m_atom);
return m_fifo.availableToWrite();
}
data::v_io_size SynchronizedFIFOBuffer::getBufferSize() const {
return m_fifo.getBufferSize();
}
data::v_io_size SynchronizedFIFOBuffer::read(void *data, data::v_io_size count) {
oatpp::concurrency::SpinLock lock(m_atom);
return m_fifo.read(data, count);
}
data::v_io_size SynchronizedFIFOBuffer::write(const void *data, data::v_io_size count) {
oatpp::concurrency::SpinLock lock(m_atom);
return m_fifo.write(data, count);
}
}}} }}}

View File

@ -25,41 +25,121 @@
#ifndef oatpp_data_buffer_FIFOBuffer_hpp #ifndef oatpp_data_buffer_FIFOBuffer_hpp
#define oatpp_data_buffer_FIFOBuffer_hpp #define oatpp_data_buffer_FIFOBuffer_hpp
#include "./IOBuffer.hpp" #include "oatpp/core/data/stream/Stream.hpp"
#include "oatpp/core/data/IODefinitions.hpp"
#include "oatpp/core/async/Coroutine.hpp"
#include "oatpp/core/concurrency/SpinLock.hpp" #include "oatpp/core/concurrency/SpinLock.hpp"
#include "oatpp/core/os/io/Library.hpp"
namespace oatpp { namespace data{ namespace buffer { namespace oatpp { namespace data { namespace buffer {
class FIFOBuffer : public oatpp::base::Controllable { /**
public: * FIFO operations over the buffer
OBJECT_POOL(FIFOBuffer_Pool, FIFOBuffer, 32) * !FIFOBuffer is NOT an IOStream despite having similar APIs!
SHARED_OBJECT_POOL(Shared_FIFOBuffer_Pool, FIFOBuffer, 32) */
class FIFOBuffer {
private: private:
p_char8 m_buffer;
v_io_size m_bufferSize;
data::v_io_size m_readPosition;
data::v_io_size m_writePosition;
bool m_canRead; bool m_canRead;
os::io::Library::v_size m_readPosition; public:
os::io::Library::v_size m_writePosition;
IOBuffer m_buffer; FIFOBuffer(void* buffer, v_io_size bufferSize,
data::v_io_size readPosition, data::v_io_size writePosition,
bool canRead);
FIFOBuffer(void* buffer, v_io_size bufferSize);
void setBufferPosition(data::v_io_size readPosition, data::v_io_size writePosition, bool canRead);
data::v_io_size availableToRead() const;
data::v_io_size availableToWrite() const;
data::v_io_size getBufferSize() const;
/**
* read up to count bytes from the buffer to data
* @param data
* @param count
* @return [1..count], IOErrors.
*
*/
data::v_io_size read(void *data, data::v_io_size count);
/**
* write up to count bytes from data to buffer
* @param data
* @param count
* @return [1..count], IOErrors.
*/
data::v_io_size write(const void *data, data::v_io_size count);
/**
* call read and then write bytes read to output stream
* @param stream
* @param count
* @return [1..count], IOErrors.
*/
data::v_io_size readAndWriteToStream(data::stream::OutputStream& stream, data::v_io_size count);
/**
* call stream.read() and then write bytes read to buffer
* @param stream
* @param count
* @return
*/
data::v_io_size readFromStreamAndWrite(data::stream::InputStream& stream, data::v_io_size count);
/**
* flush all availableToRead bytes to stream
* @param stream
* @return
*/
data::v_io_size flushToStream(data::stream::OutputStream& stream);
/**
* flush all availableToRead bytes to stream in asynchronous manner
* @param parentCoroutine
* @param actionOnFinish
* @return
*/
oatpp::async::Action flushToStreamAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish,
data::stream::OutputStream& stream);
};
/**
* Same as FIFOBuffer + synchronization with SpinLock
*/
class SynchronizedFIFOBuffer {
private:
FIFOBuffer m_fifo;
oatpp::concurrency::SpinLock::Atom m_atom; oatpp::concurrency::SpinLock::Atom m_atom;
public: public:
FIFOBuffer()
: m_canRead(false) SynchronizedFIFOBuffer(void* buffer, v_io_size bufferSize,
, m_readPosition(0) data::v_io_size readPosition, data::v_io_size writePosition,
, m_writePosition(0) bool canRead);
, m_atom(false)
{} SynchronizedFIFOBuffer(void* buffer, v_io_size bufferSize);
public:
void setBufferPosition(data::v_io_size readPosition, data::v_io_size writePosition, bool canRead);
static std::shared_ptr<FIFOBuffer> createShared(){
return Shared_FIFOBuffer_Pool::allocateShared(); data::v_io_size availableToRead();
} data::v_io_size availableToWrite();
os::io::Library::v_size availableToRead(); data::v_io_size getBufferSize() const;
os::io::Library::v_size availableToWrite();
data::v_io_size read(void *data, data::v_io_size count);
os::io::Library::v_size read(void *data, os::io::Library::v_size count); data::v_io_size write(const void *data, data::v_io_size count);
os::io::Library::v_size write(const void *data, os::io::Library::v_size count);
/* No implementation of other methods */
/* User should implement his own synchronization for other methods */
}; };
}}} }}}

View File

@ -30,10 +30,10 @@ const char* ChunkedBuffer::ERROR_ASYNC_FAILED_TO_WRITE_ALL_DATA = "ERROR_ASYNC_F
const char* const ChunkedBuffer::CHUNK_POOL_NAME = "ChunkedBuffer_Chunk_Pool"; const char* const ChunkedBuffer::CHUNK_POOL_NAME = "ChunkedBuffer_Chunk_Pool";
const os::io::Library::v_size ChunkedBuffer::CHUNK_ENTRY_SIZE_INDEX_SHIFT = 11; const data::v_io_size ChunkedBuffer::CHUNK_ENTRY_SIZE_INDEX_SHIFT = 11;
const os::io::Library::v_size ChunkedBuffer::CHUNK_ENTRY_SIZE = const data::v_io_size ChunkedBuffer::CHUNK_ENTRY_SIZE =
(1 << ChunkedBuffer::CHUNK_ENTRY_SIZE_INDEX_SHIFT); (1 << ChunkedBuffer::CHUNK_ENTRY_SIZE_INDEX_SHIFT);
const os::io::Library::v_size ChunkedBuffer::CHUNK_CHUNK_SIZE = 32; const data::v_io_size ChunkedBuffer::CHUNK_CHUNK_SIZE = 32;
ChunkedBuffer::ChunkEntry* ChunkedBuffer::obtainNewEntry(){ ChunkedBuffer::ChunkEntry* ChunkedBuffer::obtainNewEntry(){
auto result = new ChunkEntry(getSegemntPool().obtain(), nullptr); auto result = new ChunkEntry(getSegemntPool().obtain(), nullptr);
@ -51,10 +51,10 @@ void ChunkedBuffer::freeEntry(ChunkEntry* entry){
delete entry; delete entry;
} }
os::io::Library::v_size ChunkedBuffer::writeToEntry(ChunkEntry* entry, data::v_io_size ChunkedBuffer::writeToEntry(ChunkEntry* entry,
const void *data, const void *data,
os::io::Library::v_size count, data::v_io_size count,
os::io::Library::v_size& outChunkPos) { data::v_io_size& outChunkPos) {
if(count >= CHUNK_ENTRY_SIZE){ if(count >= CHUNK_ENTRY_SIZE){
std::memcpy(entry->chunk, data, CHUNK_ENTRY_SIZE); std::memcpy(entry->chunk, data, CHUNK_ENTRY_SIZE);
outChunkPos = 0; outChunkPos = 0;
@ -66,12 +66,12 @@ os::io::Library::v_size ChunkedBuffer::writeToEntry(ChunkEntry* entry,
} }
} }
os::io::Library::v_size ChunkedBuffer::writeToEntryFrom(ChunkEntry* entry, data::v_io_size ChunkedBuffer::writeToEntryFrom(ChunkEntry* entry,
os::io::Library::v_size inChunkPos, data::v_io_size inChunkPos,
const void *data, const void *data,
os::io::Library::v_size count, data::v_io_size count,
os::io::Library::v_size& outChunkPos) { data::v_io_size& outChunkPos) {
os::io::Library::v_size spaceLeft = CHUNK_ENTRY_SIZE - inChunkPos; data::v_io_size spaceLeft = CHUNK_ENTRY_SIZE - inChunkPos;
if(count >= spaceLeft){ if(count >= spaceLeft){
std::memcpy(&((p_char8) entry->chunk)[inChunkPos], data, spaceLeft); std::memcpy(&((p_char8) entry->chunk)[inChunkPos], data, spaceLeft);
outChunkPos = 0; outChunkPos = 0;
@ -84,15 +84,15 @@ os::io::Library::v_size ChunkedBuffer::writeToEntryFrom(ChunkEntry* entry,
} }
ChunkedBuffer::ChunkEntry* ChunkedBuffer::getChunkForPosition(ChunkEntry* fromChunk, ChunkedBuffer::ChunkEntry* ChunkedBuffer::getChunkForPosition(ChunkEntry* fromChunk,
os::io::Library::v_size pos, data::v_io_size pos,
os::io::Library::v_size& outChunkPos) { data::v_io_size& outChunkPos) {
os::io::Library::v_size segIndex = pos >> CHUNK_ENTRY_SIZE_INDEX_SHIFT; data::v_io_size segIndex = pos >> CHUNK_ENTRY_SIZE_INDEX_SHIFT;
outChunkPos = pos - (segIndex << CHUNK_ENTRY_SIZE_INDEX_SHIFT); outChunkPos = pos - (segIndex << CHUNK_ENTRY_SIZE_INDEX_SHIFT);
auto curr = fromChunk; auto curr = fromChunk;
for(os::io::Library::v_size i = 0; i < segIndex; i++){ for(data::v_io_size i = 0; i < segIndex; i++){
curr = curr->next; curr = curr->next;
} }
@ -100,7 +100,7 @@ ChunkedBuffer::ChunkEntry* ChunkedBuffer::getChunkForPosition(ChunkEntry* fromCh
} }
os::io::Library::v_size ChunkedBuffer::write(const void *data, os::io::Library::v_size count){ data::v_io_size ChunkedBuffer::write(const void *data, data::v_io_size count){
if(count <= 0){ if(count <= 0){
return 0; return 0;
@ -111,7 +111,7 @@ os::io::Library::v_size ChunkedBuffer::write(const void *data, os::io::Library::
} }
ChunkEntry* entry = m_lastEntry; ChunkEntry* entry = m_lastEntry;
os::io::Library::v_size pos = 0; data::v_io_size pos = 0;
pos += writeToEntryFrom(entry, m_chunkPos, data, count, m_chunkPos); pos += writeToEntryFrom(entry, m_chunkPos, data, count, m_chunkPos);
@ -133,32 +133,32 @@ os::io::Library::v_size ChunkedBuffer::write(const void *data, os::io::Library::
} }
os::io::Library::v_size ChunkedBuffer::readSubstring(void *buffer, data::v_io_size ChunkedBuffer::readSubstring(void *buffer,
os::io::Library::v_size pos, data::v_io_size pos,
os::io::Library::v_size count) { data::v_io_size count) {
if(pos < 0 || pos >= m_size){ if(pos < 0 || pos >= m_size){
return 0; return 0;
} }
os::io::Library::v_size countToRead; data::v_io_size countToRead;
if(pos + count > m_size){ if(pos + count > m_size){
countToRead = m_size - pos; countToRead = m_size - pos;
} else { } else {
countToRead = count; countToRead = count;
} }
os::io::Library::v_size firstChunkPos; data::v_io_size firstChunkPos;
auto firstChunk = getChunkForPosition(m_firstEntry, pos, firstChunkPos); auto firstChunk = getChunkForPosition(m_firstEntry, pos, firstChunkPos);
os::io::Library::v_size lastChunkPos; data::v_io_size lastChunkPos;
auto lastChunk = getChunkForPosition(firstChunk, firstChunkPos + countToRead, lastChunkPos); auto lastChunk = getChunkForPosition(firstChunk, firstChunkPos + countToRead, lastChunkPos);
os::io::Library::v_size bufferPos = 0; data::v_io_size bufferPos = 0;
if(firstChunk != lastChunk){ if(firstChunk != lastChunk){
os::io::Library::v_size countToCopy = CHUNK_ENTRY_SIZE - firstChunkPos; data::v_io_size countToCopy = CHUNK_ENTRY_SIZE - firstChunkPos;
std::memcpy(buffer, &((p_char8)firstChunk->chunk)[firstChunkPos], countToCopy); std::memcpy(buffer, &((p_char8)firstChunk->chunk)[firstChunkPos], countToCopy);
bufferPos += countToCopy; bufferPos += countToCopy;
@ -173,7 +173,7 @@ os::io::Library::v_size ChunkedBuffer::readSubstring(void *buffer,
std::memcpy(&((p_char8)buffer)[bufferPos], lastChunk->chunk, lastChunkPos); std::memcpy(&((p_char8)buffer)[bufferPos], lastChunk->chunk, lastChunkPos);
} else { } else {
os::io::Library::v_size countToCopy = lastChunkPos - firstChunkPos; data::v_io_size countToCopy = lastChunkPos - firstChunkPos;
std::memcpy(buffer, &((p_char8)firstChunk->chunk)[firstChunkPos], countToCopy); std::memcpy(buffer, &((p_char8)firstChunk->chunk)[firstChunkPos], countToCopy);
} }
@ -181,15 +181,15 @@ os::io::Library::v_size ChunkedBuffer::readSubstring(void *buffer,
} }
oatpp::String ChunkedBuffer::getSubstring(os::io::Library::v_size pos, oatpp::String ChunkedBuffer::getSubstring(data::v_io_size pos,
os::io::Library::v_size count){ data::v_io_size count){
auto str = oatpp::String((v_int32) count); auto str = oatpp::String((v_int32) count);
readSubstring(str->getData(), pos, count); readSubstring(str->getData(), pos, count);
return str; return str;
} }
bool ChunkedBuffer::flushToStream(const std::shared_ptr<OutputStream>& stream){ bool ChunkedBuffer::flushToStream(const std::shared_ptr<OutputStream>& stream){
os::io::Library::v_size pos = m_size; data::v_io_size pos = m_size;
auto curr = m_firstEntry; auto curr = m_firstEntry;
while (pos > 0) { while (pos > 0) {
if(pos > CHUNK_ENTRY_SIZE) { if(pos > CHUNK_ENTRY_SIZE) {
@ -219,9 +219,9 @@ oatpp::async::Action ChunkedBuffer::flushToStreamAsync(oatpp::async::AbstractCor
std::shared_ptr<ChunkedBuffer> m_chunkedBuffer; std::shared_ptr<ChunkedBuffer> m_chunkedBuffer;
std::shared_ptr<OutputStream> m_stream; std::shared_ptr<OutputStream> m_stream;
ChunkEntry* m_currEntry; ChunkEntry* m_currEntry;
os::io::Library::v_size m_bytesLeft; data::v_io_size m_bytesLeft;
const void* m_currData; const void* m_currData;
os::io::Library::v_size m_currDataSize; data::v_io_size m_currDataSize;
Action m_nextAction; Action m_nextAction;
public: public:
@ -288,7 +288,7 @@ std::shared_ptr<ChunkedBuffer::Chunks> ChunkedBuffer::getChunks() {
return chunks; return chunks;
} }
os::io::Library::v_size ChunkedBuffer::getSize(){ data::v_io_size ChunkedBuffer::getSize(){
return m_size; return m_size;
} }

View File

@ -42,9 +42,9 @@ public:
static const char* const CHUNK_POOL_NAME; static const char* const CHUNK_POOL_NAME;
static const os::io::Library::v_size CHUNK_ENTRY_SIZE_INDEX_SHIFT; static const data::v_io_size CHUNK_ENTRY_SIZE_INDEX_SHIFT;
static const os::io::Library::v_size CHUNK_ENTRY_SIZE; static const data::v_io_size CHUNK_ENTRY_SIZE;
static const os::io::Library::v_size CHUNK_CHUNK_SIZE; static const data::v_io_size CHUNK_CHUNK_SIZE;
static oatpp::base::memory::ThreadDistributedMemoryPool& getSegemntPool(){ static oatpp::base::memory::ThreadDistributedMemoryPool& getSegemntPool(){
static oatpp::base::memory::ThreadDistributedMemoryPool pool(CHUNK_POOL_NAME, static oatpp::base::memory::ThreadDistributedMemoryPool pool(CHUNK_POOL_NAME,
@ -81,17 +81,17 @@ public:
SHARED_OBJECT_POOL(Shared_ChunkedBuffer_Chunk_Pool, Chunk, 32) SHARED_OBJECT_POOL(Shared_ChunkedBuffer_Chunk_Pool, Chunk, 32)
public: public:
Chunk(void* pData, os::io::Library::v_size pSize) Chunk(void* pData, data::v_io_size pSize)
: data(pData) : data(pData)
, size(pSize) , size(pSize)
{} {}
static std::shared_ptr<Chunk> createShared(void* data, os::io::Library::v_size size){ static std::shared_ptr<Chunk> createShared(void* data, data::v_io_size size){
return Shared_ChunkedBuffer_Chunk_Pool::allocateShared(data, size); return Shared_ChunkedBuffer_Chunk_Pool::allocateShared(data, size);
} }
const void* data; const void* data;
const os::io::Library::v_size size; const data::v_io_size size;
}; };
@ -99,8 +99,8 @@ public:
typedef oatpp::collection::LinkedList<std::shared_ptr<Chunk>> Chunks; typedef oatpp::collection::LinkedList<std::shared_ptr<Chunk>> Chunks;
private: private:
os::io::Library::v_size m_size; data::v_io_size m_size;
os::io::Library::v_size m_chunkPos; data::v_io_size m_chunkPos;
ChunkEntry* m_firstEntry; ChunkEntry* m_firstEntry;
ChunkEntry* m_lastEntry; ChunkEntry* m_lastEntry;
@ -109,20 +109,20 @@ private:
ChunkEntry* obtainNewEntry(); ChunkEntry* obtainNewEntry();
void freeEntry(ChunkEntry* entry); void freeEntry(ChunkEntry* entry);
os::io::Library::v_size writeToEntry(ChunkEntry* entry, data::v_io_size writeToEntry(ChunkEntry* entry,
const void *data, const void *data,
os::io::Library::v_size count, data::v_io_size count,
os::io::Library::v_size& outChunkPos); data::v_io_size& outChunkPos);
os::io::Library::v_size writeToEntryFrom(ChunkEntry* entry, data::v_io_size writeToEntryFrom(ChunkEntry* entry,
os::io::Library::v_size inChunkPos, data::v_io_size inChunkPos,
const void *data, const void *data,
os::io::Library::v_size count, data::v_io_size count,
os::io::Library::v_size& outChunkPos); data::v_io_size& outChunkPos);
ChunkEntry* getChunkForPosition(ChunkEntry* fromChunk, ChunkEntry* getChunkForPosition(ChunkEntry* fromChunk,
os::io::Library::v_size pos, data::v_io_size pos,
os::io::Library::v_size& outChunkPos); data::v_io_size& outChunkPos);
public: public:
@ -143,16 +143,16 @@ public:
return Shared_ChunkedBuffer_Pool::allocateShared(); return Shared_ChunkedBuffer_Pool::allocateShared();
} }
os::io::Library::v_size write(const void *data, os::io::Library::v_size count) override; data::v_io_size write(const void *data, data::v_io_size count) override;
os::io::Library::v_size readSubstring(void *buffer, data::v_io_size readSubstring(void *buffer,
os::io::Library::v_size pos, data::v_io_size pos,
os::io::Library::v_size count); data::v_io_size count);
/** /**
* return substring of the data written to stream; NOT NULL * return substring of the data written to stream; NOT NULL
*/ */
oatpp::String getSubstring(os::io::Library::v_size pos, os::io::Library::v_size count); oatpp::String getSubstring(data::v_io_size pos, data::v_io_size count);
/** /**
* return data written to stream as oatpp::String; NOT NULL * return data written to stream as oatpp::String; NOT NULL
@ -168,7 +168,7 @@ public:
std::shared_ptr<Chunks> getChunks(); std::shared_ptr<Chunks> getChunks();
os::io::Library::v_size getSize(); data::v_io_size getSize();
void clear(); void clear();
}; };

View File

@ -31,12 +31,12 @@ namespace oatpp { namespace data{ namespace stream {
class WriterDelegate { class WriterDelegate {
public: public:
virtual os::io::Library::v_size writeToStream(const std::shared_ptr<OutputStream>& stream) = 0; virtual data::v_io_size writeToStream(const std::shared_ptr<OutputStream>& stream) = 0;
}; };
class ReaderDelegate { class ReaderDelegate {
public: public:
virtual os::io::Library::v_size readFromStream(const std::shared_ptr<InputStream>& stream) = 0; virtual data::v_io_size readFromStream(const std::shared_ptr<InputStream>& stream) = 0;
}; };
}}} }}}

View File

@ -30,7 +30,7 @@ namespace oatpp { namespace data{ namespace stream {
const char* const Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA = "ERROR_ASYNC_FAILED_TO_WRITE_DATA"; const char* const Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA = "ERROR_ASYNC_FAILED_TO_WRITE_DATA";
const char* const Errors::ERROR_ASYNC_FAILED_TO_READ_DATA = "ERROR_ASYNC_FAILED_TO_READ_DATA"; const char* const Errors::ERROR_ASYNC_FAILED_TO_READ_DATA = "ERROR_ASYNC_FAILED_TO_READ_DATA";
os::io::Library::v_size OutputStream::writeAsString(v_int32 value){ data::v_io_size OutputStream::writeAsString(v_int32 value){
v_char8 a[100]; v_char8 a[100];
v_int32 size = utils::conversion::int32ToCharSequence(value, &a[0]); v_int32 size = utils::conversion::int32ToCharSequence(value, &a[0]);
if(size > 0){ if(size > 0){
@ -39,7 +39,7 @@ os::io::Library::v_size OutputStream::writeAsString(v_int32 value){
return 0; return 0;
} }
os::io::Library::v_size OutputStream::writeAsString(v_int64 value){ data::v_io_size OutputStream::writeAsString(v_int64 value){
v_char8 a[100]; v_char8 a[100];
v_int32 size = utils::conversion::int64ToCharSequence(value, &a[0]); v_int32 size = utils::conversion::int64ToCharSequence(value, &a[0]);
if(size > 0){ if(size > 0){
@ -48,7 +48,7 @@ os::io::Library::v_size OutputStream::writeAsString(v_int64 value){
return 0; return 0;
} }
os::io::Library::v_size OutputStream::writeAsString(v_float32 value){ data::v_io_size OutputStream::writeAsString(v_float32 value){
v_char8 a[100]; v_char8 a[100];
v_int32 size = utils::conversion::float32ToCharSequence(value, &a[0]); v_int32 size = utils::conversion::float32ToCharSequence(value, &a[0]);
if(size > 0){ if(size > 0){
@ -57,7 +57,7 @@ os::io::Library::v_size OutputStream::writeAsString(v_float32 value){
return 0; return 0;
} }
os::io::Library::v_size OutputStream::writeAsString(v_float64 value){ data::v_io_size OutputStream::writeAsString(v_float64 value){
v_char8 a[100]; v_char8 a[100];
v_int32 size = utils::conversion::float64ToCharSequence(value, &a[0]); v_int32 size = utils::conversion::float64ToCharSequence(value, &a[0]);
if(size > 0){ if(size > 0){
@ -66,7 +66,7 @@ os::io::Library::v_size OutputStream::writeAsString(v_float64 value){
return 0; return 0;
} }
os::io::Library::v_size OutputStream::writeAsString(bool value) { data::v_io_size OutputStream::writeAsString(bool value) {
if(value){ if(value){
return write("true", 4); return write("true", 4);
} else { } else {
@ -117,16 +117,16 @@ const std::shared_ptr<OutputStream>& operator << (const std::shared_ptr<OutputSt
return s; return s;
} }
oatpp::os::io::Library::v_size transfer(const std::shared_ptr<InputStream>& fromStream, oatpp::data::v_io_size transfer(const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream, const std::shared_ptr<OutputStream>& toStream,
oatpp::os::io::Library::v_size transferSize, oatpp::data::v_io_size transferSize,
void* buffer, void* buffer,
oatpp::os::io::Library::v_size bufferSize) { oatpp::data::v_io_size bufferSize) {
oatpp::os::io::Library::v_size progress = 0; oatpp::data::v_io_size progress = 0;
while (transferSize == 0 || progress < transferSize) { while (transferSize == 0 || progress < transferSize) {
oatpp::os::io::Library::v_size desiredReadCount = transferSize - progress; oatpp::data::v_io_size desiredReadCount = transferSize - progress;
if(transferSize == 0 || desiredReadCount > bufferSize){ if(transferSize == 0 || desiredReadCount > bufferSize){
desiredReadCount = bufferSize; desiredReadCount = bufferSize;
} }
@ -138,7 +138,7 @@ oatpp::os::io::Library::v_size transfer(const std::shared_ptr<InputStream>& from
} }
progress += readResult; progress += readResult;
} else { } else {
if(readResult == oatpp::data::stream::Errors::ERROR_IO_RETRY || readResult == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { if(readResult == data::IOError::RETRY || readResult == data::IOError::WAIT_RETRY) {
continue; continue;
} }
return progress; return progress;
@ -153,27 +153,27 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
const oatpp::async::Action& actionOnReturn, const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<InputStream>& fromStream, const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream, const std::shared_ptr<OutputStream>& toStream,
oatpp::os::io::Library::v_size transferSize, oatpp::data::v_io_size transferSize,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer) { const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer) {
class TransferCoroutine : public oatpp::async::Coroutine<TransferCoroutine> { class TransferCoroutine : public oatpp::async::Coroutine<TransferCoroutine> {
private: private:
std::shared_ptr<InputStream> m_fromStream; std::shared_ptr<InputStream> m_fromStream;
std::shared_ptr<OutputStream> m_toStream; std::shared_ptr<OutputStream> m_toStream;
oatpp::os::io::Library::v_size m_transferSize; oatpp::data::v_io_size m_transferSize;
oatpp::os::io::Library::v_size m_progress; oatpp::data::v_io_size m_progress;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_buffer; std::shared_ptr<oatpp::data::buffer::IOBuffer> m_buffer;
oatpp::os::io::Library::v_size m_desiredReadCount; oatpp::data::v_io_size m_desiredReadCount;
void* m_readBufferPtr; void* m_readBufferPtr;
const void* m_writeBufferPtr; const void* m_writeBufferPtr;
oatpp::os::io::Library::v_size m_bytesLeft; oatpp::data::v_io_size m_bytesLeft;
public: public:
TransferCoroutine(const std::shared_ptr<InputStream>& fromStream, TransferCoroutine(const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream, const std::shared_ptr<OutputStream>& toStream,
oatpp::os::io::Library::v_size transferSize, oatpp::data::v_io_size transferSize,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer) const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer)
: m_fromStream(fromStream) : m_fromStream(fromStream)
, m_toStream(toStream) , m_toStream(toStream)
@ -241,38 +241,16 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
} }
oatpp::async::Action writeSomeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
const void*& data,
os::io::Library::v_size& size,
const oatpp::async::Action& nextAction) {
auto res = stream->write(data, size);
if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
return oatpp::async::Action::_WAIT_RETRY;
} else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) {
return oatpp::async::Action::_REPEAT;
} else if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE) {
return oatpp::async::Action::_ABORT;
} else if(res < 0) {
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA));
}
data = &((p_char8) data)[res];
size = size - res;
if(res < size && res > 0) {
return oatpp::async::Action::_REPEAT;
}
return nextAction;
}
oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::data::stream::OutputStream* stream, oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
const void*& data, const void*& data,
os::io::Library::v_size& size, data::v_io_size& size,
const oatpp::async::Action& nextAction) { const oatpp::async::Action& nextAction) {
auto res = stream->write(data, size); auto res = stream->write(data, size);
if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { if(res == data::IOError::WAIT_RETRY) {
return oatpp::async::Action::_WAIT_RETRY; return oatpp::async::Action::_WAIT_RETRY;
} else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) { } else if(res == data::IOError::RETRY) {
return oatpp::async::Action::_REPEAT; return oatpp::async::Action::_REPEAT;
} else if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE) { } else if(res == data::IOError::BROKEN_PIPE) {
return oatpp::async::Action::_ABORT; return oatpp::async::Action::_ABORT;
} else if(res < 0) { } else if(res < 0) {
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA)); return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA));
@ -280,21 +258,22 @@ oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::data::stream::OutputSt
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA)); return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_WRITE_DATA));
} }
data = &((p_char8) data)[res]; data = &((p_char8) data)[res];
size = size - res;
if(res < size) { if(res < size) {
size = size - res;
return oatpp::async::Action::_REPEAT; return oatpp::async::Action::_REPEAT;
} }
size = size - res;
return nextAction; return nextAction;
} }
oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream, oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream,
void*& data, void*& data,
os::io::Library::v_size& bytesLeftToRead, data::v_io_size& bytesLeftToRead,
const oatpp::async::Action& nextAction) { const oatpp::async::Action& nextAction) {
auto res = stream->read(data, bytesLeftToRead); auto res = stream->read(data, bytesLeftToRead);
if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { if(res == data::IOError::WAIT_RETRY) {
return oatpp::async::Action::_WAIT_RETRY; return oatpp::async::Action::_WAIT_RETRY;
} else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) { } else if(res == data::IOError::RETRY) {
return oatpp::async::Action::_REPEAT; return oatpp::async::Action::_REPEAT;
} else if( res < 0) { } else if( res < 0) {
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA)); return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA));
@ -307,14 +286,14 @@ oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* s
oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStream* stream, oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStream* stream,
void*& data, void*& data,
os::io::Library::v_size& bytesLeftToRead, data::v_io_size& bytesLeftToRead,
const oatpp::async::Action& nextAction) { const oatpp::async::Action& nextAction) {
auto res = stream->read(data, bytesLeftToRead); auto res = stream->read(data, bytesLeftToRead);
if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { if(res == data::IOError::WAIT_RETRY) {
return oatpp::async::Action::_WAIT_RETRY; return oatpp::async::Action::_WAIT_RETRY;
} else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) { } else if(res == data::IOError::RETRY) {
return oatpp::async::Action::_REPEAT; return oatpp::async::Action::_REPEAT;
} else if(res == oatpp::data::stream::Errors::ERROR_IO_PIPE) { } else if(res == data::IOError::BROKEN_PIPE) {
return oatpp::async::Action::_ABORT; return oatpp::async::Action::_ABORT;
} else if( res < 0) { } else if( res < 0) {
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA)); return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA));
@ -322,17 +301,18 @@ oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStre
return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA)); return oatpp::async::Action(oatpp::async::Error(Errors::ERROR_ASYNC_FAILED_TO_READ_DATA));
} }
data = &((p_char8) data)[res]; data = &((p_char8) data)[res];
bytesLeftToRead -= res;
if(res < bytesLeftToRead) { if(res < bytesLeftToRead) {
bytesLeftToRead -= res;
return oatpp::async::Action::_REPEAT; return oatpp::async::Action::_REPEAT;
} }
bytesLeftToRead -= res;
return nextAction; return nextAction;
} }
oatpp::os::io::Library::v_size readExactSizeData(oatpp::data::stream::InputStream* stream, void* data, os::io::Library::v_size size) { oatpp::data::v_io_size readExactSizeData(oatpp::data::stream::InputStream* stream, void* data, data::v_io_size size) {
char* buffer = (char*) data; char* buffer = (char*) data;
oatpp::os::io::Library::v_size progress = 0; oatpp::data::v_io_size progress = 0;
while (progress < size) { while (progress < size) {
@ -341,7 +321,7 @@ oatpp::os::io::Library::v_size readExactSizeData(oatpp::data::stream::InputStrea
if(res > 0) { if(res > 0) {
progress += res; progress += res;
} else { // if res == 0 then probably stream handles read() error incorrectly. return. } else { // if res == 0 then probably stream handles read() error incorrectly. return.
if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY || res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { if(res == data::IOError::RETRY || res == data::IOError::WAIT_RETRY) {
continue; continue;
} }
return progress; return progress;
@ -353,19 +333,19 @@ oatpp::os::io::Library::v_size readExactSizeData(oatpp::data::stream::InputStrea
} }
oatpp::os::io::Library::v_size writeExactSizeData(oatpp::data::stream::OutputStream* stream, const void* data, os::io::Library::v_size size) { oatpp::data::v_io_size writeExactSizeData(oatpp::data::stream::OutputStream* stream, const void* data, data::v_io_size size) {
const char* buffer = (char*)data; const char* buffer = (char*)data;
oatpp::os::io::Library::v_size progress = 0; oatpp::data::v_io_size progress = 0;
while (progress < size) { while (progress < size) {
auto res = stream->write(&buffer[progress], size - progress); auto res = stream->write(&buffer[progress], size - progress);
if(res > 0) { if(res > 0) {
progress += res; progress += res;
} else { // if res == 0 then probably stream handles write() error incorrectly. return. } else { // if res == 0 then probably stream handles write() error incorrectly. return.
if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY || res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { if(res == data::IOError::RETRY || res == data::IOError::WAIT_RETRY) {
continue; continue;
} }
return progress; return progress;

View File

@ -25,25 +25,23 @@
#ifndef oatpp_data_Stream #ifndef oatpp_data_Stream
#define oatpp_data_Stream #define oatpp_data_Stream
#include "oatpp/core/async/Coroutine.hpp"
#include "oatpp/core/data/buffer/IOBuffer.hpp" #include "oatpp/core/data/buffer/IOBuffer.hpp"
#include "oatpp/core/Types.hpp" #include "oatpp/core/data/IODefinitions.hpp"
#include "oatpp/core/async/Coroutine.hpp"
#include "oatpp/core/os/io/Library.hpp"
namespace oatpp { namespace data{ namespace stream { namespace oatpp { namespace data{ namespace stream {
class Errors { class Errors {
public: public:
constexpr static os::io::Library::v_size ERROR_IO_NOTHING_TO_READ = -1001; /**
constexpr static os::io::Library::v_size ERROR_IO_WAIT_RETRY = -1002; * deprecated
constexpr static os::io::Library::v_size ERROR_IO_RETRY = -1003; */
constexpr static os::io::Library::v_size ERROR_IO_PIPE = -1004;
static const char* const ERROR_ASYNC_FAILED_TO_WRITE_DATA; static const char* const ERROR_ASYNC_FAILED_TO_WRITE_DATA;
/**
* deprecated
*/
static const char* const ERROR_ASYNC_FAILED_TO_READ_DATA; static const char* const ERROR_ASYNC_FAILED_TO_READ_DATA;
}; };
@ -54,25 +52,25 @@ public:
* Write data to stream up to count bytes, and return number of bytes actually written * Write data to stream up to count bytes, and return number of bytes actually written
* It is a legal case if return result < count. Caller should handle this! * It is a legal case if return result < count. Caller should handle this!
*/ */
virtual os::io::Library::v_size write(const void *data, os::io::Library::v_size count) = 0; virtual data::v_io_size write(const void *data, data::v_io_size count) = 0;
os::io::Library::v_size write(const char* data){ data::v_io_size write(const char* data){
return write((p_char8)data, std::strlen(data)); return write((p_char8)data, std::strlen(data));
} }
os::io::Library::v_size write(const oatpp::String& str){ data::v_io_size write(const oatpp::String& str){
return write(str->getData(), str->getSize()); return write(str->getData(), str->getSize());
} }
os::io::Library::v_size writeChar(v_char8 c){ data::v_io_size writeChar(v_char8 c){
return write(&c, 1); return write(&c, 1);
} }
os::io::Library::v_size writeAsString(v_int32 value); data::v_io_size writeAsString(v_int32 value);
os::io::Library::v_size writeAsString(v_int64 value); data::v_io_size writeAsString(v_int64 value);
os::io::Library::v_size writeAsString(v_float32 value); data::v_io_size writeAsString(v_float32 value);
os::io::Library::v_size writeAsString(v_float64 value); data::v_io_size writeAsString(v_float64 value);
os::io::Library::v_size writeAsString(bool value); data::v_io_size writeAsString(bool value);
}; };
@ -82,12 +80,12 @@ public:
* Read data from stream up to count bytes, and return number of bytes actually read * Read data from stream up to count bytes, and return number of bytes actually read
* It is a legal case if return result < count. Caller should handle this! * It is a legal case if return result < count. Caller should handle this!
*/ */
virtual os::io::Library::v_size read(void *data, os::io::Library::v_size count) = 0; virtual data::v_io_size read(void *data, data::v_io_size count) = 0;
}; };
class IOStream : public InputStream, public OutputStream { class IOStream : public InputStream, public OutputStream {
public: public:
typedef os::io::Library::v_size v_size; typedef data::v_io_size v_size;
}; };
class CompoundIOStream : public oatpp::base::Controllable, public IOStream { class CompoundIOStream : public oatpp::base::Controllable, public IOStream {
@ -110,11 +108,11 @@ public:
return Shared_CompoundIOStream_Pool::allocateShared(outputStream, inputStream); return Shared_CompoundIOStream_Pool::allocateShared(outputStream, inputStream);
} }
os::io::Library::v_size write(const void *data, os::io::Library::v_size count) override { data::v_io_size write(const void *data, data::v_io_size count) override {
return m_outputStream->write(data, count); return m_outputStream->write(data, count);
} }
os::io::Library::v_size read(void *data, os::io::Library::v_size count) override { data::v_io_size read(void *data, data::v_io_size count) override {
return m_inputStream->read(data, count); return m_inputStream->read(data, count);
} }
@ -141,11 +139,11 @@ const std::shared_ptr<OutputStream>& operator << (const std::shared_ptr<OutputSt
* transfer up to transferSize or until error if transferSize == 0 * transfer up to transferSize or until error if transferSize == 0
* throws in case readCount != writeCount * throws in case readCount != writeCount
*/ */
oatpp::os::io::Library::v_size transfer(const std::shared_ptr<InputStream>& fromStream, oatpp::data::v_io_size transfer(const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream, const std::shared_ptr<OutputStream>& toStream,
oatpp::os::io::Library::v_size transferSize, oatpp::data::v_io_size transferSize,
void* buffer, void* buffer,
oatpp::os::io::Library::v_size bufferSize); oatpp::data::v_io_size bufferSize);
/** /**
@ -155,31 +153,23 @@ oatpp::async::Action transferAsync(oatpp::async::AbstractCoroutine* parentCorout
const oatpp::async::Action& actionOnReturn, const oatpp::async::Action& actionOnReturn,
const std::shared_ptr<InputStream>& fromStream, const std::shared_ptr<InputStream>& fromStream,
const std::shared_ptr<OutputStream>& toStream, const std::shared_ptr<OutputStream>& toStream,
oatpp::os::io::Library::v_size transferSize, oatpp::data::v_io_size transferSize,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer); const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer);
/**
* Async write data withot starting new Coroutine.
* Should be called from a separate Coroutine method
*/
oatpp::async::Action writeSomeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
const void*& data,
os::io::Library::v_size& size,
const oatpp::async::Action& nextAction);
oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::data::stream::OutputStream* stream, oatpp::async::Action writeExactSizeDataAsyncInline(oatpp::data::stream::OutputStream* stream,
const void*& data, const void*& data,
os::io::Library::v_size& size, data::v_io_size& size,
const oatpp::async::Action& nextAction); const oatpp::async::Action& nextAction);
oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream, oatpp::async::Action readSomeDataAsyncInline(oatpp::data::stream::InputStream* stream,
void*& data, void*& data,
os::io::Library::v_size& bytesLeftToRead, data::v_io_size& bytesLeftToRead,
const oatpp::async::Action& nextAction); const oatpp::async::Action& nextAction);
oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStream* stream, oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStream* stream,
void*& data, void*& data,
os::io::Library::v_size& bytesLeftToRead, data::v_io_size& bytesLeftToRead,
const oatpp::async::Action& nextAction); const oatpp::async::Action& nextAction);
/** /**
@ -187,14 +177,14 @@ oatpp::async::Action readExactSizeDataAsyncInline(oatpp::data::stream::InputStre
* returns exact amount of bytes was read. * returns exact amount of bytes was read.
* return result can be < size only in case of some disaster like connection reset by peer * return result can be < size only in case of some disaster like connection reset by peer
*/ */
oatpp::os::io::Library::v_size readExactSizeData(oatpp::data::stream::InputStream* stream, void* data, os::io::Library::v_size size); oatpp::data::v_io_size readExactSizeData(oatpp::data::stream::InputStream* stream, void* data, data::v_io_size size);
/** /**
* Write exact amount of bytes to stream. * Write exact amount of bytes to stream.
* returns exact amount of bytes was written. * returns exact amount of bytes was written.
* return result can be < size only in case of some disaster like broken pipe * return result can be < size only in case of some disaster like broken pipe
*/ */
oatpp::os::io::Library::v_size writeExactSizeData(oatpp::data::stream::OutputStream* stream, const void* data, os::io::Library::v_size size); oatpp::data::v_io_size writeExactSizeData(oatpp::data::stream::OutputStream* stream, const void* data, data::v_io_size size);
}}} }}}

View File

@ -26,183 +26,37 @@
namespace oatpp { namespace data{ namespace stream { namespace oatpp { namespace data{ namespace stream {
os::io::Library::v_size OutputStreamBufferedProxy::write(const void *data, os::io::Library::v_size count) { data::v_io_size OutputStreamBufferedProxy::write(const void *data, data::v_io_size count) {
if(m_pos == 0){ if(m_buffer.availableToWrite() > 0) {
return m_buffer.write(data, count);
v_bufferSize spaceLeft = m_bufferSize - m_posEnd;
if(spaceLeft > count){
memcpy(&m_buffer[m_posEnd], data, count);
m_posEnd += (v_bufferSize) count;
return count;
}
if(m_posEnd == 0) {
return m_outputStream->write(data, count);
}
if(spaceLeft > 0){
memcpy(&m_buffer[m_posEnd], data, spaceLeft);
m_posEnd = m_bufferSize;
}
os::io::Library::v_size writeResult = m_outputStream->write(m_buffer, m_bufferSize);
if(writeResult == m_bufferSize){
m_posEnd = 0;
os::io::Library::v_size bigResult = write(&((p_char8) data)[spaceLeft], count - spaceLeft);
if(bigResult > 0) {
return bigResult + spaceLeft;
} else if(bigResult < 0) {
return bigResult;
} else {
return spaceLeft;
}
}
if(writeResult > 0){
m_pos += (v_bufferSize) writeResult;
} else if(writeResult < 0) {
return writeResult;
}
return spaceLeft;
} else { } else {
auto amount = m_posEnd - m_pos; auto bytesFlushed = m_buffer.readAndWriteToStream(*m_outputStream, m_buffer.getBufferSize());
if(amount > 0){ if(bytesFlushed > 0) {
os::io::Library::v_size result = m_outputStream->write(&m_buffer[m_pos], amount); return m_buffer.write(data, count);
if(result == amount){
m_pos = 0;
m_posEnd = 0;
return write(data, count);
} else if(result > 0){
m_pos += (v_bufferSize) result;
return 0;
}
return result;
} }
m_pos = 0; return bytesFlushed;
m_posEnd = 0;
return write(data, count);
} }
} }
os::io::Library::v_size OutputStreamBufferedProxy::flush() { data::v_io_size OutputStreamBufferedProxy::flush() {
auto amount = m_posEnd - m_pos; return m_buffer.flushToStream(*m_outputStream);
if(amount > 0){
os::io::Library::v_size result = stream::writeExactSizeData(m_outputStream.get(), &m_buffer[m_pos], amount);
if(result == amount){
m_pos = 0;
m_posEnd = 0;
} else if(result > 0){
m_pos += (v_bufferSize) result;
}
return result;
}
return 0;
} }
oatpp::async::Action OutputStreamBufferedProxy::flushAsync(oatpp::async::AbstractCoroutine* parentCoroutine, oatpp::async::Action OutputStreamBufferedProxy::flushAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish) { const oatpp::async::Action& actionOnFinish) {
return m_buffer.flushToStreamAsync(parentCoroutine, actionOnFinish, *m_outputStream);
class FlushCoroutine : public oatpp::async::Coroutine<FlushCoroutine> {
private:
std::shared_ptr<OutputStreamBufferedProxy> m_stream;
public:
FlushCoroutine(const std::shared_ptr<OutputStreamBufferedProxy>& stream)
: m_stream(stream)
{}
Action act() override {
auto amount = m_stream->m_posEnd - m_stream->m_pos;
if(amount > 0){
os::io::Library::v_size result = m_stream->m_outputStream->write(&m_stream->m_buffer[m_stream->m_pos], amount);
if(result == amount) {
m_stream->m_pos = 0;
m_stream->m_posEnd = 0;
return finish();
} else if(result == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) {
return oatpp::async::Action::_WAIT_RETRY;
} else if(result == oatpp::data::stream::Errors::ERROR_IO_RETRY) {
return oatpp::async::Action::_REPEAT;
} else if(result == oatpp::data::stream::Errors::ERROR_IO_PIPE) {
return error("[oatpp::data::stream::OutputStreamBufferedProxy::flushAsync()]: Error - oatpp::data::stream::Errors::ERROR_IO_PIPE");
} else if( result < 0) {
return error("[oatpp::data::stream::OutputStreamBufferedProxy::flushAsync()]: Error - Failed to flush all data");
} else if(result < amount) {
m_stream->m_pos += (v_bufferSize) result;
return oatpp::async::Action::_REPEAT;
}
}
return finish();
}
};
return parentCoroutine->startCoroutine<FlushCoroutine>(actionOnFinish, getSharedPtr<OutputStreamBufferedProxy>());
} }
os::io::Library::v_size InputStreamBufferedProxy::read(void *data, os::io::Library::v_size count) { data::v_io_size InputStreamBufferedProxy::read(void *data, data::v_io_size count) {
if (m_pos == 0 && m_posEnd == 0) { if(m_buffer.availableToRead() > 0) {
return m_buffer.read(data, count);
if(count > m_bufferSize){
//if(m_hasError){
// errno = m_errno;
// return -1;
//}
return m_inputStream->read(data, count);
} else {
//if(m_hasError){
// errno = m_errno;
// return -1;
//}
m_posEnd = (v_bufferSize) m_inputStream->read(m_buffer, m_bufferSize);
v_bufferSize result;
if(m_posEnd > count){
result = (v_bufferSize) count;
m_pos = result;
} else {
result = m_posEnd;
m_posEnd = 0;
m_pos = 0;
if(result < 0) {
return result;
}
}
std::memcpy(data, m_buffer, result);
return result;
}
} else { } else {
v_bufferSize result = m_posEnd - m_pos; auto bytesBuffered = m_buffer.readFromStreamAndWrite(*m_inputStream, m_buffer.getBufferSize());
if(count > result){ if(bytesBuffered > 0) {
return m_buffer.read(data, count);
std::memcpy(data, &m_buffer[m_pos], result);
m_pos = 0;
m_posEnd = 0;
os::io::Library::v_size bigResult = read(&((p_char8) data) [result], count - result);
if(bigResult > 0){
return bigResult + result;
} else if(bigResult < 0) {
return bigResult;
}
return result;
} else {
std::memcpy(data, &m_buffer[m_pos], count);
m_pos += (v_bufferSize) count;
if(m_pos == m_posEnd){
m_pos = 0;
m_posEnd = 0;
}
return count;
} }
return bytesBuffered;
} }
} }

View File

@ -26,6 +26,7 @@
#define oatpp_data_stream_StreamBufferedProxy_hpp #define oatpp_data_stream_StreamBufferedProxy_hpp
#include "Stream.hpp" #include "Stream.hpp"
#include "oatpp/core/data/buffer/FIFOBuffer.hpp"
#include "oatpp/core/data/buffer/IOBuffer.hpp" #include "oatpp/core/data/buffer/IOBuffer.hpp"
#include "oatpp/core/async/Coroutine.hpp" #include "oatpp/core/async/Coroutine.hpp"
@ -40,10 +41,7 @@ public:
private: private:
std::shared_ptr<OutputStream> m_outputStream; std::shared_ptr<OutputStream> m_outputStream;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_bufferPtr; std::shared_ptr<oatpp::data::buffer::IOBuffer> m_bufferPtr;
p_char8 m_buffer; buffer::FIFOBuffer m_buffer;
v_bufferSize m_bufferSize;
v_bufferSize m_pos;
v_bufferSize m_posEnd;
public: public:
OutputStreamBufferedProxy(const std::shared_ptr<OutputStream>& outputStream, OutputStreamBufferedProxy(const std::shared_ptr<OutputStream>& outputStream,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& bufferPtr, const std::shared_ptr<oatpp::data::buffer::IOBuffer>& bufferPtr,
@ -51,10 +49,7 @@ public:
v_bufferSize bufferSize) v_bufferSize bufferSize)
: m_outputStream(outputStream) : m_outputStream(outputStream)
, m_bufferPtr(bufferPtr) , m_bufferPtr(bufferPtr)
, m_buffer(buffer) , m_buffer(buffer, bufferSize)
, m_bufferSize(bufferSize)
, m_pos(0)
, m_posEnd(0)
{} {}
public: public:
@ -77,14 +72,13 @@ public:
bufferSize); bufferSize);
} }
os::io::Library::v_size write(const void *data, os::io::Library::v_size count) override; data::v_io_size write(const void *data, data::v_io_size count) override;
os::io::Library::v_size flush(); data::v_io_size flush();
oatpp::async::Action flushAsync(oatpp::async::AbstractCoroutine* parentCoroutine, oatpp::async::Action flushAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
const oatpp::async::Action& actionOnFinish); const oatpp::async::Action& actionOnFinish);
void setBufferPosition(v_bufferSize pos, v_bufferSize posEnd){ void setBufferPosition(data::v_io_size readPosition, data::v_io_size writePosition, bool canRead) {
m_pos = pos; m_buffer.setBufferPosition(readPosition, writePosition, canRead);
m_posEnd = posEnd;
} }
}; };
@ -98,79 +92,77 @@ public:
protected: protected:
std::shared_ptr<InputStream> m_inputStream; std::shared_ptr<InputStream> m_inputStream;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_bufferPtr; std::shared_ptr<oatpp::data::buffer::IOBuffer> m_bufferPtr;
p_char8 m_buffer; buffer::FIFOBuffer m_buffer;
v_bufferSize m_bufferSize;
v_bufferSize m_pos;
v_bufferSize m_posEnd;
public: public:
InputStreamBufferedProxy(const std::shared_ptr<InputStream>& inputStream, InputStreamBufferedProxy(const std::shared_ptr<InputStream>& inputStream,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& bufferPtr, const std::shared_ptr<oatpp::data::buffer::IOBuffer>& bufferPtr,
p_char8 buffer, p_char8 buffer,
v_bufferSize bufferSize, v_bufferSize bufferSize,
v_bufferSize positionStart, data::v_io_size bufferReadPosition,
v_bufferSize positionEnd) data::v_io_size bufferWritePosition,
bool bufferCanRead)
: m_inputStream(inputStream) : m_inputStream(inputStream)
, m_bufferPtr(bufferPtr) , m_bufferPtr(bufferPtr)
, m_buffer(buffer) , m_buffer(buffer, bufferSize, bufferReadPosition, bufferWritePosition, bufferCanRead)
, m_bufferSize(bufferSize)
, m_pos(positionStart)
, m_posEnd(positionEnd)
{} {}
public: public:
static std::shared_ptr<InputStreamBufferedProxy> createShared(const std::shared_ptr<InputStream>& inputStream, static std::shared_ptr<InputStreamBufferedProxy> createShared(const std::shared_ptr<InputStream>& inputStream,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer) const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer)
{ {
return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream, return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream,
buffer, buffer,
(p_char8) buffer->getData(), (p_char8) buffer->getData(),
buffer->getSize(), buffer->getSize(),
0, 0); 0, 0, false);
} }
static std::shared_ptr<InputStreamBufferedProxy> createShared(const std::shared_ptr<InputStream>& inputStream, static std::shared_ptr<InputStreamBufferedProxy> createShared(const std::shared_ptr<InputStream>& inputStream,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer, const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer,
v_bufferSize positionStart, data::v_io_size bufferReadPosition,
v_bufferSize positionEnd) data::v_io_size bufferWritePosition,
bool bufferCanRead)
{ {
return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream, return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream,
buffer, buffer,
(p_char8) buffer->getData(), (p_char8) buffer->getData(),
buffer->getSize(), buffer->getSize(),
positionStart, bufferReadPosition,
positionEnd); bufferWritePosition,
bufferCanRead);
} }
static std::shared_ptr<InputStreamBufferedProxy> createShared(const std::shared_ptr<InputStream>& inputStream, static std::shared_ptr<InputStreamBufferedProxy> createShared(const std::shared_ptr<InputStream>& inputStream,
p_char8 buffer, p_char8 buffer,
v_bufferSize bufferSize) v_bufferSize bufferSize)
{ {
return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream, return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream,
nullptr, nullptr,
buffer, buffer,
bufferSize, bufferSize,
0, 0); 0, 0, false);
} }
static std::shared_ptr<InputStreamBufferedProxy> createShared(const std::shared_ptr<InputStream>& inputStream, static std::shared_ptr<InputStreamBufferedProxy> createShared(const std::shared_ptr<InputStream>& inputStream,
p_char8 buffer, p_char8 buffer,
v_bufferSize bufferSize, v_bufferSize bufferSize,
v_bufferSize positionStart, data::v_io_size bufferReadPosition,
v_bufferSize positionEnd) data::v_io_size bufferWritePosition,
bool bufferCanRead)
{ {
return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream, return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream,
nullptr, nullptr,
buffer, buffer,
bufferSize, bufferSize,
positionStart, bufferReadPosition,
positionEnd); bufferWritePosition,
bufferCanRead);
} }
os::io::Library::v_size read(void *data, os::io::Library::v_size count) override; data::v_io_size read(void *data, data::v_io_size count) override;
void setBufferPosition(v_bufferSize pos, v_bufferSize posEnd){ void setBufferPosition(data::v_io_size readPosition, data::v_io_size writePosition, bool canRead) {
m_pos = pos; m_buffer.setBufferPosition(readPosition, writePosition, canRead);
m_posEnd = posEnd;
} }
}; };

View File

@ -1,49 +0,0 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "Library.hpp"
#include <memory>
#include <unistd.h>
#include <sys/socket.h>
namespace oatpp { namespace os { namespace io {
v_int32 Library::handle_close(v_handle handle){
return close(handle);
}
Library::v_size Library::handle_read(v_handle handle, void *buf, v_size count){
return read(handle, buf, count);
}
Library::v_size Library::handle_write(v_handle handle, const void *buf, v_size count){
v_int32 flags = 0;
#ifdef MSG_NOSIGNAL
flags |= MSG_NOSIGNAL;
#endif
return send(handle, buf, count, flags);
}
}}}

View File

@ -1,47 +0,0 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_os_io_Library_hpp
#define oatpp_os_io_Library_hpp
#include "oatpp/core/base/Environment.hpp"
namespace oatpp { namespace os { namespace io {
class Library{
public:
typedef v_int32 v_handle;
typedef ssize_t v_size;
public:
static v_int32 handle_close(v_handle handle);
static v_size handle_read(v_handle handle, void *buf, v_size count);
static v_size handle_write(v_handle handle, const void *buf, v_size count);
};
}}}
#endif /* oatpp_os_io_Library_hpp */

View File

@ -24,13 +24,14 @@
#include "./Connection.hpp" #include "./Connection.hpp"
#include <unistd.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <thread> #include <thread>
#include <chrono> #include <chrono>
namespace oatpp { namespace network { namespace oatpp { namespace network {
Connection::Connection(Library::v_handle handle) Connection::Connection(data::v_io_handle handle)
: m_handle(handle) : m_handle(handle)
{ {
} }
@ -39,17 +40,24 @@ Connection::~Connection(){
close(); close();
} }
Connection::Library::v_size Connection::write(const void *buff, Library::v_size count){ data::v_io_size Connection::write(const void *buff, data::v_io_size count){
errno = 0; errno = 0;
auto result = Library::handle_write(m_handle, buff, count);
v_int32 flags = 0;
#ifdef MSG_NOSIGNAL
flags |= MSG_NOSIGNAL;
#endif
auto result = ::send(m_handle, buff, count, flags);
if(result <= 0) { if(result <= 0) {
auto e = errno; auto e = errno;
if(e == EAGAIN || e == EWOULDBLOCK){ if(e == EAGAIN || e == EWOULDBLOCK){
return oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY; // For async io. In case socket is non_blocking return data::IOError::WAIT_RETRY; // For async io. In case socket is non_blocking
} else if(e == EINTR) { } else if(e == EINTR) {
return oatpp::data::stream::Errors::ERROR_IO_RETRY; return data::IOError::RETRY;
} else if(e == EPIPE) { } else if(e == EPIPE) {
return oatpp::data::stream::Errors::ERROR_IO_PIPE; return data::IOError::BROKEN_PIPE;
} else { } else {
//OATPP_LOGD("Connection", "write errno=%d", e); //OATPP_LOGD("Connection", "write errno=%d", e);
} }
@ -57,17 +65,17 @@ Connection::Library::v_size Connection::write(const void *buff, Library::v_size
return result; return result;
} }
Connection::Library::v_size Connection::read(void *buff, Library::v_size count){ data::v_io_size Connection::read(void *buff, data::v_io_size count){
errno = 0; errno = 0;
auto result = Library::handle_read(m_handle, buff, count); auto result = ::read(m_handle, buff, count);
if(result <= 0) { if(result <= 0) {
auto e = errno; auto e = errno;
if(e == EAGAIN || e == EWOULDBLOCK){ if(e == EAGAIN || e == EWOULDBLOCK){
return oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY; // For async io. In case socket is non_blocking return data::IOError::WAIT_RETRY; // For async io. In case socket is non_blocking
} else if(e == EINTR) { } else if(e == EINTR) {
return oatpp::data::stream::Errors::ERROR_IO_RETRY; return data::IOError::RETRY;
} else if(e == ECONNRESET) { } else if(e == ECONNRESET) {
return oatpp::data::stream::Errors::ERROR_IO_PIPE; return data::IOError::BROKEN_PIPE;
} else { } else {
//OATPP_LOGD("Connection", "write errno=%d", e); //OATPP_LOGD("Connection", "write errno=%d", e);
} }
@ -76,7 +84,7 @@ Connection::Library::v_size Connection::read(void *buff, Library::v_size count){
} }
void Connection::close(){ void Connection::close(){
Library::handle_close(m_handle); ::close(m_handle);
} }
}} }}

View File

@ -31,29 +31,27 @@
namespace oatpp { namespace network { namespace oatpp { namespace network {
class Connection : public oatpp::base::Controllable, public oatpp::data::stream::IOStream { class Connection : public oatpp::base::Controllable, public oatpp::data::stream::IOStream {
public:
typedef oatpp::os::io::Library Library;
public: public:
OBJECT_POOL(Connection_Pool, Connection, 32); OBJECT_POOL(Connection_Pool, Connection, 32);
SHARED_OBJECT_POOL(Shared_Connection_Pool, Connection, 32); SHARED_OBJECT_POOL(Shared_Connection_Pool, Connection, 32);
private: private:
Library::v_handle m_handle; data::v_io_handle m_handle;
public: public:
Connection(Library::v_handle handle); Connection(data::v_io_handle handle);
public: public:
static std::shared_ptr<Connection> createShared(Library::v_handle handle){ static std::shared_ptr<Connection> createShared(data::v_io_handle handle){
return Shared_Connection_Pool::allocateShared(handle); return Shared_Connection_Pool::allocateShared(handle);
} }
~Connection(); ~Connection();
Library::v_size write(const void *buff, Library::v_size count) override; data::v_io_size write(const void *buff, data::v_io_size count) override;
Library::v_size read(void *buff, Library::v_size count) override; data::v_io_size read(void *buff, data::v_io_size count) override;
void close(); void close();
Library::v_handle getHandle(){ data::v_io_handle getHandle(){
return m_handle; return m_handle;
} }

View File

@ -32,6 +32,7 @@
#include <netdb.h> #include <netdb.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <unistd.h>
namespace oatpp { namespace network { namespace client { namespace oatpp { namespace network { namespace client {
@ -58,7 +59,7 @@ std::shared_ptr<oatpp::data::stream::IOStream> SimpleTCPConnectionProvider::getC
client.sin_port = htons(m_port); client.sin_port = htons(m_port);
memcpy(&client.sin_addr, host->h_addr, host->h_length); memcpy(&client.sin_addr, host->h_addr, host->h_length);
oatpp::os::io::Library::v_handle clientHandle = socket(AF_INET, SOCK_STREAM, 0); oatpp::data::v_io_handle clientHandle = socket(AF_INET, SOCK_STREAM, 0);
if (clientHandle < 0) { if (clientHandle < 0) {
OATPP_LOGD("SimpleTCPConnectionProvider", "Error creating socket."); OATPP_LOGD("SimpleTCPConnectionProvider", "Error creating socket.");
@ -74,7 +75,7 @@ std::shared_ptr<oatpp::data::stream::IOStream> SimpleTCPConnectionProvider::getC
#endif #endif
if (connect(clientHandle, (struct sockaddr *)&client, sizeof(client)) != 0 ) { if (connect(clientHandle, (struct sockaddr *)&client, sizeof(client)) != 0 ) {
oatpp::os::io::Library::handle_close(clientHandle); ::close(clientHandle);
OATPP_LOGD("SimpleTCPConnectionProvider", "Could not connect"); OATPP_LOGD("SimpleTCPConnectionProvider", "Could not connect");
return nullptr; return nullptr;
} }
@ -90,7 +91,7 @@ oatpp::async::Action SimpleTCPConnectionProvider::getConnectionAsync(oatpp::asyn
private: private:
oatpp::String m_host; oatpp::String m_host;
v_int32 m_port; v_int32 m_port;
oatpp::os::io::Library::v_handle m_clientHandle; oatpp::data::v_io_handle m_clientHandle;
struct sockaddr_in m_client; struct sockaddr_in m_client;
public: public:
@ -143,7 +144,7 @@ oatpp::async::Action SimpleTCPConnectionProvider::getConnectionAsync(oatpp::asyn
} else if(errno == EINTR) { } else if(errno == EINTR) {
return repeat(); return repeat();
} }
oatpp::os::io::Library::handle_close(m_clientHandle); ::close(m_clientHandle);
return error("[oatpp::network::client::SimpleTCPConnectionProvider::getConnectionAsync()]: Can't connect"); return error("[oatpp::network::client::SimpleTCPConnectionProvider::getConnectionAsync()]: Can't connect");
} }

View File

@ -36,15 +36,11 @@
#include "oatpp/core/base/Controllable.hpp" #include "oatpp/core/base/Controllable.hpp"
#include "oatpp/core/base/Environment.hpp" #include "oatpp/core/base/Environment.hpp"
#include "oatpp/core/os/io/Library.hpp"
#include <atomic> #include <atomic>
namespace oatpp { namespace network { namespace server { namespace oatpp { namespace network { namespace server {
class Server : public base::Controllable, public concurrency::Runnable{ class Server : public base::Controllable, public concurrency::Runnable{
public:
typedef oatpp::os::io::Library Library;
private: private:
void mainLoop(); void mainLoop();

View File

@ -33,6 +33,7 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <netinet/tcp.h> #include <netinet/tcp.h>
#include <unistd.h>
namespace oatpp { namespace network { namespace server { namespace oatpp { namespace network { namespace server {
@ -44,10 +45,14 @@ SimpleTCPConnectionProvider::SimpleTCPConnectionProvider(v_word16 port, bool non
setProperty(PROPERTY_HOST, "localhost"); setProperty(PROPERTY_HOST, "localhost");
setProperty(PROPERTY_PORT, oatpp::utils::conversion::int32ToStr(port)); setProperty(PROPERTY_PORT, oatpp::utils::conversion::int32ToStr(port));
} }
SimpleTCPConnectionProvider::~SimpleTCPConnectionProvider() {
::close(m_serverHandle);
}
oatpp::os::io::Library::v_handle SimpleTCPConnectionProvider::instantiateServer(){ oatpp::data::v_io_handle SimpleTCPConnectionProvider::instantiateServer(){
oatpp::os::io::Library::v_handle serverHandle; oatpp::data::v_io_handle serverHandle;
v_int32 ret; v_int32 ret;
int yes = 1; int yes = 1;
@ -71,14 +76,14 @@ oatpp::os::io::Library::v_handle SimpleTCPConnectionProvider::instantiateServer(
ret = bind(serverHandle, (struct sockaddr *)&addr, sizeof(addr)); ret = bind(serverHandle, (struct sockaddr *)&addr, sizeof(addr));
if(ret != 0) { if(ret != 0) {
oatpp::os::io::Library::handle_close(serverHandle); ::close(serverHandle);
throw std::runtime_error("Can't bind to address"); throw std::runtime_error("Can't bind to address");
return -1 ; return -1 ;
} }
ret = listen(serverHandle, 10000); ret = listen(serverHandle, 10000);
if(ret < 0) { if(ret < 0) {
oatpp::os::io::Library::handle_close(serverHandle); ::close(serverHandle);
return -1 ; return -1 ;
} }
@ -92,7 +97,7 @@ std::shared_ptr<oatpp::data::stream::IOStream> SimpleTCPConnectionProvider::getC
//oatpp::test::PerformanceChecker checker("Accept Checker"); //oatpp::test::PerformanceChecker checker("Accept Checker");
oatpp::os::io::Library::v_handle handle = accept(m_serverHandle, nullptr, nullptr); oatpp::data::v_io_handle handle = accept(m_serverHandle, nullptr, nullptr);
if (handle < 0) { if (handle < 0) {
v_int32 error = errno; v_int32 error = errno;

View File

@ -29,7 +29,6 @@
#include "oatpp/core/data/stream/Stream.hpp" #include "oatpp/core/data/stream/Stream.hpp"
#include "oatpp/core/Types.hpp" #include "oatpp/core/Types.hpp"
#include "oatpp/core/os/io/Library.hpp"
namespace oatpp { namespace network { namespace server { namespace oatpp { namespace network { namespace server {
@ -37,9 +36,9 @@ class SimpleTCPConnectionProvider : public base::Controllable, public ServerConn
private: private:
v_word16 m_port; v_word16 m_port;
bool m_nonBlocking; bool m_nonBlocking;
oatpp::os::io::Library::v_handle m_serverHandle; oatpp::data::v_io_handle m_serverHandle;
private: private:
oatpp::os::io::Library::v_handle instantiateServer(); oatpp::data::v_io_handle instantiateServer();
public: public:
SimpleTCPConnectionProvider(v_word16 port, bool nonBlocking = false); SimpleTCPConnectionProvider(v_word16 port, bool nonBlocking = false);
public: public:
@ -48,9 +47,7 @@ public:
return std::make_shared<SimpleTCPConnectionProvider>(port, nonBlocking); return std::make_shared<SimpleTCPConnectionProvider>(port, nonBlocking);
} }
~SimpleTCPConnectionProvider() { ~SimpleTCPConnectionProvider();
oatpp::os::io::Library::handle_close(m_serverHandle);
}
std::shared_ptr<IOStream> getConnection() override; std::shared_ptr<IOStream> getConnection() override;

View File

@ -26,37 +26,37 @@
namespace oatpp { namespace network { namespace virtual_ { namespace oatpp { namespace network { namespace virtual_ {
void Pipe::Reader::setMaxAvailableToRead(os::io::Library::v_size maxAvailableToRead) { void Pipe::Reader::setMaxAvailableToRead(data::v_io_size maxAvailableToRead) {
m_maxAvailableToRead = maxAvailableToRead; m_maxAvailableToRead = maxAvailableToRead;
} }
os::io::Library::v_size Pipe::Reader::read(void *data, os::io::Library::v_size count) { data::v_io_size Pipe::Reader::read(void *data, data::v_io_size count) {
if(m_maxAvailableToRead > -1 && count > m_maxAvailableToRead) { if(m_maxAvailableToRead > -1 && count > m_maxAvailableToRead) {
count = m_maxAvailableToRead; count = m_maxAvailableToRead;
} }
Pipe& pipe = *m_pipe; Pipe& pipe = *m_pipe;
oatpp::os::io::Library::v_size result; oatpp::data::v_io_size result;
if(m_nonBlocking) { if(m_nonBlocking) {
if(pipe.m_buffer.availableToRead() > 0) { if(pipe.m_fifo.availableToRead() > 0) {
result = pipe.m_buffer.read(data, count); result = pipe.m_fifo.read(data, count);
} else if(pipe.m_open) { } else if(pipe.m_open) {
result = oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY; result = data::IOError::WAIT_RETRY;
} else { } else {
result = oatpp::data::stream::Errors::ERROR_IO_PIPE; result = data::IOError::BROKEN_PIPE;
} }
} else { } else {
std::unique_lock<std::mutex> lock(pipe.m_mutex); std::unique_lock<std::mutex> lock(pipe.m_mutex);
while (pipe.m_buffer.availableToRead() == 0 && pipe.m_open) { while (pipe.m_fifo.availableToRead() == 0 && pipe.m_open) {
pipe.m_conditionWrite.notify_one(); pipe.m_conditionWrite.notify_one();
pipe.m_conditionRead.wait(lock); pipe.m_conditionRead.wait(lock);
} }
if (pipe.m_buffer.availableToRead() > 0) { if (pipe.m_fifo.availableToRead() > 0) {
result = pipe.m_buffer.read(data, count); result = pipe.m_fifo.read(data, count);
} else { } else {
result = oatpp::data::stream::Errors::ERROR_IO_PIPE; result = data::IOError::BROKEN_PIPE;
} }
} }
@ -66,37 +66,37 @@ os::io::Library::v_size Pipe::Reader::read(void *data, os::io::Library::v_size c
} }
void Pipe::Writer::setMaxAvailableToWrite(os::io::Library::v_size maxAvailableToWrite) { void Pipe::Writer::setMaxAvailableToWrite(data::v_io_size maxAvailableToWrite) {
m_maxAvailableToWrtie = maxAvailableToWrite; m_maxAvailableToWrtie = maxAvailableToWrite;
} }
os::io::Library::v_size Pipe::Writer::write(const void *data, os::io::Library::v_size count) { data::v_io_size Pipe::Writer::write(const void *data, data::v_io_size count) {
if(m_maxAvailableToWrtie > -1 && count > m_maxAvailableToWrtie) { if(m_maxAvailableToWrtie > -1 && count > m_maxAvailableToWrtie) {
count = m_maxAvailableToWrtie; count = m_maxAvailableToWrtie;
} }
Pipe& pipe = *m_pipe; Pipe& pipe = *m_pipe;
oatpp::os::io::Library::v_size result; oatpp::data::v_io_size result;
if(m_nonBlocking) { if(m_nonBlocking) {
if(pipe.m_buffer.availableToWrite() > 0) { if(pipe.m_fifo.availableToWrite() > 0) {
result = pipe.m_buffer.write(data, count); result = pipe.m_fifo.write(data, count);
} else if(pipe.m_open) { } else if(pipe.m_open) {
result = oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY; result = data::IOError::WAIT_RETRY;
} else { } else {
result = oatpp::data::stream::Errors::ERROR_IO_PIPE; result = data::IOError::BROKEN_PIPE;
} }
} else { } else {
std::unique_lock<std::mutex> lock(pipe.m_mutex); std::unique_lock<std::mutex> lock(pipe.m_mutex);
while (pipe.m_buffer.availableToWrite() == 0 && pipe.m_open) { while (pipe.m_fifo.availableToWrite() == 0 && pipe.m_open) {
pipe.m_conditionRead.notify_one(); pipe.m_conditionRead.notify_one();
pipe.m_conditionWrite.wait(lock); pipe.m_conditionWrite.wait(lock);
} }
if (pipe.m_open && pipe.m_buffer.availableToWrite() > 0) { if (pipe.m_open && pipe.m_fifo.availableToWrite() > 0) {
result = pipe.m_buffer.write(data, count); result = pipe.m_fifo.write(data, count);
} else { } else {
result = oatpp::data::stream::Errors::ERROR_IO_PIPE; result = data::IOError::BROKEN_PIPE;
} }
} }

View File

@ -26,7 +26,9 @@
#define oatpp_network_virtual__Pipe_hpp #define oatpp_network_virtual__Pipe_hpp
#include "oatpp/core/data/stream/Stream.hpp" #include "oatpp/core/data/stream/Stream.hpp"
#include "oatpp/core/data/buffer/FIFOBuffer.hpp" #include "oatpp/core/data/buffer/FIFOBuffer.hpp"
#include "oatpp/core/data/buffer/IOBuffer.hpp"
#include "oatpp/core/concurrency/SpinLock.hpp" #include "oatpp/core/concurrency/SpinLock.hpp"
@ -46,7 +48,7 @@ public:
/** /**
* this one used for testing purposes only * this one used for testing purposes only
*/ */
os::io::Library::v_size m_maxAvailableToRead; data::v_io_size m_maxAvailableToRead;
public: public:
Reader(Pipe* pipe, bool nonBlocking = false) Reader(Pipe* pipe, bool nonBlocking = false)
@ -63,9 +65,9 @@ public:
* this one used for testing purposes only * this one used for testing purposes only
* set to -1 in order to ignore this value * set to -1 in order to ignore this value
*/ */
void setMaxAvailableToRead(os::io::Library::v_size maxAvailableToRead); void setMaxAvailableToRead(data::v_io_size maxAvailableToRead);
os::io::Library::v_size read(void *data, os::io::Library::v_size count) override; data::v_io_size read(void *data, data::v_io_size count) override;
}; };
@ -77,7 +79,7 @@ public:
/** /**
* this one used for testing purposes only * this one used for testing purposes only
*/ */
os::io::Library::v_size m_maxAvailableToWrtie; data::v_io_size m_maxAvailableToWrtie;
public: public:
Writer(Pipe* pipe, bool nonBlocking = false) Writer(Pipe* pipe, bool nonBlocking = false)
@ -94,9 +96,9 @@ public:
* this one used for testing purposes only * this one used for testing purposes only
* set to -1 in order to ignore this value * set to -1 in order to ignore this value
*/ */
void setMaxAvailableToWrite(os::io::Library::v_size maxAvailableToWrite); void setMaxAvailableToWrite(data::v_io_size maxAvailableToWrite);
os::io::Library::v_size write(const void *data, os::io::Library::v_size count) override; data::v_io_size write(const void *data, data::v_io_size count) override;
}; };
@ -104,7 +106,10 @@ private:
bool m_open; bool m_open;
Writer m_writer; Writer m_writer;
Reader m_reader; Reader m_reader;
oatpp::data::buffer::FIFOBuffer m_buffer;
oatpp::data::buffer::IOBuffer m_buffer;
oatpp::data::buffer::SynchronizedFIFOBuffer m_fifo;
std::mutex m_mutex; std::mutex m_mutex;
std::condition_variable m_conditionRead; std::condition_variable m_conditionRead;
std::condition_variable m_conditionWrite; std::condition_variable m_conditionWrite;
@ -114,6 +119,8 @@ public:
: m_open(true) : m_open(true)
, m_writer(this) , m_writer(this)
, m_reader(this) , m_reader(this)
, m_buffer()
, m_fifo(m_buffer.getData(), m_buffer.getSize())
{} {}
static std::shared_ptr<Pipe> createShared(){ static std::shared_ptr<Pipe> createShared(){

View File

@ -26,16 +26,16 @@
namespace oatpp { namespace network { namespace virtual_ { namespace oatpp { namespace network { namespace virtual_ {
void Socket::setMaxAvailableToReadWrtie(os::io::Library::v_size maxToRead, os::io::Library::v_size maxToWrite) { void Socket::setMaxAvailableToReadWrtie(data::v_io_size maxToRead, data::v_io_size maxToWrite) {
m_pipeIn->getReader()->setMaxAvailableToRead(maxToRead); m_pipeIn->getReader()->setMaxAvailableToRead(maxToRead);
m_pipeOut->getWriter()->setMaxAvailableToWrite(maxToWrite); m_pipeOut->getWriter()->setMaxAvailableToWrite(maxToWrite);
} }
os::io::Library::v_size Socket::read(void *data, os::io::Library::v_size count) { data::v_io_size Socket::read(void *data, data::v_io_size count) {
return m_pipeIn->getReader()->read(data, count); return m_pipeIn->getReader()->read(data, count);
} }
os::io::Library::v_size Socket::write(const void *data, os::io::Library::v_size count) { data::v_io_size Socket::write(const void *data, data::v_io_size count) {
return m_pipeOut->getWriter()->write(data, count); return m_pipeOut->getWriter()->write(data, count);
} }

View File

@ -52,10 +52,10 @@ public:
* this one used for testing purposes only * this one used for testing purposes only
* set to -1 in order to ignore this value * set to -1 in order to ignore this value
*/ */
void setMaxAvailableToReadWrtie(os::io::Library::v_size maxToRead, os::io::Library::v_size maxToWrite); void setMaxAvailableToReadWrtie(data::v_io_size maxToRead, data::v_io_size maxToWrite);
os::io::Library::v_size read(void *data, os::io::Library::v_size count) override; data::v_io_size read(void *data, data::v_io_size count) override;
os::io::Library::v_size write(const void *data, os::io::Library::v_size count) override; data::v_io_size write(const void *data, data::v_io_size count) override;
void setNonBlocking(bool nonBlocking); void setNonBlocking(bool nonBlocking);

View File

@ -39,14 +39,14 @@ oatpp::async::Action ConnectionProvider::getConnectionAsync(oatpp::async::Abstra
class ConnectCoroutine : public oatpp::async::CoroutineWithResult<ConnectCoroutine, std::shared_ptr<oatpp::data::stream::IOStream>> { class ConnectCoroutine : public oatpp::async::CoroutineWithResult<ConnectCoroutine, std::shared_ptr<oatpp::data::stream::IOStream>> {
private: private:
std::shared_ptr<virtual_::Interface> m_interface; std::shared_ptr<virtual_::Interface> m_interface;
os::io::Library::v_size m_maxAvailableToRead; data::v_io_size m_maxAvailableToRead;
os::io::Library::v_size m_maxAvailableToWrite; data::v_io_size m_maxAvailableToWrite;
std::shared_ptr<virtual_::Interface::ConnectionSubmission> m_submission; std::shared_ptr<virtual_::Interface::ConnectionSubmission> m_submission;
public: public:
ConnectCoroutine(const std::shared_ptr<virtual_::Interface>& interface, ConnectCoroutine(const std::shared_ptr<virtual_::Interface>& interface,
os::io::Library::v_size maxAvailableToRead, data::v_io_size maxAvailableToRead,
os::io::Library::v_size maxAvailableToWrite) data::v_io_size maxAvailableToWrite)
: m_interface(interface) : m_interface(interface)
, m_maxAvailableToRead(maxAvailableToRead) , m_maxAvailableToRead(maxAvailableToRead)
, m_maxAvailableToWrite(maxAvailableToWrite) , m_maxAvailableToWrite(maxAvailableToWrite)

View File

@ -33,8 +33,8 @@ namespace oatpp { namespace network { namespace virtual_ { namespace client {
class ConnectionProvider : public oatpp::network::ClientConnectionProvider { class ConnectionProvider : public oatpp::network::ClientConnectionProvider {
private: private:
std::shared_ptr<virtual_::Interface> m_interface; std::shared_ptr<virtual_::Interface> m_interface;
os::io::Library::v_size m_maxAvailableToRead; data::v_io_size m_maxAvailableToRead;
os::io::Library::v_size m_maxAvailableToWrite; data::v_io_size m_maxAvailableToWrite;
public: public:
ConnectionProvider(const std::shared_ptr<virtual_::Interface>& interface) ConnectionProvider(const std::shared_ptr<virtual_::Interface>& interface)
@ -54,7 +54,7 @@ public:
* this one used for testing purposes only * this one used for testing purposes only
* set to -1 in order to ignore this value * set to -1 in order to ignore this value
*/ */
void setSocketMaxAvailableToReadWrtie(os::io::Library::v_size maxToRead, os::io::Library::v_size maxToWrite) { void setSocketMaxAvailableToReadWrtie(data::v_io_size maxToRead, data::v_io_size maxToWrite) {
m_maxAvailableToRead = maxToRead; m_maxAvailableToRead = maxToRead;
m_maxAvailableToWrite = maxToWrite; m_maxAvailableToWrite = maxToWrite;
} }

View File

@ -28,7 +28,7 @@ namespace oatpp { namespace network { namespace virtual_ { namespace server {
std::shared_ptr<ConnectionProvider::IOStream> ConnectionProvider::getConnection() { std::shared_ptr<ConnectionProvider::IOStream> ConnectionProvider::getConnection() {
auto socket = m_interface->accept(); auto socket = m_interface->accept();
socket->setNonBlocking(false); socket->setNonBlocking(m_nonBlocking);
socket->setMaxAvailableToReadWrtie(m_maxAvailableToRead, m_maxAvailableToWrite); socket->setMaxAvailableToReadWrtie(m_maxAvailableToRead, m_maxAvailableToWrite);
return socket; return socket;
} }

View File

@ -33,12 +33,14 @@ namespace oatpp { namespace network { namespace virtual_ { namespace server {
class ConnectionProvider : public oatpp::network::ServerConnectionProvider { class ConnectionProvider : public oatpp::network::ServerConnectionProvider {
private: private:
std::shared_ptr<virtual_::Interface> m_interface; std::shared_ptr<virtual_::Interface> m_interface;
os::io::Library::v_size m_maxAvailableToRead; bool m_nonBlocking;
os::io::Library::v_size m_maxAvailableToWrite; data::v_io_size m_maxAvailableToRead;
data::v_io_size m_maxAvailableToWrite;
public: public:
ConnectionProvider(const std::shared_ptr<virtual_::Interface>& interface) ConnectionProvider(const std::shared_ptr<virtual_::Interface>& interface, bool nonBlocking = false)
: m_interface(interface) : m_interface(interface)
, m_nonBlocking(nonBlocking)
, m_maxAvailableToRead(-1) , m_maxAvailableToRead(-1)
, m_maxAvailableToWrite(-1) , m_maxAvailableToWrite(-1)
{ {
@ -46,15 +48,15 @@ public:
setProperty(PROPERTY_PORT, "0"); setProperty(PROPERTY_PORT, "0");
} }
static std::shared_ptr<ConnectionProvider> createShared(const std::shared_ptr<virtual_::Interface>& interface) { static std::shared_ptr<ConnectionProvider> createShared(const std::shared_ptr<virtual_::Interface>& interface, bool nonBlocking = false) {
return std::make_shared<ConnectionProvider>(interface); return std::make_shared<ConnectionProvider>(interface, nonBlocking);
} }
/** /**
* this one used for testing purposes only * this one used for testing purposes only
* set to -1 in order to ignore this value * set to -1 in order to ignore this value
*/ */
void setSocketMaxAvailableToReadWrtie(os::io::Library::v_size maxToRead, os::io::Library::v_size maxToWrite) { void setSocketMaxAvailableToReadWrtie(data::v_io_size maxToRead, data::v_io_size maxToWrite) {
m_maxAvailableToRead = maxToRead; m_maxAvailableToRead = maxToRead;
m_maxAvailableToWrite = maxToWrite; m_maxAvailableToWrite = maxToWrite;
} }

View File

@ -123,7 +123,8 @@ HttpRequestExecutor::execute(const String& method,
auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection, auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection,
ioBuffer, ioBuffer,
result.bufferPosStart, result.bufferPosStart,
result.bufferPosEnd); result.bufferPosEnd,
result.bufferPosStart != result.bufferPosEnd);
return Response::createShared(result.startingLine.statusCode, return Response::createShared(result.startingLine.statusCode,
result.startingLine.description.toString(), result.startingLine.description.toString(),
@ -154,7 +155,7 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor
std::shared_ptr<oatpp::data::stream::IOStream> m_connection; std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_ioBuffer; std::shared_ptr<oatpp::data::buffer::IOBuffer> m_ioBuffer;
void* m_bufferPointer; void* m_bufferPointer;
os::io::Library::v_size m_bufferBytesLeftToRead; data::v_io_size m_bufferBytesLeftToRead;
public: public:
ExecutorCoroutine(const std::shared_ptr<oatpp::network::ClientConnectionProvider>& connectionProvider, ExecutorCoroutine(const std::shared_ptr<oatpp::network::ClientConnectionProvider>& connectionProvider,
@ -210,7 +211,8 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor
auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection, auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection,
m_ioBuffer, m_ioBuffer,
result.bufferPosStart, result.bufferPosStart,
result.bufferPosEnd); result.bufferPosEnd,
result.bufferPosStart != result.bufferPosEnd);
return _return(Response::createShared(result.startingLine.statusCode, return _return(Response::createShared(result.startingLine.statusCode,
result.startingLine.description.toString(), result.startingLine.description.toString(),

View File

@ -26,13 +26,13 @@
namespace oatpp { namespace web { namespace protocol { namespace oatpp { namespace web { namespace protocol {
CommunicationError::CommunicationError(oatpp::os::io::Library::v_size ioStatus, const oatpp::String& message) CommunicationError::CommunicationError(oatpp::data::v_io_size ioStatus, const oatpp::String& message)
:std::runtime_error(message->std_str()) :std::runtime_error(message->std_str())
, m_ioStatus(ioStatus) , m_ioStatus(ioStatus)
, m_message(message) , m_message(message)
{} {}
oatpp::os::io::Library::v_size CommunicationError::getIOStatus() { oatpp::data::v_io_size CommunicationError::getIOStatus() {
return m_ioStatus; return m_ioStatus;
} }

View File

@ -25,19 +25,18 @@
#ifndef oatpp_web_protocol_CommunicationError_hpp #ifndef oatpp_web_protocol_CommunicationError_hpp
#define oatpp_web_protocol_CommunicationError_hpp #define oatpp_web_protocol_CommunicationError_hpp
#include "oatpp/core/Types.hpp" #include "oatpp/core/data/IODefinitions.hpp"
#include "oatpp/core/os/io/Library.hpp"
namespace oatpp { namespace web { namespace protocol { namespace oatpp { namespace web { namespace protocol {
class CommunicationError : public std::runtime_error { class CommunicationError : public std::runtime_error {
private: private:
oatpp::os::io::Library::v_size m_ioStatus; oatpp::data::v_io_size m_ioStatus;
oatpp::String m_message; oatpp::String m_message;
public: public:
CommunicationError(oatpp::os::io::Library::v_size ioStatus, const oatpp::String& message); CommunicationError(oatpp::data::v_io_size ioStatus, const oatpp::String& message);
oatpp::os::io::Library::v_size getIOStatus(); oatpp::data::v_io_size getIOStatus();
oatpp::String& getMessage(); oatpp::String& getMessage();
}; };
@ -52,12 +51,12 @@ public:
: ioStatus(0) : ioStatus(0)
{} {}
Info(oatpp::os::io::Library::v_size pIOStatus, const Status& pStatus) Info(oatpp::data::v_io_size pIOStatus, const Status& pStatus)
: ioStatus(pIOStatus) : ioStatus(pIOStatus)
, status(pStatus) , status(pStatus)
{} {}
oatpp::os::io::Library::v_size ioStatus; oatpp::data::v_io_size ioStatus;
Status status; Status status;
}; };

View File

@ -159,8 +159,8 @@ Range Range::parse(oatpp::parser::ParsingCaret& caret) {
caret.findRN(); caret.findRN();
endLabel.end(); endLabel.end();
oatpp::os::io::Library::v_size start = oatpp::utils::conversion::strToInt64((const char*) startLabel.getData()); oatpp::data::v_io_size start = oatpp::utils::conversion::strToInt64((const char*) startLabel.getData());
oatpp::os::io::Library::v_size end = oatpp::utils::conversion::strToInt64((const char*) endLabel.getData()); oatpp::data::v_io_size end = oatpp::utils::conversion::strToInt64((const char*) endLabel.getData());
return Range(unitsLabel.toString(true), start, end); return Range(unitsLabel.toString(true), start, end);
} }
@ -220,9 +220,9 @@ ContentRange ContentRange::parse(oatpp::parser::ParsingCaret& caret) {
caret.findRN(); caret.findRN();
sizeLabel.end(); sizeLabel.end();
oatpp::os::io::Library::v_size start = oatpp::utils::conversion::strToInt64((const char*) startLabel.getData()); oatpp::data::v_io_size start = oatpp::utils::conversion::strToInt64((const char*) startLabel.getData());
oatpp::os::io::Library::v_size end = oatpp::utils::conversion::strToInt64((const char*) endLabel.getData()); oatpp::data::v_io_size end = oatpp::utils::conversion::strToInt64((const char*) endLabel.getData());
oatpp::os::io::Library::v_size size = 0; oatpp::data::v_io_size size = 0;
bool isSizeKnown = false; bool isSizeKnown = false;
if(sizeLabel.getData()[0] != '*') { if(sizeLabel.getData()[0] != '*') {
isSizeKnown = true; isSizeKnown = true;

View File

@ -188,16 +188,16 @@ private:
public: public:
Range(const oatpp::String& pUnits, Range(const oatpp::String& pUnits,
const oatpp::os::io::Library::v_size& pStart, const oatpp::data::v_io_size& pStart,
const oatpp::os::io::Library::v_size& pEnd) const oatpp::data::v_io_size& pEnd)
: units(pUnits) : units(pUnits)
, start(pStart) , start(pStart)
, end(pEnd) , end(pEnd)
{} {}
oatpp::String units; oatpp::String units;
oatpp::os::io::Library::v_size start; oatpp::data::v_io_size start;
oatpp::os::io::Library::v_size end; oatpp::data::v_io_size end;
oatpp::String toString() const; oatpp::String toString() const;
@ -220,9 +220,9 @@ private:
public: public:
ContentRange(const oatpp::String& pUnits, ContentRange(const oatpp::String& pUnits,
const oatpp::os::io::Library::v_size& pStart, const oatpp::data::v_io_size& pStart,
const oatpp::os::io::Library::v_size& pEnd, const oatpp::data::v_io_size& pEnd,
const oatpp::os::io::Library::v_size& pSize, const oatpp::data::v_io_size& pSize,
bool pIsSizeKnown) bool pIsSizeKnown)
: units(pUnits) : units(pUnits)
, start(pStart) , start(pStart)
@ -232,9 +232,9 @@ public:
{} {}
oatpp::String units; oatpp::String units;
oatpp::os::io::Library::v_size start; oatpp::data::v_io_size start;
oatpp::os::io::Library::v_size end; oatpp::data::v_io_size end;
oatpp::os::io::Library::v_size size; oatpp::data::v_io_size size;
bool isSizeKnown; bool isSizeKnown;
oatpp::String toString() const; oatpp::String toString() const;

View File

@ -28,14 +28,14 @@
namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming { namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming {
os::io::Library::v_size RequestHeadersReader::readHeadersSection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection, data::v_io_size RequestHeadersReader::readHeadersSection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
oatpp::data::stream::OutputStream* bufferStream, oatpp::data::stream::OutputStream* bufferStream,
Result& result) { Result& result) {
v_word32 sectionEnd = ('\r' << 24) | ('\n' << 16) | ('\r' << 8) | ('\n'); v_word32 sectionEnd = ('\r' << 24) | ('\n' << 16) | ('\r' << 8) | ('\n');
v_word32 accumulator = 0; v_word32 accumulator = 0;
v_int32 progress = 0; v_int32 progress = 0;
os::io::Library::v_size res; data::v_io_size res;
while (true) { while (true) {
v_int32 desiredToRead = m_bufferSize; v_int32 desiredToRead = m_bufferSize;
@ -60,7 +60,7 @@ os::io::Library::v_size RequestHeadersReader::readHeadersSection(const std::shar
} }
} }
} else if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY || res == oatpp::data::stream::Errors::ERROR_IO_RETRY) { } else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
continue; continue;
} else { } else {
break; break;
@ -148,7 +148,7 @@ RequestHeadersReader::Action RequestHeadersReader::readHeadersAsync(oatpp::async
return waitRetry(); return waitRetry();
} else if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY || res == oatpp::data::stream::Errors::ERROR_IO_RETRY) { } else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
return waitRetry(); return waitRetry();
} else { } else {
return abort(); return abort();

View File

@ -47,7 +47,7 @@ public:
public: public:
typedef Action (oatpp::async::AbstractCoroutine::*AsyncCallback)(const Result&); typedef Action (oatpp::async::AbstractCoroutine::*AsyncCallback)(const Result&);
private: private:
os::io::Library::v_size readHeadersSection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection, data::v_io_size readHeadersSection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
oatpp::data::stream::OutputStream* bufferStream, oatpp::data::stream::OutputStream* bufferStream,
Result& result); Result& result);
private: private:

View File

@ -28,14 +28,14 @@
namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming { namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming {
os::io::Library::v_size ResponseHeadersReader::readHeadersSection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection, data::v_io_size ResponseHeadersReader::readHeadersSection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
oatpp::data::stream::OutputStream* bufferStream, oatpp::data::stream::OutputStream* bufferStream,
Result& result) { Result& result) {
v_word32 sectionEnd = ('\r' << 24) | ('\n' << 16) | ('\r' << 8) | ('\n'); v_word32 sectionEnd = ('\r' << 24) | ('\n' << 16) | ('\r' << 8) | ('\n');
v_word32 accumulator = 0; v_word32 accumulator = 0;
v_int32 progress = 0; v_int32 progress = 0;
os::io::Library::v_size res; data::v_io_size res;
while (true) { while (true) {
v_int32 desiredToRead = m_bufferSize; v_int32 desiredToRead = m_bufferSize;
@ -60,7 +60,7 @@ os::io::Library::v_size ResponseHeadersReader::readHeadersSection(const std::sha
} }
} }
} else if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY || res == oatpp::data::stream::Errors::ERROR_IO_RETRY) { } else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
continue; continue;
} else { } else {
break; break;
@ -148,7 +148,7 @@ ResponseHeadersReader::Action ResponseHeadersReader::readHeadersAsync(oatpp::asy
return waitRetry(); return waitRetry();
} else if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY || res == oatpp::data::stream::Errors::ERROR_IO_RETRY) { } else if(res == data::IOError::WAIT_RETRY || res == data::IOError::RETRY) {
return waitRetry(); return waitRetry();
} else { } else {
return abort(); return abort();

View File

@ -47,7 +47,7 @@ public:
public: public:
typedef Action (oatpp::async::AbstractCoroutine::*AsyncCallback)(const Result&); typedef Action (oatpp::async::AbstractCoroutine::*AsyncCallback)(const Result&);
private: private:
os::io::Library::v_size readHeadersSection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection, data::v_io_size readHeadersSection(const std::shared_ptr<oatpp::data::stream::IOStream>& connection,
oatpp::data::stream::OutputStream* bufferStream, oatpp::data::stream::OutputStream* bufferStream,
Result& result); Result& result);
private: private:

View File

@ -29,12 +29,12 @@
namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming { namespace oatpp { namespace web { namespace protocol { namespace http { namespace incoming {
os::io::Library::v_size SimpleBodyDecoder::readLine(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream, data::v_io_size SimpleBodyDecoder::readLine(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
p_char8 buffer, p_char8 buffer,
os::io::Library::v_size maxLineSize) { data::v_io_size maxLineSize) {
v_char8 a; v_char8 a;
os::io::Library::v_size count = 0; data::v_io_size count = 0;
while (fromStream->read(&a, 1) > 0) { while (fromStream->read(&a, 1) > 0) {
if(a != '\r') { if(a != '\r') {
if(count + 1 > maxLineSize) { if(count + 1 > maxLineSize) {
@ -62,7 +62,7 @@ void SimpleBodyDecoder::doChunkedDecoding(const std::shared_ptr<oatpp::data::str
v_int32 maxLineSize = 8; // 0xFFFFFFFF 4Gb for chunk v_int32 maxLineSize = 8; // 0xFFFFFFFF 4Gb for chunk
v_char8 lineBuffer[maxLineSize + 1]; v_char8 lineBuffer[maxLineSize + 1];
os::io::Library::v_size countToRead; data::v_io_size countToRead;
do { do {
@ -91,7 +91,7 @@ void SimpleBodyDecoder::decode(const Protocol::Headers& headers,
if(transferEncodingIt != headers.end() && transferEncodingIt->second == Header::Value::TRANSFER_ENCODING_CHUNKED) { if(transferEncodingIt != headers.end() && transferEncodingIt->second == Header::Value::TRANSFER_ENCODING_CHUNKED) {
doChunkedDecoding(bodyStream, toStream); doChunkedDecoding(bodyStream, toStream);
} else { } else {
oatpp::os::io::Library::v_size contentLength = 0; oatpp::data::v_io_size contentLength = 0;
auto contentLengthStrIt = headers.find(Header::CONTENT_LENGTH); auto contentLengthStrIt = headers.find(Header::CONTENT_LENGTH);
if(contentLengthStrIt == headers.end()) { if(contentLengthStrIt == headers.end()) {
return; // DO NOTHING // it is an empty or invalid body return; // DO NOTHING // it is an empty or invalid body
@ -127,7 +127,7 @@ oatpp::async::Action SimpleBodyDecoder::doChunkedDecodingAsync(oatpp::async::Abs
bool m_lineEnding; bool m_lineEnding;
v_char8 m_lineBuffer [16]; // used max 8 v_char8 m_lineBuffer [16]; // used max 8
void* m_skipData; void* m_skipData;
os::io::Library::v_size m_skipSize; data::v_io_size m_skipSize;
bool m_done = false; bool m_done = false;
private: private:
void prepareSkipRN() { void prepareSkipRN() {
@ -152,9 +152,9 @@ oatpp::async::Action SimpleBodyDecoder::doChunkedDecodingAsync(oatpp::async::Abs
Action readLineChar() { Action readLineChar() {
auto res = m_fromStream->read(&m_lineChar, 1); auto res = m_fromStream->read(&m_lineChar, 1);
if(res == oatpp::data::stream::Errors::ERROR_IO_WAIT_RETRY) { if(res == data::IOError::WAIT_RETRY) {
return oatpp::async::Action::_WAIT_RETRY; return oatpp::async::Action::_WAIT_RETRY;
} else if(res == oatpp::data::stream::Errors::ERROR_IO_RETRY) { } else if(res == data::IOError::RETRY) {
return oatpp::async::Action::_REPEAT; return oatpp::async::Action::_REPEAT;
} else if( res < 0) { } else if( res < 0) {
return error("[BodyDecoder::ChunkedDecoder] Can't read line char"); return error("[BodyDecoder::ChunkedDecoder] Can't read line char");
@ -187,7 +187,7 @@ oatpp::async::Action SimpleBodyDecoder::doChunkedDecodingAsync(oatpp::async::Abs
} }
Action onLineRead() { Action onLineRead() {
os::io::Library::v_size countToRead = std::strtol((const char*) m_lineBuffer, nullptr, 16); data::v_io_size countToRead = std::strtol((const char*) m_lineBuffer, nullptr, 16);
if(countToRead > 0) { if(countToRead > 0) {
prepareSkipRN(); prepareSkipRN();
return oatpp::data::stream::transferAsync(this, yieldTo(&ChunkedDecoder::skipRN), m_fromStream, m_toStream, countToRead, m_buffer); return oatpp::data::stream::transferAsync(this, yieldTo(&ChunkedDecoder::skipRN), m_fromStream, m_toStream, countToRead, m_buffer);
@ -226,7 +226,7 @@ oatpp::async::Action SimpleBodyDecoder::decodeAsync(oatpp::async::AbstractCorout
if(transferEncodingIt != headers.end() && transferEncodingIt->second == Header::Value::TRANSFER_ENCODING_CHUNKED) { if(transferEncodingIt != headers.end() && transferEncodingIt->second == Header::Value::TRANSFER_ENCODING_CHUNKED) {
return doChunkedDecodingAsync(parentCoroutine, actionOnReturn, bodyStream, toStream); return doChunkedDecodingAsync(parentCoroutine, actionOnReturn, bodyStream, toStream);
} else { } else {
oatpp::os::io::Library::v_size contentLength = 0; oatpp::data::v_io_size contentLength = 0;
auto contentLengthStrIt = headers.find(Header::CONTENT_LENGTH); auto contentLengthStrIt = headers.find(Header::CONTENT_LENGTH);
if(contentLengthStrIt == headers.end()) { if(contentLengthStrIt == headers.end()) {
return actionOnReturn; // DO NOTHING // it is an empty or invalid body return actionOnReturn; // DO NOTHING // it is an empty or invalid body

View File

@ -31,9 +31,9 @@ namespace oatpp { namespace web { namespace protocol { namespace http { namespac
class SimpleBodyDecoder : public BodyDecoder { class SimpleBodyDecoder : public BodyDecoder {
private: private:
static os::io::Library::v_size readLine(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream, static data::v_io_size readLine(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
p_char8 buffer, p_char8 buffer,
os::io::Library::v_size maxLineSize); data::v_io_size maxLineSize);
static void doChunkedDecoding(const std::shared_ptr<oatpp::data::stream::InputStream>& from, static void doChunkedDecoding(const std::shared_ptr<oatpp::data::stream::InputStream>& from,
const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream); const std::shared_ptr<oatpp::data::stream::OutputStream>& toStream);

View File

@ -62,7 +62,7 @@ public:
std::shared_ptr<BufferBody> m_body; std::shared_ptr<BufferBody> m_body;
std::shared_ptr<OutputStream> m_stream; std::shared_ptr<OutputStream> m_stream;
const void* m_currData; const void* m_currData;
oatpp::os::io::Library::v_size m_currDataSize; oatpp::data::v_io_size m_currDataSize;
public: public:
WriteToStreamCoroutine(const std::shared_ptr<BufferBody>& body, WriteToStreamCoroutine(const std::shared_ptr<BufferBody>& body,

View File

@ -92,7 +92,7 @@ public:
std::shared_ptr<oatpp::data::stream::ChunkedBuffer::Chunks> m_chunks; std::shared_ptr<oatpp::data::stream::ChunkedBuffer::Chunks> m_chunks;
oatpp::data::stream::ChunkedBuffer::Chunks::LinkedListNode* m_currChunk; oatpp::data::stream::ChunkedBuffer::Chunks::LinkedListNode* m_currChunk;
const void* m_currData; const void* m_currData;
oatpp::os::io::Library::v_size m_currDataSize; oatpp::data::v_io_size m_currDataSize;
Action m_nextAction; Action m_nextAction;
v_char8 m_buffer[16]; v_char8 m_buffer[16];
public: public:

View File

@ -46,7 +46,7 @@ void HttpConnectionHandler::Task::run(){
response = HttpProcessor::processRequest(m_router, m_connection, m_bodyDecoder, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, connectionState); response = HttpProcessor::processRequest(m_router, m_connection, m_bodyDecoder, m_errorHandler, m_requestInterceptors, buffer, bufferSize, inStream, connectionState);
if(response) { if(response) {
outStream->setBufferPosition(0, 0); outStream->setBufferPosition(0, 0, false);
response->send(outStream); response->send(outStream);
outStream->flush(); outStream->flush();
} else { } else {

View File

@ -59,7 +59,9 @@ HttpProcessor::processRequest(HttpRouter* router,
} }
auto& bodyStream = inStream; auto& bodyStream = inStream;
bodyStream->setBufferPosition(headersReadResult.bufferPosStart, headersReadResult.bufferPosEnd); bodyStream->setBufferPosition(headersReadResult.bufferPosStart,
headersReadResult.bufferPosEnd,
headersReadResult.bufferPosStart != headersReadResult.bufferPosEnd);
auto request = protocol::http::incoming::Request::createShared(headersReadResult.startingLine, auto request = protocol::http::incoming::Request::createShared(headersReadResult.startingLine,
route.matchMap, route.matchMap,
@ -107,7 +109,9 @@ oatpp::async::Action HttpProcessor::Coroutine::onHeadersParsed(const RequestHead
} }
auto& bodyStream = m_inStream; auto& bodyStream = m_inStream;
bodyStream->setBufferPosition(headersReadResult.bufferPosStart, headersReadResult.bufferPosEnd); bodyStream->setBufferPosition(headersReadResult.bufferPosStart,
headersReadResult.bufferPosEnd,
headersReadResult.bufferPosStart != headersReadResult.bufferPosEnd);
m_currentRequest = protocol::http::incoming::Request::createShared(headersReadResult.startingLine, m_currentRequest = protocol::http::incoming::Request::createShared(headersReadResult.startingLine,
m_currentRoute.matchMap, m_currentRoute.matchMap,
@ -149,7 +153,7 @@ HttpProcessor::Coroutine::Action HttpProcessor::Coroutine::onResponseFormed() {
m_currentResponse->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER); m_currentResponse->putHeaderIfNotExists(protocol::http::Header::SERVER, protocol::http::Header::Value::SERVER);
m_connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionState(m_currentRequest, m_currentResponse); m_connectionState = oatpp::web::protocol::http::outgoing::CommunicationUtils::considerConnectionState(m_currentRequest, m_currentResponse);
m_outStream->setBufferPosition(0, 0); m_outStream->setBufferPosition(0, 0, false);
return m_currentResponse->sendAsync(this, return m_currentResponse->sendAsync(this,
m_outStream->flushAsync( m_outStream->flushAsync(
this, this,

View File

@ -40,14 +40,14 @@ namespace {
typedef oatpp::network::virtual_::Pipe Pipe; typedef oatpp::network::virtual_::Pipe Pipe;
const char* DATA_CHUNK = "<0123456789/abcdefghijklmnopqrstuvwxyz/ABCDEFGHIJKLMNOPQRSTUVWXYZ>"; const char* DATA_CHUNK = "<0123456789/abcdefghijklmnopqrstuvwxyz/ABCDEFGHIJKLMNOPQRSTUVWXYZ>";
const os::io::Library::v_size CHUNK_SIZE = std::strlen(DATA_CHUNK); const data::v_io_size CHUNK_SIZE = std::strlen(DATA_CHUNK);
class WriterTask : public oatpp::concurrency::Runnable { class WriterTask : public oatpp::concurrency::Runnable {
private: private:
std::shared_ptr<Pipe> m_pipe; std::shared_ptr<Pipe> m_pipe;
v_int32 m_chunksToTransfer; v_int32 m_chunksToTransfer;
os::io::Library::v_size m_position = 0; data::v_io_size m_position = 0;
os::io::Library::v_size m_transferedBytes = 0; data::v_io_size m_transferedBytes = 0;
public: public:
WriterTask(const std::shared_ptr<Pipe>& pipe, v_int32 chunksToTransfer) WriterTask(const std::shared_ptr<Pipe>& pipe, v_int32 chunksToTransfer)

View File

@ -46,7 +46,7 @@ bool FullAsyncTest::onRun() {
auto interface = oatpp::network::virtual_::Interface::createShared("virtualhost"); auto interface = oatpp::network::virtual_::Interface::createShared("virtualhost");
auto serverConnectionProvider = oatpp::network::virtual_::server::ConnectionProvider::createShared(interface); auto serverConnectionProvider = oatpp::network::virtual_::server::ConnectionProvider::createShared(interface, true);
auto clientConnectionProvider = oatpp::network::virtual_::client::ConnectionProvider::createShared(interface); auto clientConnectionProvider = oatpp::network::virtual_::client::ConnectionProvider::createShared(interface);
serverConnectionProvider->setSocketMaxAvailableToReadWrtie(1, 1); serverConnectionProvider->setSocketMaxAvailableToReadWrtie(1, 1);
@ -70,20 +70,20 @@ bool FullAsyncTest::onRun() {
for(v_int32 i = 0; i < 10; i ++) { for(v_int32 i = 0; i < 10; i ++) {
{ /* test simple GET */ { // test simple GET
auto response = client->getRoot(); auto response = client->getRoot();
auto value = response->readBodyToString(); auto value = response->readBodyToString();
OATPP_ASSERT(value == "Hello World Async!!!"); OATPP_ASSERT(value == "Hello World Async!!!");
} }
{ /* test GET with path parameter */ { // test GET with path parameter
auto response = client->getWithParams("my_test_param-Async"); auto response = client->getWithParams("my_test_param-Async");
auto dto = response->readBodyToDto<app::TestDto>(objectMapper); auto dto = response->readBodyToDto<app::TestDto>(objectMapper);
OATPP_ASSERT(dto); OATPP_ASSERT(dto);
OATPP_ASSERT(dto->testValue == "my_test_param-Async"); OATPP_ASSERT(dto->testValue == "my_test_param-Async");
} }
{ /* test GET with header parameter */ { // test GET with header parameter
auto response = client->getWithHeaders("my_test_header-Async"); auto response = client->getWithHeaders("my_test_header-Async");
//auto str = response->readBodyToString(); //auto str = response->readBodyToString();
//OATPP_LOGE("AAA", "code=%d, str='%s'", response->statusCode, str->c_str()); //OATPP_LOGE("AAA", "code=%d, str='%s'", response->statusCode, str->c_str());
@ -92,12 +92,24 @@ bool FullAsyncTest::onRun() {
OATPP_ASSERT(dto->testValue == "my_test_header-Async"); OATPP_ASSERT(dto->testValue == "my_test_header-Async");
} }
{ /* test POST with body */ { // test POST with body
auto response = client->postBody("my_test_body-Async"); auto response = client->postBody("my_test_body-Async");
auto dto = response->readBodyToDto<app::TestDto>(objectMapper); auto dto = response->readBodyToDto<app::TestDto>(objectMapper);
OATPP_ASSERT(dto); OATPP_ASSERT(dto);
OATPP_ASSERT(dto->testValue == "my_test_body-Async"); OATPP_ASSERT(dto->testValue == "my_test_body-Async");
} }
{ // test Big Echo with body
oatpp::data::stream::ChunkedBuffer stream;
for(v_int32 i = 0; i < oatpp::data::buffer::IOBuffer::BUFFER_SIZE; i++) {
stream.write("0123456789", 10);
}
auto data = stream.toString();
auto response = client->echoBody(data);
auto returnedData = response->readBodyToString();
OATPP_ASSERT(returnedData);
OATPP_ASSERT(returnedData == data);
}
} }

View File

@ -71,33 +71,45 @@ bool FullTest::onRun() {
std::thread clientThread([client, server, objectMapper]{ std::thread clientThread([client, server, objectMapper]{
for(v_int32 i = 0; i < 10; i ++) { for(v_int32 i = 0; i < 10; i ++) {
{ /* test simple GET */ { // test simple GET
auto response = client->getRoot(); auto response = client->getRoot();
auto value = response->readBodyToString(); auto value = response->readBodyToString();
OATPP_ASSERT(value == "Hello World!!!"); OATPP_ASSERT(value == "Hello World!!!");
} }
{ /* test GET with path parameter */ { // test GET with path parameter
auto response = client->getWithParams("my_test_param"); auto response = client->getWithParams("my_test_param");
auto dto = response->readBodyToDto<app::TestDto>(objectMapper); auto dto = response->readBodyToDto<app::TestDto>(objectMapper);
OATPP_ASSERT(dto); OATPP_ASSERT(dto);
OATPP_ASSERT(dto->testValue == "my_test_param"); OATPP_ASSERT(dto->testValue == "my_test_param");
} }
{ /* test GET with header parameter */ { // test GET with header parameter
auto response = client->getWithHeaders("my_test_header"); auto response = client->getWithHeaders("my_test_header");
auto dto = response->readBodyToDto<app::TestDto>(objectMapper); auto dto = response->readBodyToDto<app::TestDto>(objectMapper);
OATPP_ASSERT(dto); OATPP_ASSERT(dto);
OATPP_ASSERT(dto->testValue == "my_test_header"); OATPP_ASSERT(dto->testValue == "my_test_header");
} }
{ /* test POST with body */ { // test POST with body
auto response = client->postBody("my_test_body"); auto response = client->postBody("my_test_body");
auto dto = response->readBodyToDto<app::TestDto>(objectMapper); auto dto = response->readBodyToDto<app::TestDto>(objectMapper);
OATPP_ASSERT(dto); OATPP_ASSERT(dto);
OATPP_ASSERT(dto->testValue == "my_test_body"); OATPP_ASSERT(dto->testValue == "my_test_body");
} }
{ // test Big Echo with body
oatpp::data::stream::ChunkedBuffer stream;
for(v_int32 i = 0; i < oatpp::data::buffer::IOBuffer::BUFFER_SIZE; i++) {
stream.write("0123456789", 10);
}
auto data = stream.toString();
auto response = client->echoBody(data);
auto returnedData = response->readBodyToString();
OATPP_ASSERT(returnedData);
OATPP_ASSERT(returnedData == data);
}
} }

View File

@ -39,6 +39,7 @@ class Client : public oatpp::web::client::ApiClient {
API_CALL("GET", "params/{param}", getWithParams, PATH(String, param)) API_CALL("GET", "params/{param}", getWithParams, PATH(String, param))
API_CALL("GET", "headers", getWithHeaders, HEADER(String, param, "X-TEST-HEADER")) API_CALL("GET", "headers", getWithHeaders, HEADER(String, param, "X-TEST-HEADER"))
API_CALL("POST", "body", postBody, BODY_STRING(String, body)) API_CALL("POST", "body", postBody, BODY_STRING(String, body))
API_CALL("POST", "echo", echoBody, BODY_STRING(String, body))
#include OATPP_CODEGEN_END(ApiClient) #include OATPP_CODEGEN_END(ApiClient)
}; };

View File

@ -77,6 +77,12 @@ public:
return createDtoResponse(Status::CODE_200, dto); return createDtoResponse(Status::CODE_200, dto);
} }
ENDPOINT("POST", "echo", echo,
BODY_STRING(String, body)) {
OATPP_LOGD(TAG, "POST body(echo) size=%d", body->getSize());
return createResponse(Status::CODE_200, body);
}
#include OATPP_CODEGEN_END(ApiController) #include OATPP_CODEGEN_END(ApiController)
}; };

View File

@ -39,7 +39,7 @@ private:
static constexpr const char* TAG = "test::web::app::ControllerAsync"; static constexpr const char* TAG = "test::web::app::ControllerAsync";
public: public:
ControllerAsync(const std::shared_ptr<ObjectMapper>& objectMapper) ControllerAsync(const std::shared_ptr<ObjectMapper>& objectMapper)
: oatpp::web::server::api::ApiController(objectMapper) : oatpp::web::server::api::ApiController(objectMapper)
{} {}
public: public:
@ -105,6 +105,22 @@ public:
} }
}; };
ENDPOINT_ASYNC("POST", "echo", Echo) {
ENDPOINT_ASYNC_INIT(Echo)
Action act() {
OATPP_LOGD(TAG, "POST body(echo). Reading body...");
return request->readBodyToStringAsync(this, &Echo::onBodyRead);
}
Action onBodyRead(const String& body) {
OATPP_LOGD(TAG, "POST echo size=%d", body->getSize());
return _return(controller->createResponse(Status::CODE_200, body));
}
};
#include OATPP_CODEGEN_END(ApiController) #include OATPP_CODEGEN_END(ApiController)