http3/ngtcp2: upload EAGAIN handling

- refs #11389 where IDLE timeouts on upload are reported
- reword ngtcp2 expiry handling to apply to both send+recv
  calls into the filter
- EAGAIN uploads similar to the recent changes in HTTP/2, e.g.
  report success only when send data was ACKed.
- HOLD sending of EAGAINed uploads to avoid cpu busy loops
- rename internal function for consistency with HTTP/2
  implementation

Fixes #11389
Closes #11390
This commit is contained in:
Stefan Eissing 2023-06-27 12:06:21 +02:00 committed by Daniel Stenberg
parent 933aedcde8
commit 15b131352b
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2

View File

@ -177,6 +177,7 @@ struct h3_stream_ctx {
struct bufq sendbuf; /* h3 request body */
struct bufq recvbuf; /* h3 response body */
size_t sendbuf_len_in_flight; /* sendbuf amount "in flight" */
size_t upload_blocked_len; /* the amount written last and EGAINed */
size_t recv_buf_nonflow; /* buffered bytes, not counting for flow control */
uint64_t error3; /* HTTP/3 stream error code */
curl_off_t upload_left; /* number of request bytes left to upload */
@ -272,12 +273,12 @@ static void pktx_init(struct pkt_io_ctx *pktx,
ngtcp2_path_storage_zero(&pktx->ps);
}
static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
static CURLcode cf_progress_ingress(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct pkt_io_ctx *pktx);
static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct pkt_io_ctx *pktx);
static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct pkt_io_ctx *pktx);
static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id,
uint64_t datalen, void *user_data,
void *stream_user_data);
@ -985,6 +986,63 @@ static ngtcp2_callbacks ng_callbacks = {
NULL, /* early_data_rejected */
};
/**
* Connection maintenance like timeouts on packet ACKs etc. are done by us, not
* the OS like for TCP. POLL events on the socket therefore are not
* sufficient.
* ngtcp2 tells us when it wants to be invoked again. We handle that via
* the `Curl_expire()` mechanisms.
*/
static CURLcode check_and_set_expiry(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct pkt_io_ctx *pktx)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct pkt_io_ctx local_pktx;
ngtcp2_tstamp expiry;
ngtcp2_duration timeout;
if(!pktx) {
pktx_init(&local_pktx, cf, data);
pktx = &local_pktx;
}
else {
pktx->ts = timestamp();
}
expiry = ngtcp2_conn_get_expiry(ctx->qconn);
if(expiry != UINT64_MAX) {
if(expiry <= pktx->ts) {
CURLcode result;
int rv = ngtcp2_conn_handle_expiry(ctx->qconn, pktx->ts);
if(rv) {
failf(data, "ngtcp2_conn_handle_expiry returned error: %s",
ngtcp2_strerror(rv));
ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0);
return CURLE_SEND_ERROR;
}
timeout = 0;
result = cf_progress_ingress(cf, data, pktx);
if(result)
return result;
result = cf_progress_egress(cf, data, pktx);
if(result)
return result;
/* ask again, things might have changed */
expiry = ngtcp2_conn_get_expiry(ctx->qconn);
}
if(expiry > pktx->ts) {
timeout = expiry - pktx->ts;
if(timeout % NGTCP2_MILLISECONDS) {
timeout += NGTCP2_MILLISECONDS;
}
Curl_expire(data, timeout / NGTCP2_MILLISECONDS, EXPIRE_QUIC);
}
}
return CURLE_OK;
}
static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf,
struct Curl_easy *data,
curl_socket_t *socks)
@ -1022,7 +1080,7 @@ static void h3_drain_stream(struct Curl_cfilter *cf,
(void)cf;
bits = CURL_CSELECT_IN;
if(stream && !stream->send_closed && stream->upload_left)
if(stream && stream->upload_left && !stream->send_closed)
bits |= CURL_CSELECT_OUT;
if(data->state.dselect_bits != bits) {
data->state.dselect_bits = bits;
@ -1420,7 +1478,7 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
report_consumed_data(cf, data, nread);
}
if(cf_process_ingress(cf, data, &pktx)) {
if(cf_progress_ingress(cf, data, &pktx)) {
*err = CURLE_RECV_ERROR;
nread = -1;
goto out;
@ -1450,10 +1508,17 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
}
out:
if(cf_flush_egress(cf, data, &pktx)) {
if(cf_progress_egress(cf, data, &pktx)) {
*err = CURLE_SEND_ERROR;
nread = -1;
}
else {
CURLcode result2 = check_and_set_expiry(cf, data, &pktx);
if(result2) {
*err = result2;
nread = -1;
}
}
DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv(len=%zu) -> %zd, %d",
stream? stream->id : -1, len, nread, *err));
CF_DATA_RESTORE(cf, save);
@ -1482,10 +1547,8 @@ static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id,
Curl_bufq_skip(&stream->sendbuf, skiplen);
stream->sendbuf_len_in_flight -= skiplen;
/* `sendbuf` *might* now have more room. If so, resume this
* possibly paused stream. And also tell our transfer engine that
* it may continue KEEP_SEND if told to PAUSE. */
if(!Curl_bufq_is_full(&stream->sendbuf)) {
/* Everything ACKed, we resume upload processing */
if(!stream->sendbuf_len_in_flight) {
int rv = nghttp3_conn_resume_stream(conn, stream_id);
if(rv) {
return NGTCP2_ERR_CALLBACK_FAILURE;
@ -1644,16 +1707,19 @@ static ssize_t h3_stream_open(struct Curl_cfilter *cf,
else
/* data sending without specifying the data amount up front */
stream->upload_left = -1; /* unknown */
reader.read_data = cb_h3_read_req_body;
preader = &reader;
break;
default:
/* there is not request body */
stream->upload_left = 0; /* no request body */
preader = NULL;
break;
}
stream->send_closed = (stream->upload_left == 0);
if(!stream->send_closed) {
reader.read_data = cb_h3_read_req_body;
preader = &reader;
}
rc = nghttp3_conn_submit_request(ctx->h3conn, stream->id,
nva, nheader, preader, data);
if(rc) {
@ -1691,6 +1757,7 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
ssize_t sent = 0;
struct cf_call_data save;
struct pkt_io_ctx pktx;
CURLcode result;
CF_DATA_SAVE(save, cf, data);
DEBUGASSERT(cf->connected);
@ -1699,10 +1766,10 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
pktx_init(&pktx, cf, data);
*err = CURLE_OK;
if(stream && stream->closed) {
*err = CURLE_HTTP3;
result = cf_progress_ingress(cf, data, &pktx);
if(result) {
*err = result;
sent = -1;
goto out;
}
if(!stream || stream->id < 0) {
@ -1712,32 +1779,64 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
goto out;
}
}
else if(stream->upload_blocked_len) {
/* the data in `buf` has alread been submitted or added to the
* buffers, but have been EAGAINed on the last invocation. */
DEBUGASSERT(len >= stream->upload_blocked_len);
if(len < stream->upload_blocked_len) {
/* Did we get called again with a smaller `len`? This should not
* happen. We are not prepared to handle that. */
failf(data, "HTTP/3 send again with decreased length");
*err = CURLE_HTTP3;
sent = -1;
goto out;
}
sent = (ssize_t)stream->upload_blocked_len;
stream->upload_blocked_len = 0;
}
else if(stream->closed) {
*err = CURLE_HTTP3;
sent = -1;
goto out;
}
else {
sent = Curl_bufq_write(&stream->sendbuf, buf, len, err);
DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send, add to "
"sendbuf(len=%zu) -> %zd, %d",
stream->id, len, sent, *err));
if(sent < 0) {
if(*err == CURLE_AGAIN) {
/* Can't add more to the send buf, needs to drain first.
* Pause the sending to avoid a busy loop. */
data->req.keepon |= KEEP_SEND_HOLD;
DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] pause send",
stream->id));
}
goto out;
}
(void)nghttp3_conn_resume_stream(ctx->h3conn, stream->id);
}
if(cf_flush_egress(cf, data, &pktx)) {
*err = CURLE_SEND_ERROR;
result = cf_progress_egress(cf, data, &pktx);
if(result) {
*err = result;
sent = -1;
goto out;
}
if(stream && sent > 0 && stream->sendbuf_len_in_flight) {
/* We have unacknowledged DATA and cannot report success to our
* caller. Instead we EAGAIN and remember how much we have already
* "written" into our various internal connection buffers.
* We put the stream upload on HOLD, until this gets ACKed. */
stream->upload_blocked_len = sent;
DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send(len=%zu), "
"%zu bytes in flight -> EGAIN", stream->id, len,
stream->sendbuf_len_in_flight));
*err = CURLE_AGAIN;
sent = -1;
data->req.keepon |= KEEP_SEND_HOLD;
}
out:
result = check_and_set_expiry(cf, data, &pktx);
if(result) {
*err = result;
sent = -1;
}
CF_DATA_RESTORE(cf, save);
return sent;
}
@ -1837,15 +1936,15 @@ static CURLcode recv_pkt(const unsigned char *pkt, size_t pktlen,
return CURLE_OK;
}
static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct pkt_io_ctx *pktx)
static CURLcode cf_progress_ingress(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct pkt_io_ctx *pktx)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct pkt_io_ctx local_pktx;
size_t pkts_chunk = 128, i;
size_t pkts_max = 10 * pkts_chunk;
CURLcode result;
CURLcode result = CURLE_OK;
if(!pktx) {
pktx_init(&local_pktx, cf, data);
@ -1864,7 +1963,9 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
if(pktx->pkt_count < pkts_chunk) /* got less than we could */
break;
/* give egress a chance before we receive more */
result = cf_flush_egress(cf, data, pktx);
result = cf_progress_egress(cf, data, pktx);
if(result) /* error */
break;
}
return result;
}
@ -1976,18 +2077,15 @@ out:
return nwritten;
}
static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct pkt_io_ctx *pktx)
static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct pkt_io_ctx *pktx)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
int rv;
ssize_t nread;
size_t max_payload_size, path_max_payload_size, max_pktcnt;
size_t pktcnt = 0;
size_t gsolen = 0; /* this disables gso until we have a clue */
ngtcp2_tstamp expiry;
ngtcp2_duration timeout;
CURLcode curlcode;
struct pkt_io_ctx local_pktx;
@ -2000,14 +2098,6 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
ngtcp2_path_storage_zero(&pktx->ps);
}
rv = ngtcp2_conn_handle_expiry(ctx->qconn, pktx->ts);
if(rv) {
failf(data, "ngtcp2_conn_handle_expiry returned error: %s",
ngtcp2_strerror(rv));
ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0);
return CURLE_SEND_ERROR;
}
curlcode = vquic_flush(cf, data, &ctx->q);
if(curlcode) {
if(curlcode == CURLE_AGAIN) {
@ -2098,21 +2188,6 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
}
out:
/* non-errored exit. check when we should run again. */
expiry = ngtcp2_conn_get_expiry(ctx->qconn);
if(expiry != UINT64_MAX) {
if(expiry <= pktx->ts) {
timeout = 0;
}
else {
timeout = expiry - pktx->ts;
if(timeout % NGTCP2_MILLISECONDS) {
timeout += NGTCP2_MILLISECONDS;
}
}
Curl_expire(data, timeout / NGTCP2_MILLISECONDS, EXPIRE_QUIC);
}
return CURLE_OK;
}
@ -2172,11 +2247,7 @@ static CURLcode cf_ngtcp2_data_event(struct Curl_cfilter *cf,
break;
}
case CF_CTRL_DATA_IDLE:
if(timestamp() >= ngtcp2_conn_get_expiry(ctx->qconn)) {
if(cf_flush_egress(cf, data, NULL)) {
result = CURLE_SEND_ERROR;
}
}
result = check_and_set_expiry(cf, data, NULL);
break;
default:
break;
@ -2398,16 +2469,16 @@ static CURLcode cf_ngtcp2_connect(struct Curl_cfilter *cf,
result = cf_connect_start(cf, data, &pktx);
if(result)
goto out;
result = cf_flush_egress(cf, data, &pktx);
result = cf_progress_egress(cf, data, &pktx);
/* we do not expect to be able to recv anything yet */
goto out;
}
result = cf_process_ingress(cf, data, &pktx);
result = cf_progress_ingress(cf, data, &pktx);
if(result)
goto out;
result = cf_flush_egress(cf, data, &pktx);
result = cf_progress_egress(cf, data, &pktx);
if(result)
goto out;
@ -2464,6 +2535,9 @@ out:
r_ip, r_port, curl_easy_strerror(result));
}
#endif
if(!result && ctx->qconn) {
result = check_and_set_expiry(cf, data, &pktx);
}
DEBUGF(LOG_CF(data, cf, "connect -> %d, done=%d", result, *done));
CF_DATA_RESTORE(cf, save);
return result;
@ -2535,7 +2609,7 @@ static bool cf_ngtcp2_conn_is_alive(struct Curl_cfilter *cf,
not in use by any other transfer, there shouldn't be any data here,
only "protocol frames" */
*input_pending = FALSE;
if(cf_process_ingress(cf, data, NULL))
if(cf_progress_ingress(cf, data, NULL))
alive = FALSE;
else {
alive = TRUE;