Feature Multipart. Add Simple-API http::outgoing::MultipartBody.

This commit is contained in:
lganzzzo 2019-07-23 01:29:39 +04:00
parent 1291add834
commit 3944412fa6
12 changed files with 485 additions and 32 deletions

View File

@ -175,6 +175,8 @@ add_library(oatpp
oatpp/web/protocol/http/outgoing/CommunicationUtils.hpp
oatpp/web/protocol/http/outgoing/DtoBody.cpp
oatpp/web/protocol/http/outgoing/DtoBody.hpp
oatpp/web/protocol/http/outgoing/MultipartBody.cpp
oatpp/web/protocol/http/outgoing/MultipartBody.hpp
oatpp/web/protocol/http/outgoing/Request.cpp
oatpp/web/protocol/http/outgoing/Request.hpp
oatpp/web/protocol/http/outgoing/Response.cpp

View File

@ -34,6 +34,24 @@ BufferInputStream::BufferInputStream(const std::shared_ptr<base::StrBuffer>& mem
, m_ioMode(IOMode::NON_BLOCKING)
{}
BufferInputStream::BufferInputStream(const oatpp::String& data)
: BufferInputStream(data.getPtr(), data->getData(), data->getSize())
{}
void BufferInputStream::reset(const std::shared_ptr<base::StrBuffer>& memoryHandle, p_char8 data, v_io_size size) {
m_memoryHandle = memoryHandle;
m_data = data;
m_size = size;
m_position = 0;
}
void BufferInputStream::reset() {
m_memoryHandle = nullptr;
m_data = nullptr;
m_size = 0;
m_position = 0;
}
data::v_io_size BufferInputStream::read(void *data, data::v_io_size count) {
data::v_io_size desiredAmount = count;
if(desiredAmount > m_size - m_position) {
@ -84,8 +102,8 @@ v_io_size BufferInputStream::getCurrentPosition() {
return m_position;
}
void BufferInputStream::resetPosition() {
m_position = 0;
void BufferInputStream::setCurrentPosition(v_io_size position) {
m_position = position;
}
}}}

View File

@ -46,6 +46,26 @@ public:
*/
BufferInputStream(const std::shared_ptr<base::StrBuffer>& memoryHandle, p_char8 data, v_io_size size);
/**
* Constructor.
* @param data - buffer.
*/
BufferInputStream(const oatpp::String& data);
/**
* Reset stream data and set position to `0`.
* @param memoryHandle - buffer memory handle. May be nullptr.
* @param data - pointer to buffer data.
* @param size - size of the buffer.
*/
void reset(const std::shared_ptr<base::StrBuffer>& memoryHandle, p_char8 data, v_io_size size);
/**
* Same as `reset(nullptr, nullptr, 0);.`
*/
void reset();
/**
* Read data from stream. <br>
* It is a legal case if return result < count. Caller should handle this!
@ -101,9 +121,10 @@ public:
v_io_size getCurrentPosition();
/**
* Reset current data read position to zero.
* Set current data read position.
* @param position - data read position.
*/
void resetPosition();
void setCurrentPosition(v_io_size position);
};

View File

@ -66,6 +66,12 @@ void Part::setDataInfo(const std::shared_ptr<data::stream::InputStream>& inputSt
m_knownSize = knownSize;
}
void Part::setDataInfo(const std::shared_ptr<data::stream::InputStream>& inputStream) {
m_inputStream = inputStream;
m_inMemoryData = nullptr;
m_knownSize = -1;
}
oatpp::String Part::getName() const {
return m_name;
}
@ -88,6 +94,19 @@ oatpp::String Part::getHeader(const oatpp::data::share::StringKeyLabelCI_FAST &h
return nullptr;
}
void Part::putHeader(const oatpp::data::share::StringKeyLabelCI_FAST& key, const oatpp::data::share::StringKeyLabel& value) {
m_headers[key] = value;
}
bool Part::putHeaderIfNotExists(const oatpp::data::share::StringKeyLabelCI_FAST& key, const oatpp::data::share::StringKeyLabel& value) {
auto it = m_headers.find(key);
if(it == m_headers.end()) {
m_headers.insert({key, value});
return true;
}
return false;
}
std::shared_ptr<data::stream::InputStream> Part::getInputStream() const {
return m_inputStream;
}

View File

@ -79,6 +79,12 @@ public:
const oatpp::String inMemoryData,
data::v_io_size knownSize);
/**
* Same as `setDataInfo(inputStream, nullptr, -1);.`
* @param inputStream - input stream of the part data.
*/
void setDataInfo(const std::shared_ptr<data::stream::InputStream>& inputStream);
/**
* Get name of the part.
* @return - name of the part.
@ -104,6 +110,21 @@ public:
*/
oatpp::String getHeader(const oatpp::data::share::StringKeyLabelCI_FAST& headerName) const;
/**
* Add http header.
* @param key - &id:oatpp::data::share::StringKeyLabelCI_FAST;.
* @param value - &id:oatpp::data::share::StringKeyLabel;.
*/
void putHeader(const oatpp::data::share::StringKeyLabelCI_FAST& key, const oatpp::data::share::StringKeyLabel& value);
/**
* Add http header if not already exists.
* @param key - &id:oatpp::data::share::StringKeyLabelCI_FAST;.
* @param value - &id:oatpp::data::share::StringKeyLabel;.
* @return - `true` if header was added.
*/
bool putHeaderIfNotExists(const oatpp::data::share::StringKeyLabelCI_FAST& key, const oatpp::data::share::StringKeyLabel& value);
/**
* Get input stream of the part data.
* @return - input stream of the part data.

View File

@ -244,6 +244,9 @@ oatpp::String HeaderValueData::getTitleParamValue(const data::share::StringKeyLa
}
return nullptr;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Parser
oatpp::data::share::StringKeyLabelCI_FAST Parser::parseHeaderNameLabel(const std::shared_ptr<oatpp::base::StrBuffer>& headersText,
oatpp::parser::Caret& caret) {
@ -367,36 +370,38 @@ void Parser::parseHeaderValueData(HeaderValueData& data, const oatpp::data::shar
v_char8 charSet[5] = {' ', '=', separator, '\r', '\n'};
v_char8 charSet2[4] = {' ', separator, '\r', '\n'};
while(caret.canContinue()) {
while (caret.canContinue()) {
caret.skipChar(' ');
auto label = caret.putLabel();
auto res = caret.findCharFromSet(charSet, 5);
if(res == '=') {
if (res == '=') {
data::share::StringKeyLabelCI key(headerValue.getMemoryHandle(), label.getData(), label.getSize());
caret.inc();
if(caret.isAtChar('"')) {
if (caret.isAtChar('"')) {
label = caret.parseStringEnclosed('"', '"', '\\');
} else if(caret.isAtChar('\'')) {
} else if (caret.isAtChar('\'')) {
label = caret.parseStringEnclosed('\'', '\'', '\\');
} else {
label = caret.putLabel();
auto r = caret.findCharFromSet(charSet2, 4);
}
data.titleParams[key] = data::share::StringKeyLabel(headerValue.getMemoryHandle(), label.getData(), label.getSize());
data.titleParams[key] = data::share::StringKeyLabel(headerValue.getMemoryHandle(), label.getData(),
label.getSize());
} else {
data.tokens.insert(data::share::StringKeyLabelCI(headerValue.getMemoryHandle(), label.getData(), label.getSize()));
data.tokens.insert(
data::share::StringKeyLabelCI(headerValue.getMemoryHandle(), label.getData(), label.getSize()));
}
if(caret.isAtCharFromSet((p_char8)"\r\n", 2)) {
if (caret.isAtCharFromSet((p_char8) "\r\n", 2)) {
break;
} else if(caret.isAtChar(separator)) {
} else if (caret.isAtChar(separator)) {
caret.inc();
}
@ -404,4 +409,20 @@ void Parser::parseHeaderValueData(HeaderValueData& data, const oatpp::data::shar
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Utils
void Utils::writeHeaders(const Headers& headers, data::stream::ConsistentOutputStream* stream) {
auto it = headers.begin();
while(it != headers.end()) {
stream->write(it->first.getData(), it->first.getSize());
stream->write(": ", 2);
stream->write(it->second.getData(), it->second.getSize());
stream->write("\r\n", 2);
it ++;
}
}
}}}}

View File

@ -684,6 +684,21 @@ public:
static void parseHeaderValueData(HeaderValueData& data, const oatpp::data::share::StringKeyLabel& headerValue, v_char8 separator);
};
/**
* Http utils.
*/
class Utils {
public:
/**
* Write headers map to stream.
* @param headers
* @param stream
*/
static void writeHeaders(const Headers& headers, data::stream::ConsistentOutputStream* stream);
};
}}}}

