oatpp/test/network/virtual_/PipeTest.cpp
2018-11-15 12:01:11 +02:00

151 lines
4.6 KiB
C++

/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi, <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "PipeTest.hpp"
#include "oatpp/network/virtual_/Pipe.hpp"
#include "oatpp/core/data/stream/ChunkedBuffer.hpp"
#include "oatpp/core/concurrency/Thread.hpp"
#include "oatpp/test/Checker.hpp"
#include <iostream>
namespace oatpp { namespace test { namespace network { namespace virtual_ {
namespace {
typedef oatpp::network::virtual_::Pipe Pipe;
const char* DATA_CHUNK = "<0123456789/abcdefghijklmnopqrstuvwxyz/ABCDEFGHIJKLMNOPQRSTUVWXYZ>";
const os::io::Library::v_size CHUNK_SIZE = std::strlen(DATA_CHUNK);
class WriterTask : public oatpp::concurrency::Runnable {
private:
std::shared_ptr<Pipe> m_pipe;
v_int32 m_chunksToTransfer;
os::io::Library::v_size m_position = 0;
os::io::Library::v_size m_transferedBytes = 0;
public:
WriterTask(const std::shared_ptr<Pipe>& pipe, v_int32 chunksToTransfer)
: m_pipe(pipe)
, m_chunksToTransfer(chunksToTransfer)
{}
void run() override {
while (m_transferedBytes < CHUNK_SIZE * m_chunksToTransfer) {
auto res = m_pipe->getWriter()->write(&DATA_CHUNK[m_position], CHUNK_SIZE - m_position);
if(res > 0) {
m_transferedBytes += res;
m_position += res;
if(m_position == CHUNK_SIZE) {
m_position = 0;
}
}
}
OATPP_LOGD("WriterTask", "sent %d bytes", m_transferedBytes);
}
};
class ReaderTask : public oatpp::concurrency::Runnable {
private:
std::shared_ptr<oatpp::data::stream::ChunkedBuffer> m_buffer;
std::shared_ptr<Pipe> m_pipe;
v_int32 m_chunksToTransfer;
public:
ReaderTask(const std::shared_ptr<oatpp::data::stream::ChunkedBuffer> &buffer,
const std::shared_ptr<Pipe>& pipe,
v_int32 chunksToTransfer)
: m_buffer(buffer)
, m_pipe(pipe)
, m_chunksToTransfer(chunksToTransfer)
{}
void run() override {
v_char8 readBuffer[256];
while (m_buffer->getSize() < CHUNK_SIZE * m_chunksToTransfer) {
auto res = m_pipe->getReader()->read(readBuffer, 256);
if(res > 0) {
m_buffer->write(readBuffer, res);
}
}
OATPP_LOGD("ReaderTask", "sent %d bytes", m_buffer->getSize());
}
};
void runTransfer(const std::shared_ptr<Pipe>& pipe, v_int32 chunksToTransfer, bool writeNonBlock, bool readerNonBlock) {
OATPP_LOGD("transfer", "writer-nb: %d, reader-nb: %d", writeNonBlock, readerNonBlock);
auto buffer = oatpp::data::stream::ChunkedBuffer::createShared();
{
oatpp::test::PerformanceChecker timer("timer");
auto writerThread = oatpp::concurrency::Thread::createShared(std::make_shared<WriterTask>(pipe, chunksToTransfer));
auto readerThread = oatpp::concurrency::Thread::createShared(std::make_shared<ReaderTask>(buffer, pipe, chunksToTransfer));
writerThread->join();
readerThread->join();
}
OATPP_ASSERT(buffer->getSize() == chunksToTransfer * CHUNK_SIZE);
auto ruleBuffer = oatpp::data::stream::ChunkedBuffer::createShared();
for(v_int32 i = 0; i < chunksToTransfer; i ++) {
ruleBuffer->write(DATA_CHUNK, CHUNK_SIZE);
}
auto str1 = buffer->toString();
auto str2 = buffer->toString();
OATPP_ASSERT(str1 == str2);
}
}
bool PipeTest::onRun() {
auto pipe = Pipe::createShared();
v_int32 chunkCount = oatpp::data::buffer::IOBuffer::BUFFER_SIZE * 10 / CHUNK_SIZE;
runTransfer(pipe, chunkCount, false, false);
runTransfer(pipe, chunkCount, true, false);
runTransfer(pipe, chunkCount, false, true);
runTransfer(pipe, chunkCount, true, true);
return true;
}
}}}}