h3/ngtcp2: improve error handling

- identify ngtcp2 and nghttp3 error codes that are fatal
- close quic connection on fatal errors
- refuse further filter operations once connection is closed
- confusion about the nghttp3 API. We should close the QUIC stream on
  cancel and not use the nghttp3 calls intended to be invoked when the
  QUIC stream was closed by the peer.

Closes #13562
This commit is contained in:
Stefan Eissing 2024-05-08 13:44:35 +02:00 committed by Daniel Stenberg
parent edc5b3502c
commit bc6e3e6049
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
2 changed files with 173 additions and 95 deletions

View File

@ -138,6 +138,7 @@ struct cf_ngtcp2_ctx {
uint64_t used_bidi_streams; /* bidi streams we have opened */
uint64_t max_bidi_streams; /* max bidi streams we can open */
int qlogfd;
BIT(conn_closed); /* connection is closed */
};
/* How to access `call_data` from a cf_ngtcp2 filter */
@ -175,6 +176,8 @@ struct h3_stream_ctx {
#define H3_STREAM_CTX(ctx,data) ((struct h3_stream_ctx *)(\
data? Curl_hash_offt_get(&(ctx)->streams, (data)->id) : NULL))
#define H3_STREAM_CTX_ID(ctx,id) ((struct h3_stream_ctx *)(\
Curl_hash_offt_get(&(ctx)->streams, (id))))
static void h3_stream_ctx_free(struct h3_stream_ctx *stream)
{
@ -222,28 +225,37 @@ static CURLcode h3_data_setup(struct Curl_cfilter *cf,
return CURLE_OK;
}
static void cf_ngtcp2_stream_close(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct h3_stream_ctx *stream)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
DEBUGASSERT(data);
DEBUGASSERT(stream);
if(!stream->closed && ctx->qconn && ctx->h3conn) {
CURLcode result;
nghttp3_conn_set_stream_user_data(ctx->h3conn, stream->id, NULL);
ngtcp2_conn_set_stream_user_data(ctx->qconn, stream->id, NULL);
stream->closed = TRUE;
(void)ngtcp2_conn_shutdown_stream(ctx->qconn, 0, stream->id,
NGHTTP3_H3_REQUEST_CANCELLED);
result = cf_progress_egress(cf, data, NULL);
if(result)
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] cancel stream -> %d",
stream->id, result);
}
}
static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result;
(void)cf;
if(stream) {
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] easy handle is done",
stream->id);
if(ctx->h3conn && !stream->closed) {
nghttp3_conn_shutdown_stream_read(ctx->h3conn, stream->id);
nghttp3_conn_close_stream(ctx->h3conn, stream->id,
NGHTTP3_H3_REQUEST_CANCELLED);
nghttp3_conn_set_stream_user_data(ctx->h3conn, stream->id, NULL);
ngtcp2_conn_set_stream_user_data(ctx->qconn, stream->id, NULL);
stream->closed = TRUE;
result = cf_progress_egress(cf, data, NULL);
if(result)
CURL_TRC_CF(data, cf, "data_done, flush egress -> %d", result);
}
cf_ngtcp2_stream_close(cf, data, stream);
Curl_hash_offt_remove(&ctx->streams, data->id);
}
}
@ -414,6 +426,52 @@ static int cb_handshake_completed(ngtcp2_conn *tconn, void *user_data)
return 0;
}
static void cf_ngtcp2_conn_close(struct Curl_cfilter *cf,
struct Curl_easy *data);
static bool cf_ngtcp2_err_is_fatal(int code)
{
return (NGTCP2_ERR_FATAL >= code) ||
(NGTCP2_ERR_DROP_CONN == code) ||
(NGTCP2_ERR_IDLE_CLOSE == code);
}
static void cf_ngtcp2_err_set(struct Curl_cfilter *cf,
struct Curl_easy *data, int code)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
if(!ctx->last_error.error_code) {
if(NGTCP2_ERR_CRYPTO == code) {
ngtcp2_ccerr_set_tls_alert(&ctx->last_error,
ngtcp2_conn_get_tls_alert(ctx->qconn),
NULL, 0);
}
else {
ngtcp2_ccerr_set_liberr(&ctx->last_error, code, NULL, 0);
}
}
if(cf_ngtcp2_err_is_fatal(code))
cf_ngtcp2_conn_close(cf, data);
}
static bool cf_ngtcp2_h3_err_is_fatal(int code)
{
return (NGHTTP3_ERR_FATAL >= code) ||
(NGHTTP3_ERR_H3_CLOSED_CRITICAL_STREAM == code);
}
static void cf_ngtcp2_h3_err_set(struct Curl_cfilter *cf,
struct Curl_easy *data, int code)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
if(!ctx->last_error.error_code) {
ngtcp2_ccerr_set_application_error(&ctx->last_error,
nghttp3_err_infer_quic_app_error_code(code), NULL, 0);
}
if(cf_ngtcp2_h3_err_is_fatal(code))
cf_ngtcp2_conn_close(cf, data);
}
static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags,
int64_t sid, uint64_t offset,
const uint8_t *buf, size_t buflen,
@ -425,28 +483,23 @@ static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags,
nghttp3_ssize nconsumed;
int fin = (flags & NGTCP2_STREAM_DATA_FLAG_FIN) ? 1 : 0;
struct Curl_easy *data = stream_user_data;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
(void)offset;
(void)data;
nconsumed =
nghttp3_conn_read_stream(ctx->h3conn, stream_id, buf, buflen, fin);
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] read_stream(len=%zu) -> %zd",
stream_id, buflen, nconsumed);
if(!data)
data = CF_DATA_CURRENT(cf);
if(data)
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] read_stream(len=%zu) -> %zd",
stream_id, buflen, nconsumed);
if(nconsumed < 0) {
/* consume all bytes */
ngtcp2_conn_extend_max_stream_offset(tconn, stream_id, buflen);
ngtcp2_conn_extend_max_offset(tconn, buflen);
if(!data || (stream && stream->reset) ||
NGHTTP3_ERR_H3_STREAM_CREATION_ERROR == (int)nconsumed) {
struct Curl_easy *cdata = CF_DATA_CURRENT(cf);
CURL_TRC_CF(cdata, cf, "[%" CURL_PRId64 "] discard data for stream %s",
stream_id, (data && stream)? "reset" : "unknown");
return 0;
struct h3_stream_ctx *stream = H3_STREAM_CTX_ID(ctx, stream_id);
if(data && stream) {
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] error on known stream, "
"reset=%d, closed=%d",
stream_id, stream->reset, stream->closed);
}
ngtcp2_ccerr_set_application_error(
&ctx->last_error,
nghttp3_err_infer_quic_app_error_code((int)nconsumed), NULL, 0);
return NGTCP2_ERR_CALLBACK_FAILURE;
}
@ -486,26 +539,28 @@ static int cb_stream_close(ngtcp2_conn *tconn, uint32_t flags,
void *user_data, void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
curl_int64_t stream_id = (curl_int64_t)sid;
struct cf_ngtcp2_ctx *ctx = cf->ctx;
int rv;
(void)tconn;
(void)data;
/* stream is closed... */
if(!data)
data = CF_DATA_CURRENT(cf);
if(!data)
return NGTCP2_ERR_CALLBACK_FAILURE;
if(!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) {
app_error_code = NGHTTP3_H3_NO_ERROR;
}
rv = nghttp3_conn_close_stream(ctx->h3conn, stream_id, app_error_code);
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] quic close(err=%"
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] quic close(app_error=%"
CURL_PRIu64 ") -> %d", stream_id, (curl_uint64_t)app_error_code,
rv);
if(rv && rv != NGHTTP3_ERR_STREAM_NOT_FOUND) {
ngtcp2_ccerr_set_application_error(
&ctx->last_error, nghttp3_err_infer_quic_app_error_code(rv), NULL, 0);
cf_ngtcp2_h3_err_set(cf, data, rv);
return NGTCP2_ERR_CALLBACK_FAILURE;
}
@ -725,7 +780,7 @@ static CURLcode check_and_set_expiry(struct Curl_cfilter *cf,
if(rv) {
failf(data, "ngtcp2_conn_handle_expiry returned error: %s",
ngtcp2_strerror(rv));
ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0);
cf_ngtcp2_err_set(cf, data, rv);
return CURLE_SEND_ERROR;
}
result = cf_progress_ingress(cf, data, pktx);
@ -832,16 +887,13 @@ static void h3_xfer_write_resp(struct Curl_cfilter *cf,
{
/* If we already encountered an error, skip further writes */
if(!stream->xfer_result)
if(!stream->xfer_result) {
stream->xfer_result = Curl_xfer_write_resp(data, buf, blen, eos);
/* If the transfer write is errored, we do not want any more data */
if(stream->xfer_result) {
struct cf_ngtcp2_ctx *ctx = cf->ctx;
CURL_TRC_CF(data, cf, "[%"CURL_PRId64"] error %d writing %zu bytes "
"of data, cancelling stream",
stream->id, stream->xfer_result, blen);
nghttp3_conn_close_stream(ctx->h3conn, stream->id,
NGHTTP3_H3_REQUEST_CANCELLED);
/* If the transfer write is errored, we do not want any more data */
if(stream->xfer_result) {
CURL_TRC_CF(data, cf, "[%"CURL_PRId64"] error %d writing %zu bytes "
"of data", stream->id, stream->xfer_result, blen);
}
}
}
@ -1150,7 +1202,7 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
pktx_init(&pktx, cf, data);
if(!stream) {
if(!stream || ctx->conn_closed) {
*err = CURLE_RECV_ERROR;
goto out;
}
@ -1163,6 +1215,7 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
if(stream->xfer_result) {
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] xfer write failed", stream->id);
cf_ngtcp2_stream_close(cf, data, stream);
*err = stream->xfer_result;
nread = -1;
goto out;
@ -1451,6 +1504,12 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
}
if(!stream || stream->id < 0) {
if(ctx->conn_closed) {
CURL_TRC_CF(data, cf, "cannot open stream on closed connection");
*err = CURLE_SEND_ERROR;
sent = -1;
goto out;
}
sent = h3_stream_open(cf, data, buf, len, err);
if(sent < 0) {
CURL_TRC_CF(data, cf, "failed to open stream -> %d", *err);
@ -1458,6 +1517,13 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
}
stream = H3_STREAM_CTX(ctx, data);
}
else if(stream->xfer_result) {
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] xfer write failed", stream->id);
cf_ngtcp2_stream_close(cf, data, stream);
*err = stream->xfer_result;
sent = -1;
goto out;
}
else if(stream->upload_blocked_len) {
/* the data in `buf` has already been submitted or added to the
* buffers, but have been EAGAINed on the last invocation. */
@ -1492,6 +1558,12 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
sent = -1;
goto out;
}
else if(ctx->conn_closed) {
CURL_TRC_CF(data, cf, "cannot send on closed connection");
*err = CURLE_SEND_ERROR;
sent = -1;
goto out;
}
else {
sent = Curl_bufq_write(&stream->sendbuf, buf, len, err);
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] cf_send, add to "
@ -1568,16 +1640,7 @@ static CURLcode recv_pkt(const unsigned char *pkt, size_t pktlen,
if(rv) {
CURL_TRC_CF(pktx->data, pktx->cf, "ingress, read_pkt -> %s (%d)",
ngtcp2_strerror(rv), rv);
if(!ctx->last_error.error_code) {
if(rv == NGTCP2_ERR_CRYPTO) {
ngtcp2_ccerr_set_tls_alert(&ctx->last_error,
ngtcp2_conn_get_tls_alert(ctx->qconn),
NULL, 0);
}
else {
ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0);
}
}
cf_ngtcp2_err_set(pktx->cf, pktx->data, rv);
if(rv == NGTCP2_ERR_CRYPTO)
/* this is a "TLS problem", but a failed certificate verification
@ -1660,9 +1723,7 @@ static ssize_t read_pkt_to_send(void *userp,
if(veccnt < 0) {
failf(x->data, "nghttp3_conn_writev_stream returned error: %s",
nghttp3_strerror((int)veccnt));
ngtcp2_ccerr_set_application_error(
&ctx->last_error,
nghttp3_err_infer_quic_app_error_code((int)veccnt), NULL, 0);
cf_ngtcp2_h3_err_set(x->cf, x->data, (int)veccnt);
*err = CURLE_SEND_ERROR;
return -1;
}
@ -1709,7 +1770,7 @@ static ssize_t read_pkt_to_send(void *userp,
DEBUGASSERT(ndatalen == -1);
failf(x->data, "ngtcp2_conn_writev_stream returned error: %s",
ngtcp2_strerror((int)n));
ngtcp2_ccerr_set_liberr(&ctx->last_error, (int)n, NULL, 0);
cf_ngtcp2_err_set(x->cf, x->data, (int)n);
*err = CURLE_SEND_ERROR;
nwritten = -1;
goto out;
@ -1946,6 +2007,31 @@ static void cf_ngtcp2_ctx_clear(struct cf_ngtcp2_ctx *ctx)
ctx->call_data = save;
}
static void cf_ngtcp2_conn_close(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
if(ctx && ctx->qconn && !ctx->conn_closed) {
char buffer[NGTCP2_MAX_UDP_PAYLOAD_SIZE];
struct pkt_io_ctx pktx;
ngtcp2_ssize rc;
ctx->conn_closed = TRUE;
pktx_init(&pktx, cf, data);
rc = ngtcp2_conn_write_connection_close(ctx->qconn, NULL, /* path */
NULL, /* pkt_info */
(uint8_t *)buffer, sizeof(buffer),
&ctx->last_error, pktx.ts);
CURL_TRC_CF(data, cf, "closing connection(err_type=%d, err_code=%"
CURL_PRIu64 ") -> %d", ctx->last_error.type,
(curl_uint64_t)ctx->last_error.error_code, (int)rc);
if(rc > 0) {
while((send(ctx->q.sockfd, buffer, (SEND_TYPE_ARG3)rc, 0) == -1) &&
SOCKERRNO == EINTR);
}
}
}
static void cf_ngtcp2_close(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
@ -1953,24 +2039,10 @@ static void cf_ngtcp2_close(struct Curl_cfilter *cf, struct Curl_easy *data)
CF_DATA_SAVE(save, cf, data);
if(ctx && ctx->qconn) {
char buffer[NGTCP2_MAX_UDP_PAYLOAD_SIZE];
struct pkt_io_ctx pktx;
ngtcp2_ssize rc;
CURL_TRC_CF(data, cf, "close");
pktx_init(&pktx, cf, data);
rc = ngtcp2_conn_write_connection_close(ctx->qconn, NULL, /* path */
NULL, /* pkt_info */
(uint8_t *)buffer, sizeof(buffer),
&ctx->last_error, pktx.ts);
if(rc > 0) {
while((send(ctx->q.sockfd, buffer, (SEND_TYPE_ARG3)rc, 0) == -1) &&
SOCKERRNO == EINTR);
}
cf_ngtcp2_conn_close(cf, data);
cf_ngtcp2_ctx_clear(ctx);
CURL_TRC_CF(data, cf, "close");
}
cf->connected = FALSE;
CF_DATA_RESTORE(cf, save);
}
@ -2259,7 +2331,10 @@ static CURLcode cf_ngtcp2_query(struct Curl_cfilter *cf,
* by callback. QUIC counts the number over the lifetime of the
* connection, ever increasing.
* We count the *open* transfers plus the budget for new ones. */
if(ctx->max_bidi_streams) {
if(!ctx->qconn || ctx->conn_closed) {
*pres1 = 0;
}
else if(ctx->max_bidi_streams) {
uint64_t avail_bidi_streams = 0;
uint64_t max_streams = CONN_INUSE(cf->conn);
if(ctx->max_bidi_streams > ctx->used_bidi_streams)
@ -2312,9 +2387,9 @@ static bool cf_ngtcp2_conn_is_alive(struct Curl_cfilter *cf,
const ngtcp2_transport_params *rp;
struct cf_call_data save;
CF_DATA_SAVE(save, cf, data);
CF_DATA_SAVE(save, cf, data);
*input_pending = FALSE;
if(!ctx->qconn)
if(!ctx->qconn || ctx->conn_closed)
goto out;
/* Both sides of the QUIC connection announce they max idle times in

View File

@ -62,9 +62,9 @@ class TestInfo:
curl = CurlClient(env=env)
url = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-{count-1}]'
r = curl.http_download(urls=[url], alpn_proto=proto, with_stats=True)
r.check_stats(count=count, http_status=200)
for s in r.stats:
self.check_stat(s, dl_size=30, ul_size=0)
r.check_stats(count=count, http_status=200, exitcode=0)
for idx, s in enumerate(r.stats):
self.check_stat(idx, s, r, dl_size=30, ul_size=0)
# download plain file with a 302 redirect
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
@ -77,9 +77,9 @@ class TestInfo:
r = curl.http_download(urls=[url], alpn_proto=proto, with_stats=True, extra_args=[
'--location'
])
r.check_stats(count=count, http_status=200)
for s in r.stats:
self.check_stat(s, dl_size=30, ul_size=0)
r.check_stats(count=count, http_status=200, exitcode=0)
for idx, s in enumerate(r.stats):
self.check_stat(idx, s, r, dl_size=30, ul_size=0)
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_16_03_info_upload(self, env: Env, httpd, nghttpx, proto, repeat):
@ -91,11 +91,13 @@ class TestInfo:
curl = CurlClient(env=env)
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-{count-1}]'
r = curl.http_upload(urls=[url], data=f'@{fdata}', alpn_proto=proto,
with_headers=True)
with_headers=True, extra_args=[
'--trace-config', 'http/2,http/3'
])
r.check_response(count=count, http_status=200)
r.check_stats(count=count, http_status=200)
for s in r.stats:
self.check_stat(s, dl_size=fsize, ul_size=fsize)
r.check_stats(count=count, http_status=200, exitcode=0)
for idx, s in enumerate(r.stats):
self.check_stat(idx, s, r, dl_size=fsize, ul_size=fsize)
# download plain file via http: ('time_appconnect' is 0)
@pytest.mark.parametrize("proto", ['http/1.1'])
@ -104,21 +106,22 @@ class TestInfo:
curl = CurlClient(env=env)
url = f'http://{env.domain1}:{env.http_port}/data.json?[0-{count-1}]'
r = curl.http_download(urls=[url], alpn_proto=proto, with_stats=True)
r.check_stats(count=count, http_status=200)
for s in r.stats:
self.check_stat(s, dl_size=30, ul_size=0)
r.check_stats(count=count, http_status=200, exitcode=0)
for idx, s in enumerate(r.stats):
self.check_stat(idx, s, r, dl_size=30, ul_size=0)
def check_stat(self, s, dl_size=None, ul_size=None):
def check_stat(self, idx, s, r, dl_size=None, ul_size=None):
self.check_stat_times(s)
# we always send something
self.check_stat_positive(s, 'size_request')
# we always receive response headers
self.check_stat_positive(s, 'size_header')
if ul_size is not None:
assert s['size_upload'] == ul_size # the file we sent
assert s['size_request'] >= s['size_upload'], f'"size_request" smaller than "size_upload", {s}'
assert s['size_upload'] == ul_size, f'stat #{idx}\n{r.dump_logs()}' # the file we sent
assert s['size_request'] >= s['size_upload'], \
f'stat #{idx}, "size_request" smaller than "size_upload", {s}\n{r.dump_logs()}'
if dl_size is not None:
assert s['size_download'] == dl_size # the file we received
assert s['size_download'] == dl_size, f'stat #{idx}\n{r.dump_logs()}' # the file we received
def check_stat_positive(self, s, key):
assert key in s, f'stat "{key}" missing: {s}'