use FIFOBuffer for StreamBufferedProxy(InputStream)

This commit is contained in:
lganzzzo 2019-02-07 08:10:05 +02:00
parent 10d961b99b
commit 66096514f5
6 changed files with 62 additions and 89 deletions

View File

@ -49,63 +49,14 @@ oatpp::async::Action OutputStreamBufferedProxy::flushAsync(oatpp::async::Abstrac
data::v_io_size InputStreamBufferedProxy::read(void *data, data::v_io_size count) {
if (m_pos == 0 && m_posEnd == 0) {
if(count > m_bufferSize){
//if(m_hasError){
// errno = m_errno;
// return -1;
//}
return m_inputStream->read(data, count);
} else {
//if(m_hasError){
// errno = m_errno;
// return -1;
//}
m_posEnd = (v_bufferSize) m_inputStream->read(m_buffer, m_bufferSize);
v_bufferSize result;
if(m_posEnd > count){
result = (v_bufferSize) count;
m_pos = result;
} else {
result = m_posEnd;
m_posEnd = 0;
m_pos = 0;
if(result < 0) {
return result;
}
}
std::memcpy(data, m_buffer, result);
return result;
}
if(m_buffer.availableToRead() > 0) {
return m_buffer.read(data, count);
} else {
v_bufferSize result = m_posEnd - m_pos;
if(count > result){
std::memcpy(data, &m_buffer[m_pos], result);
m_pos = 0;
m_posEnd = 0;
data::v_io_size bigResult = read(&((p_char8) data) [result], count - result);
if(bigResult > 0){
return bigResult + result;
} else if(bigResult < 0) {
return bigResult;
}
return result;
} else {
std::memcpy(data, &m_buffer[m_pos], count);
m_pos += (v_bufferSize) count;
if(m_pos == m_posEnd){
m_pos = 0;
m_posEnd = 0;
}
return count;
auto bytesBuffered = m_buffer.readFromStreamAndWrite(*m_inputStream, m_buffer.getBufferSize());
if(bytesBuffered > 0) {
return m_buffer.read(data, count);
}
return bytesBuffered;
}
}

View File

@ -92,79 +92,77 @@ public:
protected:
std::shared_ptr<InputStream> m_inputStream;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_bufferPtr;
p_char8 m_buffer;
v_bufferSize m_bufferSize;
v_bufferSize m_pos;
v_bufferSize m_posEnd;
buffer::FIFOBuffer m_buffer;
public:
InputStreamBufferedProxy(const std::shared_ptr<InputStream>& inputStream,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& bufferPtr,
p_char8 buffer,
v_bufferSize bufferSize,
v_bufferSize positionStart,
v_bufferSize positionEnd)
data::v_io_size bufferReadPosition,
data::v_io_size bufferWritePosition,
bool bufferCanRead)
: m_inputStream(inputStream)
, m_bufferPtr(bufferPtr)
, m_buffer(buffer)
, m_bufferSize(bufferSize)
, m_pos(positionStart)
, m_posEnd(positionEnd)
, m_buffer(buffer, bufferSize, bufferReadPosition, bufferWritePosition, bufferCanRead)
{}
public:
static std::shared_ptr<InputStreamBufferedProxy> createShared(const std::shared_ptr<InputStream>& inputStream,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer)
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer)
{
return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream,
buffer,
(p_char8) buffer->getData(),
buffer->getSize(),
0, 0);
0, 0, false);
}
static std::shared_ptr<InputStreamBufferedProxy> createShared(const std::shared_ptr<InputStream>& inputStream,
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer,
v_bufferSize positionStart,
v_bufferSize positionEnd)
const std::shared_ptr<oatpp::data::buffer::IOBuffer>& buffer,
data::v_io_size bufferReadPosition,
data::v_io_size bufferWritePosition,
bool bufferCanRead)
{
return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream,
buffer,
(p_char8) buffer->getData(),
buffer->getSize(),
positionStart,
positionEnd);
bufferReadPosition,
bufferWritePosition,
bufferCanRead);
}
static std::shared_ptr<InputStreamBufferedProxy> createShared(const std::shared_ptr<InputStream>& inputStream,
p_char8 buffer,
v_bufferSize bufferSize)
p_char8 buffer,
v_bufferSize bufferSize)
{
return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream,
nullptr,
buffer,
bufferSize,
0, 0);
0, 0, false);
}
static std::shared_ptr<InputStreamBufferedProxy> createShared(const std::shared_ptr<InputStream>& inputStream,
p_char8 buffer,
v_bufferSize bufferSize,
v_bufferSize positionStart,
v_bufferSize positionEnd)
p_char8 buffer,
v_bufferSize bufferSize,
data::v_io_size bufferReadPosition,
data::v_io_size bufferWritePosition,
bool bufferCanRead)
{
return Shared_InputStreamBufferedProxy_Pool::allocateShared(inputStream,
nullptr,
buffer,
bufferSize,
positionStart,
positionEnd);
bufferReadPosition,
bufferWritePosition,
bufferCanRead);
}
data::v_io_size read(void *data, data::v_io_size count) override;
void setBufferPosition(v_bufferSize pos, v_bufferSize posEnd){
m_pos = pos;
m_posEnd = posEnd;
void setBufferPosition(data::v_io_size readPosition, data::v_io_size writePosition, bool canRead) {
m_buffer.setBufferPosition(readPosition, writePosition, canRead);
}
};