View File

@ -70,7 +70,7 @@ public:
~ChunkedBody();
/**
* Declare `Transfer-Encoding: chunked` header
* Declare `Transfer-Encoding: chunked` header.
* @param headers - &id:oatpp::web::protocol::http::Headers;.
*/
void declareHeaders(Headers& headers) noexcept override;
@ -82,7 +82,7 @@ public:
void writeToStream(OutputStream* stream) noexcept override;
/**
* Start &l:ChunkedBody::WriteToStreamCoroutine; to write buffer data to stream.
* Write body data to stream in asynchronous manner.
* @param stream - &id:oatpp::data::stream::OutputStream;.
* @return - &id:oatpp::async::CoroutineStarter;.
*/

View File

@ -0,0 +1,211 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#include "MultipartBody.hpp"
#include "oatpp/core/data/stream/ChunkedBuffer.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// MultipartReadCallback
MultipartBody::MultipartReadCallback::MultipartReadCallback(const std::shared_ptr<Multipart>& multipart)
: m_multipart(multipart)
, m_iterator(multipart->getAllParts().begin())
, m_state(STATE_BOUNDARY)
, m_readStream(nullptr, nullptr, 0)
{}
data::v_io_size MultipartBody::MultipartReadCallback::readBoundary(void *buffer, data::v_io_size count) {
if (!m_readStream.getDataMemoryHandle()) {
oatpp::String boundary;
if (m_iterator == m_multipart->getAllParts().end()) {
boundary = "\r\n--" + m_multipart->getBoundary() + "--\r\n";
} else if (m_iterator == m_multipart->getAllParts().begin()) {
boundary = "--" + m_multipart->getBoundary() + "\r\n";
} else {
boundary = "\r\n--" + m_multipart->getBoundary() + "\r\n";
}
m_readStream.reset(boundary.getPtr(), boundary->getData(), boundary->getSize());
}
auto res = m_readStream.read(buffer, count);
if(res == 0) {
m_readStream.reset();
}
return res;
}
data::v_io_size MultipartBody::MultipartReadCallback::readHeaders(void *buffer, data::v_io_size count) {
if (!m_readStream.getDataMemoryHandle()) {
oatpp::data::stream::ChunkedBuffer stream;
auto& part = *m_iterator;
http::Utils::writeHeaders(part->getHeaders(), &stream);
stream.write("\r\n", 2);
auto buffer = stream.toString();
m_readStream.reset(buffer.getPtr(), buffer->getData(), buffer->getSize());
}
auto res = m_readStream.read(buffer, count);
if(res == 0) {
m_readStream.reset();
}
return res;
}
data::v_io_size MultipartBody::MultipartReadCallback::readBody(void *buffer, data::v_io_size count) {
auto& part = *m_iterator;
const auto& stream = part->getInputStream();
if(!stream) {
OATPP_LOGE("[oatpp::web::protocol::http::outgoing::MultipartBody::MultipartReadCallback::readBody()]", "Error. Part has no input stream", m_state);
m_iterator ++;
return 0;
}
auto res = stream->read(buffer, count);
if(res == 0) {
m_iterator ++;
}
return res;
}
data::v_io_size MultipartBody::MultipartReadCallback::read(void *buffer, data::v_io_size count) {
if(m_state == STATE_FINISHED) {
return 0;
}
p_char8 currBufferPtr = (p_char8) buffer;
data::v_io_size bytesLeft = count;
data::v_io_size res = 0;
while(bytesLeft > 0) {
switch (m_state) {
case STATE_BOUNDARY:
res = readBoundary(currBufferPtr, bytesLeft);
break;
case STATE_HEADERS:
res = readHeaders(currBufferPtr, bytesLeft);
break;
case STATE_BODY:
res = readBody(currBufferPtr, bytesLeft);
break;
default:
OATPP_LOGE("[oatpp::web::protocol::http::outgoing::MultipartBody::MultipartReadCallback::read()]", "Error. Invalid state %d", m_state);
return 0;
}
if(res > 0) {
currBufferPtr = &currBufferPtr[res];
bytesLeft -= res;
} else if(res == 0) {
if(m_state == STATE_BOUNDARY && m_iterator == m_multipart->getAllParts().end()) {
m_state = STATE_FINISHED;
break;
}
m_state += 1;
if(m_state == STATE_ROUND) {
m_state = 0;
}
} else {
OATPP_LOGE("[oatpp::web::protocol::http::outgoing::MultipartBody::MultipartReadCallback::read()]", "Error. Invalid read result %d. State=%d", res, m_state);
return 0;
}
}
return count - bytesLeft;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// AsyncMultipartReadCallback
MultipartBody::AsyncMultipartReadCallback::AsyncMultipartReadCallback(const std::shared_ptr<Multipart>& multipart)
: m_multipart(multipart)
{}
oatpp::async::Action MultipartBody::AsyncMultipartReadCallback::readAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
void*& currBufferPtr,
data::v_io_size& bytesLeftToRead,
oatpp::async::Action&& nextAction)
{
return std::forward<oatpp::async::Action>(nextAction);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// MultipartBody
MultipartBody::MultipartBody(const std::shared_ptr<Multipart>& multipart)
: ChunkedBody(std::make_shared<MultipartReadCallback>(multipart),
std::make_shared<AsyncMultipartReadCallback>(multipart),
4096)
, m_multipart(multipart)
{}
void MultipartBody::declareHeaders(Headers& headers) noexcept {
if(m_multipart->getAllParts().empty()) {
headers[oatpp::web::protocol::http::Header::CONTENT_LENGTH] = "0";
return;
}
ChunkedBody::declareHeaders(headers);
headers[oatpp::web::protocol::http::Header::CONTENT_TYPE] = "multipart/form-data; boundary=" + m_multipart->getBoundary();
}
void MultipartBody::writeToStream(OutputStream* stream) noexcept {
if(m_multipart->getAllParts().empty()) {
return;
}
ChunkedBody::writeToStream(stream);
}
oatpp::async::CoroutineStarter MultipartBody::writeToStreamAsync(const std::shared_ptr<OutputStream>& stream) {
if(m_multipart->getAllParts().empty()) {
return nullptr;
}
return ChunkedBody::writeToStreamAsync(stream);
}
}}}}}

