HttpRequestExecutor::executeAsync

This commit is contained in:
lganzzzo 2018-04-18 01:33:19 +03:00
parent 1e1fb31459
commit dd9795daf9
3 changed files with 144 additions and 11 deletions

View File

@ -58,7 +58,15 @@ std::shared_ptr<oatpp::data::stream::IOStream> SimpleTCPConnectionProvider::getC
return nullptr;
}
if (connect(clientHandle, (struct sockaddr *)&client, sizeof(client)) < 0 ) {
#ifdef SO_NOSIGPIPE
int yes = 1;
v_int32 ret = setsockopt(clientHandle, SOL_SOCKET, SO_NOSIGPIPE, &yes, sizeof(int));
if(ret < 0) {
OATPP_LOGD("SimpleTCPConnectionProvider", "Warning failed to set %s for socket", "SO_NOSIGPIPE");
}
#endif
if (connect(clientHandle, (struct sockaddr *)&client, sizeof(client)) != 0 ) {
oatpp::os::io::Library::handle_close(clientHandle);
OATPP_LOGD("SimpleTCPConnectionProvider", "Could not connect");
return nullptr;
@ -70,7 +78,71 @@ std::shared_ptr<oatpp::data::stream::IOStream> SimpleTCPConnectionProvider::getC
oatpp::async::Action SimpleTCPConnectionProvider::getConnectionAsync(oatpp::async::AbstractCoroutine* parentCoroutine,
AsyncCallback callback) {
throw std::runtime_error("oatpp::network::client::SimpleTCPConnectionProvider::getConnectionAsync not implemented");
class ConnectCoroutine : public oatpp::async::CoroutineWithResult<ConnectCoroutine, std::shared_ptr<oatpp::data::stream::IOStream>> {
private:
oatpp::base::String::PtrWrapper m_host;
v_int32 m_port;
oatpp::os::io::Library::v_handle m_clientHandle;
struct sockaddr_in m_client;
public:
ConnectCoroutine(const oatpp::base::String::PtrWrapper& host, v_int32 port)
: m_host(host)
, m_port(port)
{}
Action act() override {
struct hostent* host = gethostbyname((const char*) m_host->getData());
if ((host == NULL) || (host->h_addr == NULL)) {
return error("Error retrieving DNS information.");
}
bzero(&m_client, sizeof(m_client));
m_client.sin_family = AF_INET;
m_client.sin_port = htons(m_port);
memcpy(&m_client.sin_addr, host->h_addr, host->h_length);
m_clientHandle = socket(AF_INET, SOCK_STREAM, 0);
if (m_clientHandle < 0) {
return error("Error creating socket.");
}
fcntl(m_clientHandle, F_SETFL, O_NONBLOCK);
#ifdef SO_NOSIGPIPE
int yes = 1;
v_int32 ret = setsockopt(m_clientHandle, SOL_SOCKET, SO_NOSIGPIPE, &yes, sizeof(int));
if(ret < 0) {
OATPP_LOGD("SimpleTCPConnectionProvider", "Warning failed to set %s for socket", "SO_NOSIGPIPE");
}
#endif
return yieldTo(&ConnectCoroutine::doConnect);
}
Action doConnect() {
errno = 0;
auto res = connect(m_clientHandle, (struct sockaddr *)&m_client, sizeof(m_client));
if(res == 0) {
return _return(oatpp::network::Connection::createShared(m_clientHandle));
}
if(errno == EALREADY || errno == EINPROGRESS) {
return waitRetry();
} else if(errno == EINTR) {
return repeat();
}
return error("Can't connect");
}
};
return parentCoroutine->startCoroutineForResult<ConnectCoroutine>(callback, m_host, m_port);
}
}}}

View File