View File

@ -123,7 +123,8 @@ HttpRequestExecutor::execute(const String& method,
auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(connection,
ioBuffer,
result.bufferPosStart,
result.bufferPosEnd);
result.bufferPosEnd,
result.bufferPosStart != result.bufferPosEnd);
return Response::createShared(result.startingLine.statusCode,
result.startingLine.description.toString(),
@ -210,7 +211,8 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor
auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection,
m_ioBuffer,
result.bufferPosStart,
result.bufferPosEnd);
result.bufferPosEnd,
result.bufferPosStart != result.bufferPosEnd);
return _return(Response::createShared(result.startingLine.statusCode,
result.startingLine.description.toString(),

View File

@ -59,7 +59,9 @@ HttpProcessor::processRequest(HttpRouter* router,
}
auto& bodyStream = inStream;
bodyStream->setBufferPosition(headersReadResult.bufferPosStart, headersReadResult.bufferPosEnd);
bodyStream->setBufferPosition(headersReadResult.bufferPosStart,
headersReadResult.bufferPosEnd,
headersReadResult.bufferPosStart != headersReadResult.bufferPosEnd);
auto request = protocol::http::incoming::Request::createShared(headersReadResult.startingLine,
route.matchMap,
@ -107,7 +109,9 @@ oatpp::async::Action HttpProcessor::Coroutine::onHeadersParsed(const RequestHead
}
auto& bodyStream = m_inStream;
bodyStream->setBufferPosition(headersReadResult.bufferPosStart, headersReadResult.bufferPosEnd);
bodyStream->setBufferPosition(headersReadResult.bufferPosStart,
headersReadResult.bufferPosEnd,
headersReadResult.bufferPosStart != headersReadResult.bufferPosEnd);
m_currentRequest = protocol::http::incoming::Request::createShared(headersReadResult.startingLine,
m_currentRoute.matchMap,

View File

@ -98,6 +98,18 @@ bool FullTest::onRun() {
OATPP_ASSERT(dto);
OATPP_ASSERT(dto->testValue == "my_test_body");
}
{ // test Big Echo with body
oatpp::data::stream::ChunkedBuffer stream;
for(v_int32 i = 0; i < oatpp::data::buffer::IOBuffer::BUFFER_SIZE; i++) {
stream.write("0123456789", 10);
}
auto data = stream.toString();
auto response = client->echoBody(data);
auto returnedData = response->readBodyToString();
OATPP_ASSERT(returnedData);
OATPP_ASSERT(returnedData == data);
}
}

View File

@ -77,6 +77,12 @@ public:
return createDtoResponse(Status::CODE_200, dto);
}
ENDPOINT("POST", "echo", echo,
BODY_STRING(String, body)) {
OATPP_LOGD(TAG, "POST body(echo) size=%d", body->getSize());
return createResponse(Status::CODE_200, body);
}
#include OATPP_CODEGEN_END(ApiController)
};