View File

@ -0,0 +1,139 @@
/***************************************************************************
*
* Project _____ __ ____ _ _
* ( _ ) /__\ (_ _)_| |_ _| |_
* )(_)( /(__)\ )( (_ _)(_ _)
* (_____)(__)(__)(__) |_| |_|
*
*
* Copyright 2018-present, Leonid Stryzhevskyi <lganzzzo@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************************/
#ifndef oatpp_web_protocol_http_outgoing_MultipartBody_hpp
#define oatpp_web_protocol_http_outgoing_MultipartBody_hpp
#include "./Body.hpp"
#include "./ChunkedBody.hpp"
#include "oatpp/web/mime/multipart/Multipart.hpp"
#include "oatpp/core/data/stream/BufferInputStream.hpp"
namespace oatpp { namespace web { namespace protocol { namespace http { namespace outgoing {
class MultipartBody : public ChunkedBody {
public:
/**
* Convenience typedef for &id:oatpp::web::mime::multipart::Multipart;.
*/
typedef oatpp::web::mime::multipart::Multipart Multipart;
/**
* Convenience typedef for &id:oatpp::web::mime::multipart::Part;.
*/
typedef oatpp::web::mime::multipart::Part Part;
private:
/*
* Convenience typedef for &id:oatpp::data::stream::ReadCallback;.
*/
typedef oatpp::data::stream::ReadCallback ReadCallback;
/*
* Convenience typedef for &id:oatpp::data::stream::AsyncReadCallback;.
*/
typedef oatpp::data::stream::AsyncReadCallback AsyncReadCallback;
private:
static constexpr v_int32 STATE_BOUNDARY = 0;
static constexpr v_int32 STATE_HEADERS = 1;
static constexpr v_int32 STATE_BODY = 2;
static constexpr v_int32 STATE_ROUND = 3; // number of possible states. used to round the state.
static constexpr v_int32 STATE_FINISHED = 4;
private:
class MultipartReadCallback : public ReadCallback {
private:
std::shared_ptr<Multipart> m_multipart;
std::list<std::shared_ptr<Part>>::const_iterator m_iterator;
v_int32 m_state;
oatpp::data::stream::BufferInputStream m_readStream;
private:
data::v_io_size readBoundary(void *buffer, data::v_io_size count);
data::v_io_size readHeaders(void *buffer, data::v_io_size count);
data::v_io_size readBody(void *buffer, data::v_io_size count);
public:
MultipartReadCallback(const std::shared_ptr<Multipart>& multipart);
data::v_io_size read(void *buffer, data::v_io_size count) override;
};
private:
class AsyncMultipartReadCallback : public AsyncReadCallback {
private:
std::shared_ptr<Multipart> m_multipart;
public:
AsyncMultipartReadCallback(const std::shared_ptr<Multipart>& multipart);
oatpp::async::Action readAsyncInline(oatpp::async::AbstractCoroutine* coroutine,
void*& currBufferPtr,
data::v_io_size& bytesLeftToRead,
oatpp::async::Action&& nextAction) override;
};
private:
std::shared_ptr<Multipart> m_multipart;
public:
/**
* Constructor.
* @param multipart - multipart object.
*/
MultipartBody(const std::shared_ptr<Multipart>& multipart);
/**
* Declare `Transfer-Encoding: chunked`, `Content-Type: multipart/<type>` header.
* @param headers - &id:oatpp::web::protocol::http::Headers;.
*/
void declareHeaders(Headers& headers) noexcept override;
/**
* Write body data to stream.
* @param stream - pointer to &id:oatpp::data::stream::OutputStream;.
*/
void writeToStream(OutputStream* stream) noexcept override;
/**
* Write body data to stream in asynchronous manner.
* @param stream - &id:oatpp::data::stream::OutputStream;.
* @return - &id:oatpp::async::CoroutineStarter;.
*/
oatpp::async::CoroutineStarter writeToStreamAsync(const std::shared_ptr<OutputStream>& stream) override;
};
}}}}}
#endif // oatpp_web_protocol_http_outgoing_MultipartBody_hpp