@ -112,31 +112,87 @@ oatpp::async::Action HttpRequestExecutor::executeAsync(oatpp::async::AbstractCor
class ExecutorCoroutine : public oatpp::async::CoroutineWithResult<ExecutorCoroutine, std::shared_ptr<HttpRequestExecutor::Response>> {
private:
std::shared_ptr<oatpp::network::ClientConnectionProvider> m_connectionProvider;
String::PtrWrapper m_method;
String::PtrWrapper m_path;
std::shared_ptr<Headers> m_headers;
std::shared_ptr<Body> m_body;
private:
std::shared_ptr<oatpp::data::stream::IOStream> m_connection;
std::shared_ptr<oatpp::data::buffer::IOBuffer> m_ioBuffer;
void* m_bufferPointer;
os::io::Library::v_size m_bufferBytesLeftToRead;
public:
ExecutorCoroutine(const String::PtrWrapper& method,
ExecutorCoroutine(const std::shared_ptr<oatpp::network::ClientConnectionProvider>& connectionProvider,
const String::PtrWrapper& method,
const String::PtrWrapper& path,
const std::shared_ptr<Headers>& headers,
const std::shared_ptr<Body>& body)
: m_method(method)
: m_connectionProvider(connectionProvider)
, m_method(method)
, m_path(path)
, m_headers(headers)
, m_body(body)
{}
Action act() override {
return _return(nullptr);
oatpp::network::ClientConnectionProvider::AsyncCallback callback =
static_cast<oatpp::network::ClientConnectionProvider::AsyncCallback>(&ExecutorCoroutine::onConnectionReady);
return m_connectionProvider->getConnectionAsync(this, callback);
}
Action onConnectionReady(const std::shared_ptr<oatpp::data::stream::IOStream>& connection) {
m_connection = connection;
auto request = oatpp::web::protocol::http::outgoing::Request::createShared(m_method, m_path, m_headers, m_body);
request->headers->put(oatpp::web::protocol::http::Header::HOST, m_connectionProvider->getHost());
m_ioBuffer = oatpp::data::buffer::IOBuffer::createShared();
auto upStream = oatpp::data::stream::OutputStreamBufferedProxy::createShared(connection, m_ioBuffer);
m_bufferPointer = m_ioBuffer->getData();
m_bufferBytesLeftToRead = m_ioBuffer->getSize();
return request->sendAsync(this, upStream->flushAsync(this, yieldTo(&ExecutorCoroutine::readResponse)), upStream);
}
Action readResponse() {
return oatpp::data::stream::IOStream::
readSomeDataAsyncInline(m_connection.get(), m_bufferPointer, m_bufferBytesLeftToRead, yieldTo(&ExecutorCoroutine::parseResponse));
}
Action parseResponse() {
os::io::Library::v_size readCount = m_ioBuffer->getSize() - m_bufferBytesLeftToRead;
if(readCount > 0) {
oatpp::parser::ParsingCaret caret((p_char8) m_ioBuffer->getData(), m_ioBuffer->getSize());
auto line = protocol::http::Protocol::parseResponseStartingLine(caret);
if(!line){
return error("Invalid starting line");
}
oatpp::web::protocol::http::Status err;
auto headers = protocol::http::Protocol::parseHeaders(caret, err);
if(err.code != 0){
return error("can't parse headers");
}
auto bodyStream = oatpp::data::stream::InputStreamBufferedProxy::createShared(m_connection,
m_ioBuffer,
caret.getPosition(),
(v_int32) readCount);
return _return(Response::createShared(line->statusCode, line->description, headers, bodyStream));
}
return error("Read zero bytes from Response");
}
};
return parentCoroutine->startCoroutineForResult<ExecutorCoroutine>(callback, method, path, headers, body);
return parentCoroutine->startCoroutineForResult<ExecutorCoroutine>(callback, m_connectionProvider, method, path, headers, body);
}

View File

@ -127,6 +127,13 @@ oatpp::async::Action BodyDecoder::doChunkedDecodingAsync(oatpp::async::AbstractC
void* m_skipData;
os::io::Library::v_size m_skipSize;
bool m_done = false;
private:
void prepareSkipRN() {
m_skipData = &m_lineBuffer[0];
m_skipSize = 2;
m_currLineLength = 0;
m_lineEnding = false;
}
public:
ChunkedDecoder(const std::shared_ptr<oatpp::data::stream::InputStream>& fromStream,
@ -148,7 +155,7 @@ oatpp::async::Action BodyDecoder::doChunkedDecodingAsync(oatpp::async::AbstractC
} else if(res == oatpp::data::stream::IOStream::ERROR_IO_RETRY) {
return oatpp::async::Action::_REPEAT;
} else if( res < 0) {
return oatpp::async::Action(oatpp::async::Error("[BodyDecoder::ChunkedDecoder] Can't read line char"));
return error("[BodyDecoder::ChunkedDecoder] Can't read line char");
}
return yieldTo(&ChunkedDecoder::onLineCharRead);
}
@ -180,17 +187,15 @@ oatpp::async::Action BodyDecoder::doChunkedDecodingAsync(oatpp::async::AbstractC
Action onLineRead() {
os::io::Library::v_size countToRead = std::strtol((const char*) m_lineBuffer, nullptr, 16);
if(countToRead > 0) {
prepareSkipRN();
return oatpp::data::stream::transferAsync(this, yieldTo(&ChunkedDecoder::skipRN), m_fromStream, m_toStream, countToRead, m_buffer);
}
m_done = true;
prepareSkipRN();
return yieldTo(&ChunkedDecoder::skipRN);
}
Action skipRN() {
m_skipData = &m_lineBuffer[0];
m_skipSize = 2;
m_currLineLength = 0;
m_lineEnding = false;
if(m_done) {
return oatpp::data::stream::IOStream::readExactSizeDataAsyncInline(m_fromStream.get(),
m_skipData,