mirror of
https://github.com/curl/curl.git
synced 2025-01-24 14:15:18 +08:00
http2: improved upload eos handling
- replace the counting of upload lengths with the new eos send flag - improve frequency of stream draining to happen less on events where it is not needed - this PR is based on #14220 http2, cf-h2-proxy: fix EAGAINed out buffer - in adjust pollset and shutdown handling, a non-empty `ctx->outbufq` must trigger send polling, irregardless of http/2 flow control - in http2, fix retry handling of blocked GOAWAY frame test case improvement: - let client 'upload-pausing' handle http versions Closes #14253
This commit is contained in:
parent
344ba8c883
commit
35bf766280
@ -1080,7 +1080,7 @@ static CURLcode H2_CONNECT(struct Curl_cfilter *cf,
|
||||
} while(ts->state == H2_TUNNEL_INIT);
|
||||
|
||||
out:
|
||||
if(result || ctx->tunnel.closed)
|
||||
if((result && (result != CURLE_AGAIN)) || ctx->tunnel.closed)
|
||||
h2_tunnel_go_state(cf, ts, H2_TUNNEL_FAILED, data);
|
||||
return result;
|
||||
}
|
||||
@ -1576,6 +1576,7 @@ static CURLcode cf_h2_proxy_query(struct Curl_cfilter *cf,
|
||||
case CF_QUERY_NEED_FLUSH: {
|
||||
if(!Curl_bufq_is_empty(&ctx->outbufq) ||
|
||||
!Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) {
|
||||
CURL_TRC_CF(data, cf, "needs flush");
|
||||
*pres1 = TRUE;
|
||||
return CURLE_OK;
|
||||
}
|
||||
|
@ -96,6 +96,21 @@ static bool cf_hc_baller_data_pending(struct cf_hc_baller *b,
|
||||
return b->cf && !b->result && b->cf->cft->has_data_pending(b->cf, data);
|
||||
}
|
||||
|
||||
static bool cf_hc_baller_needs_flush(struct cf_hc_baller *b,
|
||||
struct Curl_easy *data)
|
||||
{
|
||||
return b->cf && !b->result && Curl_conn_cf_needs_flush(b->cf, data);
|
||||
}
|
||||
|
||||
static CURLcode cf_hc_baller_cntrl(struct cf_hc_baller *b,
|
||||
struct Curl_easy *data,
|
||||
int event, int arg1, void *arg2)
|
||||
{
|
||||
if(b->cf && !b->result)
|
||||
return Curl_conn_cf_cntrl(b->cf, data, FALSE, event, arg1, arg2);
|
||||
return CURLE_OK;
|
||||
}
|
||||
|
||||
struct cf_hc_ctx {
|
||||
cf_hc_state state;
|
||||
const struct Curl_dns_entry *remotehost;
|
||||
@ -428,6 +443,8 @@ static CURLcode cf_hc_query(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data,
|
||||
int query, int *pres1, void *pres2)
|
||||
{
|
||||
struct cf_hc_ctx *ctx = cf->ctx;
|
||||
|
||||
if(!cf->connected) {
|
||||
switch(query) {
|
||||
case CF_QUERY_TIMER_CONNECT: {
|
||||
@ -440,6 +457,14 @@ static CURLcode cf_hc_query(struct Curl_cfilter *cf,
|
||||
*when = cf_get_max_baller_time(cf, data, CF_QUERY_TIMER_APPCONNECT);
|
||||
return CURLE_OK;
|
||||
}
|
||||
case CF_QUERY_NEED_FLUSH: {
|
||||
if(cf_hc_baller_needs_flush(&ctx->h3_baller, data)
|
||||
|| cf_hc_baller_needs_flush(&ctx->h21_baller, data)) {
|
||||
*pres1 = TRUE;
|
||||
return CURLE_OK;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
@ -449,6 +474,23 @@ static CURLcode cf_hc_query(struct Curl_cfilter *cf,
|
||||
CURLE_UNKNOWN_OPTION;
|
||||
}
|
||||
|
||||
static CURLcode cf_hc_cntrl(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data,
|
||||
int event, int arg1, void *arg2)
|
||||
{
|
||||
struct cf_hc_ctx *ctx = cf->ctx;
|
||||
CURLcode result = CURLE_OK;
|
||||
|
||||
if(!cf->connected) {
|
||||
result = cf_hc_baller_cntrl(&ctx->h3_baller, data, event, arg1, arg2);
|
||||
if(!result || (result == CURLE_AGAIN))
|
||||
result = cf_hc_baller_cntrl(&ctx->h21_baller, data, event, arg1, arg2);
|
||||
if(result == CURLE_AGAIN)
|
||||
result = CURLE_OK;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
static void cf_hc_close(struct Curl_cfilter *cf, struct Curl_easy *data)
|
||||
{
|
||||
CURL_TRC_CF(data, cf, "close");
|
||||
@ -484,7 +526,7 @@ struct Curl_cftype Curl_cft_http_connect = {
|
||||
cf_hc_data_pending,
|
||||
Curl_cf_def_send,
|
||||
Curl_cf_def_recv,
|
||||
Curl_cf_def_cntrl,
|
||||
cf_hc_cntrl,
|
||||
Curl_cf_def_conn_is_alive,
|
||||
Curl_cf_def_conn_keep_alive,
|
||||
cf_hc_query,
|
||||
|
@ -419,6 +419,13 @@ CURLcode Curl_conn_connect(struct Curl_easy *data,
|
||||
|
||||
*done = cf->connected;
|
||||
if(!*done) {
|
||||
if(Curl_conn_needs_flush(data, sockindex)) {
|
||||
DEBUGF(infof(data, "Curl_conn_connect(index=%d), flush", sockindex));
|
||||
result = Curl_conn_flush(data, sockindex);
|
||||
if(result && (result != CURLE_AGAIN))
|
||||
return result;
|
||||
}
|
||||
|
||||
result = cf->cft->do_connect(cf, data, blocking, done);
|
||||
if(!result && *done) {
|
||||
Curl_conn_ev_update_info(data, data->conn);
|
||||
@ -504,17 +511,21 @@ bool Curl_conn_data_pending(struct Curl_easy *data, int sockindex)
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
bool Curl_conn_needs_flush(struct Curl_easy *data, int sockindex)
|
||||
bool Curl_conn_cf_needs_flush(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data)
|
||||
{
|
||||
CURLcode result;
|
||||
int pending = FALSE;
|
||||
|
||||
struct Curl_cfilter *cf = data->conn->cfilter[sockindex];
|
||||
result = cf? cf->cft->query(cf, data, CF_QUERY_NEED_FLUSH,
|
||||
&pending, NULL) : CURLE_UNKNOWN_OPTION;
|
||||
return (result || pending == FALSE)? FALSE : TRUE;
|
||||
}
|
||||
|
||||
bool Curl_conn_needs_flush(struct Curl_easy *data, int sockindex)
|
||||
{
|
||||
return Curl_conn_cf_needs_flush(data->conn->cfilter[sockindex], data);
|
||||
}
|
||||
|
||||
void Curl_conn_cf_adjust_pollset(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data,
|
||||
struct easy_pollset *ps)
|
||||
|
@ -344,6 +344,8 @@ bool Curl_conn_cf_is_ssl(struct Curl_cfilter *cf);
|
||||
curl_socket_t Curl_conn_cf_get_socket(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data);
|
||||
|
||||
bool Curl_conn_cf_needs_flush(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data);
|
||||
|
||||
#define CURL_CF_SSL_DEFAULT -1
|
||||
#define CURL_CF_SSL_DISABLE 0
|
||||
|
301
lib/http2.c
301
lib/http2.c
@ -191,8 +191,6 @@ struct h2_stream_ctx {
|
||||
struct h1_req_parser h1; /* parsing the request */
|
||||
struct dynhds resp_trailers; /* response trailer fields */
|
||||
size_t resp_hds_len; /* amount of response header bytes in recvbuf */
|
||||
size_t upload_blocked_len;
|
||||
curl_off_t upload_left; /* number of request bytes left to upload */
|
||||
curl_off_t nrcvd_data; /* number of DATA bytes received */
|
||||
|
||||
char **push_headers; /* allocated array */
|
||||
@ -211,6 +209,8 @@ struct h2_stream_ctx {
|
||||
BIT(bodystarted);
|
||||
BIT(send_closed); /* transfer is done sending, we might have still
|
||||
buffered data in stream->sendbuf to upload. */
|
||||
BIT(body_eos); /* the complete body has been added to `sendbuf` and
|
||||
* is being/has been processed from there. */
|
||||
};
|
||||
|
||||
#define H2_STREAM_CTX(ctx,data) ((struct h2_stream_ctx *)(\
|
||||
@ -237,7 +237,6 @@ static struct h2_stream_ctx *h2_stream_ctx_create(struct cf_h2_ctx *ctx)
|
||||
stream->close_handled = FALSE;
|
||||
stream->error = NGHTTP2_NO_ERROR;
|
||||
stream->local_window_size = H2_STREAM_WINDOW_SIZE_INITIAL;
|
||||
stream->upload_left = 0;
|
||||
stream->nrcvd_data = 0;
|
||||
return stream;
|
||||
}
|
||||
@ -349,7 +348,7 @@ static void drain_stream(struct Curl_cfilter *cf,
|
||||
(void)cf;
|
||||
bits = CURL_CSELECT_IN;
|
||||
if(!stream->send_closed &&
|
||||
(stream->upload_left || stream->upload_blocked_len))
|
||||
(!stream->body_eos || !Curl_bufq_is_empty(&stream->sendbuf)))
|
||||
bits |= CURL_CSELECT_OUT;
|
||||
if(data->state.select_bits != bits) {
|
||||
CURL_TRC_CF(data, cf, "[%d] DRAIN select_bits=%x",
|
||||
@ -1169,9 +1168,16 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
|
||||
drain_stream(cf, data, stream);
|
||||
break;
|
||||
case NGHTTP2_WINDOW_UPDATE:
|
||||
if(CURL_WANT_SEND(data)) {
|
||||
if(CURL_WANT_SEND(data) && Curl_bufq_is_empty(&stream->sendbuf)) {
|
||||
/* need more data, force processing of transfer */
|
||||
drain_stream(cf, data, stream);
|
||||
}
|
||||
else if(!Curl_bufq_is_empty(&stream->sendbuf)) {
|
||||
/* resume the potentially suspended stream */
|
||||
rv = nghttp2_session_resume_data(ctx->h2, stream->id);
|
||||
if(nghttp2_is_fatal(rv))
|
||||
return CURLE_SEND_ERROR;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
@ -1650,22 +1656,21 @@ static ssize_t req_body_read_callback(nghttp2_session *session,
|
||||
(void)source;
|
||||
|
||||
(void)cf;
|
||||
if(stream_id) {
|
||||
/* get the stream from the hash based on Stream ID, stream ID zero is for
|
||||
connection-oriented stuff */
|
||||
data_s = nghttp2_session_get_stream_user_data(session, stream_id);
|
||||
if(!data_s)
|
||||
/* Receiving a Stream ID not in the hash should not happen, this is an
|
||||
internal error more than anything else! */
|
||||
return NGHTTP2_ERR_CALLBACK_FAILURE;
|
||||
|
||||
stream = H2_STREAM_CTX(ctx, data_s);
|
||||
if(!stream)
|
||||
return NGHTTP2_ERR_CALLBACK_FAILURE;
|
||||
}
|
||||
else
|
||||
if(!stream_id)
|
||||
return NGHTTP2_ERR_INVALID_ARGUMENT;
|
||||
|
||||
/* get the stream from the hash based on Stream ID, stream ID zero is for
|
||||
connection-oriented stuff */
|
||||
data_s = nghttp2_session_get_stream_user_data(session, stream_id);
|
||||
if(!data_s)
|
||||
/* Receiving a Stream ID not in the hash should not happen, this is an
|
||||
internal error more than anything else! */
|
||||
return NGHTTP2_ERR_CALLBACK_FAILURE;
|
||||
|
||||
stream = H2_STREAM_CTX(ctx, data_s);
|
||||
if(!stream)
|
||||
return NGHTTP2_ERR_CALLBACK_FAILURE;
|
||||
|
||||
nread = Curl_bufq_read(&stream->sendbuf, buf, length, &result);
|
||||
if(nread < 0) {
|
||||
if(result != CURLE_AGAIN)
|
||||
@ -1673,17 +1678,13 @@ static ssize_t req_body_read_callback(nghttp2_session *session,
|
||||
nread = 0;
|
||||
}
|
||||
|
||||
if(nread > 0 && stream->upload_left != -1)
|
||||
stream->upload_left -= nread;
|
||||
CURL_TRC_CF(data_s, cf, "[%d] req_body_read(len=%zu) eos=%d -> %zd, %d",
|
||||
stream_id, length, stream->body_eos, nread, result);
|
||||
|
||||
CURL_TRC_CF(data_s, cf, "[%d] req_body_read(len=%zu) left=%"
|
||||
CURL_FORMAT_CURL_OFF_T " -> %zd, %d",
|
||||
stream_id, length, stream->upload_left, nread, result);
|
||||
|
||||
if(stream->upload_left == 0)
|
||||
*data_flags = NGHTTP2_DATA_FLAG_EOF;
|
||||
else if(nread == 0)
|
||||
if(nread == 0)
|
||||
return NGHTTP2_ERR_DEFERRED;
|
||||
if(stream->body_eos && Curl_bufq_is_empty(&stream->sendbuf))
|
||||
*data_flags = NGHTTP2_DATA_FLAG_EOF;
|
||||
|
||||
return nread;
|
||||
}
|
||||
@ -1754,9 +1755,11 @@ static CURLcode http2_data_done_send(struct Curl_cfilter *cf,
|
||||
CURL_TRC_CF(data, cf, "[%d] data done send", stream->id);
|
||||
if(!stream->send_closed) {
|
||||
stream->send_closed = TRUE;
|
||||
if(stream->upload_left) {
|
||||
if(!Curl_bufq_is_empty(&stream->sendbuf)) {
|
||||
/* TODO: if we had not seen EOS on send(), it seems the request
|
||||
* is now aborted? */
|
||||
/* we now know that everything that is buffered is all there is. */
|
||||
stream->upload_left = Curl_bufq_len(&stream->sendbuf);
|
||||
stream->body_eos = TRUE;
|
||||
/* resume sending here to trigger the callback to get called again so
|
||||
that it can signal EOF to nghttp2 */
|
||||
(void)nghttp2_session_resume_data(ctx->h2, stream->id);
|
||||
@ -2086,11 +2089,11 @@ static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
out:
|
||||
result = h2_progress_egress(cf, data);
|
||||
if(result == CURLE_AGAIN) {
|
||||
/* pending data to send, need to be called again. Ideally, we would
|
||||
* monitor the socket for POLLOUT, but we might not be in SENDING
|
||||
* transfer state any longer and are unable to make this happen.
|
||||
*/
|
||||
drain_stream(cf, data, stream);
|
||||
/* pending data to send, need to be called again. Ideally, we
|
||||
* monitor the socket for POLLOUT, but when not SENDING
|
||||
* any more, we force processing of the transfer. */
|
||||
if(!CURL_WANT_SEND(data))
|
||||
drain_stream(cf, data, stream);
|
||||
}
|
||||
else if(result) {
|
||||
*err = result;
|
||||
@ -2110,10 +2113,57 @@ out:
|
||||
return nread;
|
||||
}
|
||||
|
||||
static ssize_t cf_h2_body_send(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data,
|
||||
struct h2_stream_ctx *stream,
|
||||
const void *buf, size_t blen, bool eos,
|
||||
CURLcode *err)
|
||||
{
|
||||
struct cf_h2_ctx *ctx = cf->ctx;
|
||||
ssize_t nwritten;
|
||||
|
||||
if(stream->closed) {
|
||||
if(stream->resp_hds_complete) {
|
||||
/* Server decided to close the stream after having sent us a final
|
||||
* response. This is valid if it is not interested in the request
|
||||
* body. This happens on 30x or 40x responses.
|
||||
* We silently discard the data sent, since this is not a transport
|
||||
* error situation. */
|
||||
CURL_TRC_CF(data, cf, "[%d] discarding data"
|
||||
"on closed stream with response", stream->id);
|
||||
if(eos)
|
||||
stream->body_eos = TRUE;
|
||||
*err = CURLE_OK;
|
||||
return (ssize_t)blen;
|
||||
}
|
||||
/* Server closed before we got a response, this is an error */
|
||||
infof(data, "stream %u closed", stream->id);
|
||||
*err = CURLE_SEND_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
nwritten = Curl_bufq_write(&stream->sendbuf, buf, blen, err);
|
||||
if(nwritten < 0)
|
||||
return -1;
|
||||
|
||||
if(eos && (blen == (size_t)nwritten))
|
||||
stream->body_eos = TRUE;
|
||||
|
||||
if(eos || !Curl_bufq_is_empty(&stream->sendbuf)) {
|
||||
/* resume the potentially suspended stream */
|
||||
int rv = nghttp2_session_resume_data(ctx->h2, stream->id);
|
||||
if(nghttp2_is_fatal(rv)) {
|
||||
*err = CURLE_SEND_ERROR;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return nwritten;
|
||||
}
|
||||
|
||||
static ssize_t h2_submit(struct h2_stream_ctx **pstream,
|
||||
struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
const void *buf, size_t len,
|
||||
size_t *phdslen, CURLcode *err)
|
||||
bool eos, CURLcode *err)
|
||||
{
|
||||
struct cf_h2_ctx *ctx = cf->ctx;
|
||||
struct h2_stream_ctx *stream = NULL;
|
||||
@ -2126,7 +2176,6 @@ static ssize_t h2_submit(struct h2_stream_ctx **pstream,
|
||||
nghttp2_priority_spec pri_spec;
|
||||
ssize_t nwritten;
|
||||
|
||||
*phdslen = 0;
|
||||
Curl_dynhds_init(&h2_headers, 0, DYN_HTTP_REQUEST);
|
||||
|
||||
*err = http2_data_setup(cf, data, &stream);
|
||||
@ -2138,7 +2187,6 @@ static ssize_t h2_submit(struct h2_stream_ctx **pstream,
|
||||
nwritten = Curl_h1_req_parse_read(&stream->h1, buf, len, NULL, 0, err);
|
||||
if(nwritten < 0)
|
||||
goto out;
|
||||
*phdslen = (size_t)nwritten;
|
||||
if(!stream->h1.done) {
|
||||
/* need more data */
|
||||
goto out;
|
||||
@ -2169,19 +2217,12 @@ static ssize_t h2_submit(struct h2_stream_ctx **pstream,
|
||||
case HTTPREQ_POST_FORM:
|
||||
case HTTPREQ_POST_MIME:
|
||||
case HTTPREQ_PUT:
|
||||
if(data->state.infilesize != -1)
|
||||
stream->upload_left = data->state.infilesize;
|
||||
else
|
||||
/* data sending without specifying the data amount up front */
|
||||
stream->upload_left = -1; /* unknown */
|
||||
|
||||
data_prd.read_callback = req_body_read_callback;
|
||||
data_prd.source.ptr = NULL;
|
||||
stream_id = nghttp2_submit_request(ctx->h2, &pri_spec, nva, nheader,
|
||||
&data_prd, data);
|
||||
break;
|
||||
default:
|
||||
stream->upload_left = 0; /* no request body */
|
||||
stream_id = nghttp2_submit_request(ctx->h2, &pri_spec, nva, nheader,
|
||||
NULL, data);
|
||||
}
|
||||
@ -2220,15 +2261,17 @@ static ssize_t h2_submit(struct h2_stream_ctx **pstream,
|
||||
body = (const char *)buf + nwritten;
|
||||
bodylen = len - nwritten;
|
||||
|
||||
if(bodylen) {
|
||||
/* We have request body to send in DATA frame */
|
||||
ssize_t n = Curl_bufq_write(&stream->sendbuf, body, bodylen, err);
|
||||
if(n < 0) {
|
||||
if(bodylen || eos) {
|
||||
ssize_t n = cf_h2_body_send(cf, data, stream, body, bodylen, eos, err);
|
||||
if(n >= 0)
|
||||
nwritten += n;
|
||||
else if(*err == CURLE_AGAIN)
|
||||
*err = CURLE_OK;
|
||||
else if(*err != CURLE_AGAIN) {
|
||||
*err = CURLE_SEND_ERROR;
|
||||
nwritten = -1;
|
||||
goto out;
|
||||
}
|
||||
nwritten += n;
|
||||
}
|
||||
|
||||
out:
|
||||
@ -2247,135 +2290,69 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
struct cf_h2_ctx *ctx = cf->ctx;
|
||||
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
|
||||
struct cf_call_data save;
|
||||
int rv;
|
||||
ssize_t nwritten;
|
||||
size_t hdslen = 0;
|
||||
CURLcode result;
|
||||
int blocked = 0, was_blocked = 0;
|
||||
|
||||
CF_DATA_SAVE(save, cf, data);
|
||||
|
||||
(void)eos; /* TODO: use for stream EOF */
|
||||
if(stream && stream->id != -1) {
|
||||
if(stream->upload_blocked_len) {
|
||||
/* the data in `buf` has already been submitted or added to the
|
||||
* buffers, but have been EAGAINed on the last invocation. */
|
||||
/* TODO: this assertion triggers in OSSFuzz runs and it is not
|
||||
* clear why. Disable for now to let OSSFuzz continue its tests. */
|
||||
DEBUGASSERT(len >= stream->upload_blocked_len);
|
||||
if(len < stream->upload_blocked_len) {
|
||||
/* Did we get called again with a smaller `len`? This should not
|
||||
* happen. We are not prepared to handle that. */
|
||||
failf(data, "HTTP/2 send again with decreased length (%zd vs %zd)",
|
||||
len, stream->upload_blocked_len);
|
||||
*err = CURLE_HTTP2;
|
||||
nwritten = -1;
|
||||
goto out;
|
||||
}
|
||||
nwritten = (ssize_t)stream->upload_blocked_len;
|
||||
stream->upload_blocked_len = 0;
|
||||
was_blocked = 1;
|
||||
}
|
||||
else if(stream->closed) {
|
||||
if(stream->resp_hds_complete) {
|
||||
/* Server decided to close the stream after having sent us a findl
|
||||
* response. This is valid if it is not interested in the request
|
||||
* body. This happens on 30x or 40x responses.
|
||||
* We silently discard the data sent, since this is not a transport
|
||||
* error situation. */
|
||||
CURL_TRC_CF(data, cf, "[%d] discarding data"
|
||||
"on closed stream with response", stream->id);
|
||||
*err = CURLE_OK;
|
||||
nwritten = (ssize_t)len;
|
||||
goto out;
|
||||
}
|
||||
infof(data, "stream %u closed", stream->id);
|
||||
*err = CURLE_SEND_ERROR;
|
||||
nwritten = -1;
|
||||
goto out;
|
||||
}
|
||||
else {
|
||||
/* If stream_id != -1, we have dispatched request HEADERS and
|
||||
* optionally request body, and now are going to send or sending
|
||||
* more request body in DATA frame */
|
||||
nwritten = Curl_bufq_write(&stream->sendbuf, buf, len, err);
|
||||
if(nwritten < 0 && *err != CURLE_AGAIN)
|
||||
goto out;
|
||||
}
|
||||
|
||||
if(!Curl_bufq_is_empty(&stream->sendbuf)) {
|
||||
/* req body data is buffered, resume the potentially suspended stream */
|
||||
rv = nghttp2_session_resume_data(ctx->h2, stream->id);
|
||||
if(nghttp2_is_fatal(rv)) {
|
||||
*err = CURLE_SEND_ERROR;
|
||||
nwritten = -1;
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
nwritten = h2_submit(&stream, cf, data, buf, len, &hdslen, err);
|
||||
if(!stream || stream->id == -1) {
|
||||
nwritten = h2_submit(&stream, cf, data, buf, len, eos, err);
|
||||
if(nwritten < 0) {
|
||||
goto out;
|
||||
}
|
||||
DEBUGASSERT(stream);
|
||||
DEBUGASSERT(hdslen <= (size_t)nwritten);
|
||||
}
|
||||
else if(stream->body_eos) {
|
||||
/* We already wrote this, but CURLE_AGAINed the call due to not
|
||||
* being able to flush stream->sendbuf. Make a 0-length write
|
||||
* to trigger flushing again.
|
||||
* If this works, we report to have written `len` bytes. */
|
||||
DEBUGASSERT(eos);
|
||||
nwritten = cf_h2_body_send(cf, data, stream, buf, 0, eos, err);
|
||||
CURL_TRC_CF(data, cf, "[%d] cf_body_send last CHUNK -> %zd, %d, eos=%d",
|
||||
stream->id, nwritten, *err, eos);
|
||||
if(nwritten < 0) {
|
||||
goto out;
|
||||
}
|
||||
nwritten = len;
|
||||
}
|
||||
else {
|
||||
nwritten = cf_h2_body_send(cf, data, stream, buf, len, eos, err);
|
||||
CURL_TRC_CF(data, cf, "[%d] cf_body_send(len=%zu) -> %zd, %d, eos=%d",
|
||||
stream->id, len, nwritten, *err, eos);
|
||||
}
|
||||
|
||||
/* Call the nghttp2 send loop and flush to write ALL buffered data,
|
||||
* headers and/or request body completely out to the network */
|
||||
result = h2_progress_egress(cf, data);
|
||||
|
||||
/* if the stream has been closed in egress handling (nghttp2 does that
|
||||
* when it does not like the headers, for example */
|
||||
if(stream && stream->closed && !was_blocked) {
|
||||
if(stream && stream->closed) {
|
||||
infof(data, "stream %u closed", stream->id);
|
||||
*err = CURLE_SEND_ERROR;
|
||||
nwritten = -1;
|
||||
goto out;
|
||||
}
|
||||
else if(result == CURLE_AGAIN) {
|
||||
blocked = 1;
|
||||
}
|
||||
else if(result) {
|
||||
else if(result && (result != CURLE_AGAIN)) {
|
||||
*err = result;
|
||||
nwritten = -1;
|
||||
goto out;
|
||||
}
|
||||
else if(stream && !Curl_bufq_is_empty(&stream->sendbuf)) {
|
||||
/* although we wrote everything that nghttp2 wants to send now,
|
||||
* there is data left in our stream send buffer unwritten. This may
|
||||
* be due to the stream's HTTP/2 flow window being exhausted. */
|
||||
blocked = 1;
|
||||
else if(stream && stream->body_eos &&
|
||||
(!Curl_bufq_is_empty(&stream->sendbuf) ||
|
||||
!Curl_bufq_is_empty(&ctx->outbufq))) {
|
||||
/* We added the last send chunk to stream->sendbuf, but were unable
|
||||
* to send it all off. Either the socket EAGAINed or the HTTP/2 flow
|
||||
* control prevents it. This should be a call with `eos` set and
|
||||
* we CURLE_AGAIN it until we flushed everything. */
|
||||
CURL_TRC_CF(data, cf, "[%d] could not flush last send chunk -> EAGAIN",
|
||||
stream->id);
|
||||
*err = CURLE_AGAIN;
|
||||
nwritten = -1;
|
||||
}
|
||||
|
||||
if(stream && blocked && nwritten > 0) {
|
||||
/* Unable to send all data, due to connection blocked or H2 window
|
||||
* exhaustion. Data is left in our stream buffer, or nghttp2's internal
|
||||
* frame buffer or our network out buffer. */
|
||||
size_t rwin = (size_t)nghttp2_session_get_stream_remote_window_size(
|
||||
ctx->h2, stream->id);
|
||||
/* At the start of a stream, we are called with request headers
|
||||
* and, possibly, parts of the body. Later, only body data.
|
||||
* If we cannot send pure body data, we EAGAIN. If there had been
|
||||
* header, we return that *they* have been written and remember the
|
||||
* block on the data length only. */
|
||||
stream->upload_blocked_len = ((size_t)nwritten) - hdslen;
|
||||
CURL_TRC_CF(data, cf, "[%d] cf_send(len=%zu) BLOCK: win %u/%zu "
|
||||
"hds_len=%zu blocked_len=%zu",
|
||||
stream->id, len,
|
||||
nghttp2_session_get_remote_window_size(ctx->h2), rwin,
|
||||
hdslen, stream->upload_blocked_len);
|
||||
if(hdslen) {
|
||||
*err = CURLE_OK;
|
||||
nwritten = hdslen;
|
||||
}
|
||||
else {
|
||||
*err = CURLE_AGAIN;
|
||||
nwritten = -1;
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
else if(should_close_session(ctx)) {
|
||||
if(should_close_session(ctx)) {
|
||||
/* nghttp2 thinks this session is done. If the stream has not been
|
||||
* closed, this is an error state for out transfer */
|
||||
if(stream->closed) {
|
||||
@ -2391,11 +2368,10 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
out:
|
||||
if(stream) {
|
||||
CURL_TRC_CF(data, cf, "[%d] cf_send(len=%zu) -> %zd, %d, "
|
||||
"upload_left=%" CURL_FORMAT_CURL_OFF_T ", "
|
||||
"h2 windows %d-%d (stream-conn), "
|
||||
"eos=%d, h2 windows %d-%d (stream-conn), "
|
||||
"buffers %zu-%zu (stream-conn)",
|
||||
stream->id, len, nwritten, *err,
|
||||
stream->upload_left,
|
||||
stream->body_eos,
|
||||
nghttp2_session_get_stream_remote_window_size(
|
||||
ctx->h2, stream->id),
|
||||
nghttp2_session_get_remote_window_size(ctx->h2),
|
||||
@ -2480,7 +2456,8 @@ static void cf_h2_adjust_pollset(struct Curl_cfilter *cf,
|
||||
stream->id);
|
||||
want_recv = (want_recv || c_exhaust || s_exhaust);
|
||||
want_send = (!s_exhaust && want_send) ||
|
||||
(!c_exhaust && nghttp2_session_want_write(ctx->h2));
|
||||
(!c_exhaust && nghttp2_session_want_write(ctx->h2)) ||
|
||||
!Curl_bufq_is_empty(&ctx->outbufq);
|
||||
|
||||
Curl_pollset_set(data, ps, sock, want_recv, want_send);
|
||||
CF_DATA_RESTORE(cf, save);
|
||||
@ -2488,7 +2465,8 @@ static void cf_h2_adjust_pollset(struct Curl_cfilter *cf,
|
||||
else if(ctx->sent_goaway && !cf->shutdown) {
|
||||
/* shutdown in progress */
|
||||
CF_DATA_SAVE(save, cf, data);
|
||||
want_send = nghttp2_session_want_write(ctx->h2);
|
||||
want_send = nghttp2_session_want_write(ctx->h2) ||
|
||||
!Curl_bufq_is_empty(&ctx->outbufq);
|
||||
want_recv = nghttp2_session_want_read(ctx->h2);
|
||||
Curl_pollset_set(data, ps, sock, want_recv, want_send);
|
||||
CF_DATA_RESTORE(cf, save);
|
||||
@ -2602,14 +2580,19 @@ static CURLcode cf_h2_shutdown(struct Curl_cfilter *cf,
|
||||
}
|
||||
/* GOAWAY submitted, process egress and ingress until nghttp2 is done. */
|
||||
result = CURLE_OK;
|
||||
if(nghttp2_session_want_write(ctx->h2))
|
||||
if(nghttp2_session_want_write(ctx->h2) ||
|
||||
!Curl_bufq_is_empty(&ctx->outbufq))
|
||||
result = h2_progress_egress(cf, data);
|
||||
if(!result && nghttp2_session_want_read(ctx->h2))
|
||||
result = h2_progress_ingress(cf, data, 0);
|
||||
|
||||
if(result == CURLE_AGAIN)
|
||||
result = CURLE_OK;
|
||||
|
||||
*done = (ctx->conn_closed ||
|
||||
(!result && !nghttp2_session_want_write(ctx->h2) &&
|
||||
!nghttp2_session_want_read(ctx->h2)));
|
||||
!nghttp2_session_want_read(ctx->h2) &&
|
||||
Curl_bufq_is_empty(&ctx->outbufq)));
|
||||
|
||||
out:
|
||||
CF_DATA_RESTORE(cf, save);
|
||||
|
@ -184,6 +184,30 @@ static bool xfer_recv_shutdown_started(struct Curl_easy *data)
|
||||
return Curl_shutdown_started(data, sockindex);
|
||||
}
|
||||
|
||||
CURLcode Curl_xfer_send_shutdown(struct Curl_easy *data, bool *done)
|
||||
{
|
||||
int sockindex;
|
||||
|
||||
if(!data || !data->conn)
|
||||
return CURLE_FAILED_INIT;
|
||||
if(data->conn->writesockfd == CURL_SOCKET_BAD)
|
||||
return CURLE_FAILED_INIT;
|
||||
sockindex = (data->conn->writesockfd == data->conn->sock[SECONDARYSOCKET]);
|
||||
return Curl_conn_shutdown(data, sockindex, done);
|
||||
}
|
||||
|
||||
static bool xfer_send_shutdown_started(struct Curl_easy *data)
|
||||
{
|
||||
int sockindex;
|
||||
|
||||
if(!data || !data->conn)
|
||||
return CURLE_FAILED_INIT;
|
||||
if(data->conn->writesockfd == CURL_SOCKET_BAD)
|
||||
return CURLE_FAILED_INIT;
|
||||
sockindex = (data->conn->writesockfd == data->conn->sock[SECONDARYSOCKET]);
|
||||
return Curl_shutdown_started(data, sockindex);
|
||||
}
|
||||
|
||||
/**
|
||||
* Receive raw response data for the transfer.
|
||||
* @param data the transfer
|
||||
@ -420,6 +444,16 @@ CURLcode Curl_readwrite(struct Curl_easy *data)
|
||||
select_bits = data->state.select_bits;
|
||||
data->state.select_bits = 0;
|
||||
}
|
||||
else if(((k->keepon & KEEP_RECVBITS) == KEEP_RECV) &&
|
||||
xfer_recv_shutdown_started(data)) {
|
||||
DEBUGF(infof(data, "readwrite, recv for finishing shutdown"));
|
||||
select_bits = CURL_CSELECT_IN;
|
||||
}
|
||||
else if(((k->keepon & KEEP_SENDBITS) == KEEP_SEND) &&
|
||||
xfer_send_shutdown_started(data)) {
|
||||
DEBUGF(infof(data, "readwrite, send for finishing shutdown"));
|
||||
select_bits = CURL_CSELECT_OUT;
|
||||
}
|
||||
else {
|
||||
curl_socket_t fd_read;
|
||||
curl_socket_t fd_write;
|
||||
@ -1303,18 +1337,6 @@ CURLcode Curl_xfer_send_close(struct Curl_easy *data)
|
||||
return CURLE_OK;
|
||||
}
|
||||
|
||||
CURLcode Curl_xfer_send_shutdown(struct Curl_easy *data, bool *done)
|
||||
{
|
||||
int sockindex;
|
||||
|
||||
if(!data || !data->conn)
|
||||
return CURLE_FAILED_INIT;
|
||||
if(data->conn->writesockfd == CURL_SOCKET_BAD)
|
||||
return CURLE_FAILED_INIT;
|
||||
sockindex = (data->conn->writesockfd == data->conn->sock[SECONDARYSOCKET]);
|
||||
return Curl_conn_shutdown(data, sockindex, done);
|
||||
}
|
||||
|
||||
bool Curl_xfer_is_blocked(struct Curl_easy *data)
|
||||
{
|
||||
bool want_send = ((data)->req.keepon & KEEP_SEND);
|
||||
|
@ -1828,6 +1828,7 @@ static CURLcode gtls_shutdown(struct Curl_cfilter *cf,
|
||||
CURL_TRC_CF(data, cf, "SSL shutdown, gnutls_bye EAGAIN");
|
||||
connssl->io_need = gnutls_record_get_direction(backend->gtls.session)?
|
||||
CURL_SSL_IO_NEED_SEND : CURL_SSL_IO_NEED_RECV;
|
||||
backend->gtls.sent_shutdown = FALSE;
|
||||
result = CURLE_OK;
|
||||
goto out;
|
||||
}
|
||||
|
@ -182,7 +182,16 @@ static int err(void)
|
||||
exit(2);
|
||||
}
|
||||
|
||||
|
||||
static void usage(const char *msg)
|
||||
{
|
||||
if(msg)
|
||||
fprintf(stderr, "%s\n", msg);
|
||||
fprintf(stderr,
|
||||
"usage: [options] url\n"
|
||||
" upload and pause, options:\n"
|
||||
" -V http_version (http/1.1, h2, h3) http version to use\n"
|
||||
);
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
@ -192,12 +201,37 @@ int main(int argc, char *argv[])
|
||||
struct curl_slist *resolve = NULL;
|
||||
char resolve_buf[1024];
|
||||
char *url, *host = NULL, *port = NULL;
|
||||
int http_version = CURL_HTTP_VERSION_1_1;
|
||||
int ch;
|
||||
|
||||
if(argc != 2) {
|
||||
fprintf(stderr, "ERROR: need URL as argument\n");
|
||||
while((ch = getopt(argc, argv, "V:")) != -1) {
|
||||
switch(ch) {
|
||||
case 'V': {
|
||||
if(!strcmp("http/1.1", optarg))
|
||||
http_version = CURL_HTTP_VERSION_1_1;
|
||||
else if(!strcmp("h2", optarg))
|
||||
http_version = CURL_HTTP_VERSION_2_0;
|
||||
else if(!strcmp("h3", optarg))
|
||||
http_version = CURL_HTTP_VERSION_3ONLY;
|
||||
else {
|
||||
usage("invalid http version");
|
||||
return 1;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
usage("invalid option");
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
argc -= optind;
|
||||
argv += optind;
|
||||
|
||||
if(argc != 1) {
|
||||
usage("not enough arguments");
|
||||
return 2;
|
||||
}
|
||||
url = argv[1];
|
||||
url = argv[0];
|
||||
|
||||
curl_global_init(CURL_GLOBAL_DEFAULT);
|
||||
curl_global_trace("ids,time");
|
||||
@ -247,6 +281,9 @@ int main(int argc, char *argv[])
|
||||
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
|
||||
curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
|
||||
|
||||
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L);
|
||||
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0L);
|
||||
|
||||
if(curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L) != CURLE_OK ||
|
||||
curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, debug_cb)
|
||||
!= CURLE_OK ||
|
||||
@ -254,6 +291,8 @@ int main(int argc, char *argv[])
|
||||
err();
|
||||
|
||||
curl_easy_setopt(curl, CURLOPT_URL, url);
|
||||
curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, http_version);
|
||||
|
||||
rc = curl_easy_perform(curl);
|
||||
|
||||
if(curl) {
|
||||
|
@ -475,9 +475,14 @@ class TestUpload:
|
||||
client = LocalClient(name='upload-pausing', env=env, timeout=60)
|
||||
if not client.exists():
|
||||
pytest.skip(f'example client not built: {client.name}')
|
||||
url = f'http://{env.domain1}:{env.http_port}/curltest/echo?id=[0-0]&die_after=0'
|
||||
r = client.run([url])
|
||||
r.check_exit_code(18) # PARTIAL_FILE
|
||||
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-0]&die_after=0'
|
||||
r = client.run(['-V', proto, url])
|
||||
exp_code = 18 # PARTIAL_FILE
|
||||
if proto == 'h2':
|
||||
exp_code = 92 # CURLE_HTTP2_STREAM
|
||||
elif proto == 'h3':
|
||||
exp_code = 95 # CURLE_HTTP3
|
||||
r.check_exit_code(exp_code)
|
||||
|
||||
# upload data, pause, let connection die without any response at all
|
||||
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
|
||||
@ -489,9 +494,12 @@ class TestUpload:
|
||||
client = LocalClient(name='upload-pausing', env=env, timeout=60)
|
||||
if not client.exists():
|
||||
pytest.skip(f'example client not built: {client.name}')
|
||||
url = f'http://{env.domain1}:{env.http_port}/curltest/echo?id=[0-0]&just_die=1'
|
||||
r = client.run([url])
|
||||
r.check_exit_code(52) # GOT_NOTHING
|
||||
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-0]&just_die=1'
|
||||
r = client.run(['-V', proto, url])
|
||||
exp_code = 52 # GOT_NOTHING
|
||||
if proto == 'h2' or proto == 'h3':
|
||||
exp_code = 0 # we get a 500 from the server
|
||||
r.check_exit_code(exp_code) # GOT_NOTHING
|
||||
|
||||
# upload data, pause, let connection die after 100 continue
|
||||
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
|
||||
@ -503,9 +511,12 @@ class TestUpload:
|
||||
client = LocalClient(name='upload-pausing', env=env, timeout=60)
|
||||
if not client.exists():
|
||||
pytest.skip(f'example client not built: {client.name}')
|
||||
url = f'http://{env.domain1}:{env.http_port}/curltest/echo?id=[0-0]&die_after_100=1'
|
||||
r = client.run([url])
|
||||
r.check_exit_code(52) # GOT_NOTHING
|
||||
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-0]&die_after_100=1'
|
||||
r = client.run(['-V', proto, url])
|
||||
exp_code = 52 # GOT_NOTHING
|
||||
if proto == 'h2' or proto == 'h3':
|
||||
exp_code = 0 # we get a 500 from the server
|
||||
r.check_exit_code(exp_code) # GOT_NOTHING
|
||||
|
||||
# speed limited on put handler
|
||||
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
|
||||
|
@ -221,7 +221,7 @@ class TestProxy:
|
||||
indata = open(srcfile).readlines()
|
||||
for i in range(count):
|
||||
respdata = open(curl.response_file(i)).readlines()
|
||||
assert respdata == indata
|
||||
assert respdata == indata, f'resonse {i} differs'
|
||||
assert r.total_connects == 1, r.dump_logs()
|
||||
|
||||
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
|
||||
|
Loading…
Reference in New Issue
Block a user