View File

@ -134,15 +134,8 @@ oatpp::async::CoroutineStarter Request::sendAsync(const std::shared_ptr<data::st
m_buffer->write(" ", 1);
m_buffer->write("HTTP/1.1", 8);
m_buffer->write("\r\n", 2);
auto it = m_request->m_headers.begin();
while(it != m_request->m_headers.end()){
m_buffer->write(it->first.getData(), it->first.getSize());
m_buffer->write(": ", 2);
m_buffer->write(it->second.getData(), it->second.getSize());
m_buffer->write("\r\n", 2);
it++;
}
http::Utils::writeHeaders(m_request->m_headers, m_buffer.get());
m_buffer->write("\r\n", 2);

View File

@ -138,15 +138,8 @@ oatpp::async::CoroutineStarter Response::sendAsync(const std::shared_ptr<data::s
m_buffer->write(" ", 1);
m_buffer->OutputStream::write(m_response->m_status.description);
m_buffer->write("\r\n", 2);
auto it = m_response->m_headers.begin();
while(it != m_response->m_headers.end()) {
m_buffer->write(it->first.getData(), it->first.getSize());
m_buffer->write(": ", 2);
m_buffer->write(it->second.getData(), it->second.getSize());
m_buffer->write("\r\n", 2);
it ++;
}
http::Utils::writeHeaders(m_response->m_headers, m_buffer.get());
m_buffer->write("\r\n", 2);