mirror of
https://github.com/curl/curl.git
synced 2025-01-06 13:44:52 +08:00
http/3: add shutdown support
- openssl-quic shutdown handling - ngtcp2 shutdown handling - quiche shutdown handling - add test_19_06 for verfication Reported-by: Dexter Gerig Closes #14027 Fixes #14022
This commit is contained in:
parent
868ae0673c
commit
bb09a304bb
@ -138,7 +138,7 @@ struct cf_ngtcp2_ctx {
|
||||
uint64_t used_bidi_streams; /* bidi streams we have opened */
|
||||
uint64_t max_bidi_streams; /* max bidi streams we can open */
|
||||
int qlogfd;
|
||||
BIT(conn_closed); /* connection is closed */
|
||||
BIT(shutdown_started); /* queued shutdown packets */
|
||||
};
|
||||
|
||||
/* How to access `call_data` from a cf_ngtcp2 filter */
|
||||
@ -816,6 +816,9 @@ static void cf_ngtcp2_adjust_pollset(struct Curl_cfilter *cf,
|
||||
return;
|
||||
|
||||
Curl_pollset_check(data, ps, ctx->q.sockfd, &want_recv, &want_send);
|
||||
if(!want_send && !Curl_bufq_is_empty(&ctx->q.sendbuf))
|
||||
want_send = TRUE;
|
||||
|
||||
if(want_recv || want_send) {
|
||||
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
|
||||
struct cf_call_data save;
|
||||
@ -1203,7 +1206,7 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
|
||||
pktx_init(&pktx, cf, data);
|
||||
|
||||
if(!stream || ctx->conn_closed) {
|
||||
if(!stream || ctx->shutdown_started) {
|
||||
*err = CURLE_RECV_ERROR;
|
||||
goto out;
|
||||
}
|
||||
@ -1505,7 +1508,7 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
}
|
||||
|
||||
if(!stream || stream->id < 0) {
|
||||
if(ctx->conn_closed) {
|
||||
if(ctx->shutdown_started) {
|
||||
CURL_TRC_CF(data, cf, "cannot open stream on closed connection");
|
||||
*err = CURLE_SEND_ERROR;
|
||||
sent = -1;
|
||||
@ -1559,7 +1562,7 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
sent = -1;
|
||||
goto out;
|
||||
}
|
||||
else if(ctx->conn_closed) {
|
||||
else if(ctx->shutdown_started) {
|
||||
CURL_TRC_CF(data, cf, "cannot send on closed connection");
|
||||
*err = CURLE_SEND_ERROR;
|
||||
sent = -1;
|
||||
@ -2008,29 +2011,97 @@ static void cf_ngtcp2_ctx_clear(struct cf_ngtcp2_ctx *ctx)
|
||||
ctx->call_data = save;
|
||||
}
|
||||
|
||||
static CURLcode cf_ngtcp2_shutdown(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data, bool *done)
|
||||
{
|
||||
struct cf_ngtcp2_ctx *ctx = cf->ctx;
|
||||
struct cf_call_data save;
|
||||
struct pkt_io_ctx pktx;
|
||||
CURLcode result = CURLE_OK;
|
||||
|
||||
if(cf->shutdown || !ctx->qconn) {
|
||||
*done = TRUE;
|
||||
return CURLE_OK;
|
||||
}
|
||||
|
||||
CF_DATA_SAVE(save, cf, data);
|
||||
*done = FALSE;
|
||||
pktx_init(&pktx, cf, data);
|
||||
|
||||
if(!ctx->shutdown_started) {
|
||||
char buffer[NGTCP2_MAX_UDP_PAYLOAD_SIZE];
|
||||
ngtcp2_ssize nwritten;
|
||||
|
||||
if(!Curl_bufq_is_empty(&ctx->q.sendbuf)) {
|
||||
CURL_TRC_CF(data, cf, "shutdown, flushing sendbuf");
|
||||
result = cf_progress_egress(cf, data, &pktx);
|
||||
if(!Curl_bufq_is_empty(&ctx->q.sendbuf)) {
|
||||
CURL_TRC_CF(data, cf, "sending shutdown packets blocked");
|
||||
result = CURLE_OK;
|
||||
goto out;
|
||||
}
|
||||
else if(result) {
|
||||
CURL_TRC_CF(data, cf, "shutdown, error %d flushing sendbuf", result);
|
||||
*done = TRUE;
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
|
||||
ctx->shutdown_started = TRUE;
|
||||
nwritten = ngtcp2_conn_write_connection_close(
|
||||
ctx->qconn, NULL, /* path */
|
||||
NULL, /* pkt_info */
|
||||
(uint8_t *)buffer, sizeof(buffer),
|
||||
&ctx->last_error, pktx.ts);
|
||||
CURL_TRC_CF(data, cf, "start shutdown(err_type=%d, err_code=%"
|
||||
CURL_PRIu64 ") -> %d", ctx->last_error.type,
|
||||
(curl_uint64_t)ctx->last_error.error_code, (int)nwritten);
|
||||
if(nwritten > 0) {
|
||||
Curl_bufq_write(&ctx->q.sendbuf, (const unsigned char *)buffer,
|
||||
(size_t)nwritten, &result);
|
||||
if(result) {
|
||||
CURL_TRC_CF(data, cf, "error %d adding shutdown packets to sendbuf, "
|
||||
"aborting shutdown", result);
|
||||
goto out;
|
||||
}
|
||||
ctx->q.no_gso = TRUE;
|
||||
ctx->q.gsolen = (size_t)nwritten;
|
||||
ctx->q.split_len = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if(!Curl_bufq_is_empty(&ctx->q.sendbuf)) {
|
||||
CURL_TRC_CF(data, cf, "shutdown, flushing egress");
|
||||
result = vquic_flush(cf, data, &ctx->q);
|
||||
if(result == CURLE_AGAIN) {
|
||||
CURL_TRC_CF(data, cf, "sending shutdown packets blocked");
|
||||
result = CURLE_OK;
|
||||
goto out;
|
||||
}
|
||||
else if(result) {
|
||||
CURL_TRC_CF(data, cf, "shutdown, error %d flushing sendbuf", result);
|
||||
*done = TRUE;
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
|
||||
if(Curl_bufq_is_empty(&ctx->q.sendbuf)) {
|
||||
/* Sent everything off. ngtcp2 seems to have no support for graceful
|
||||
* shutdowns. So, we are done. */
|
||||
CURL_TRC_CF(data, cf, "shutdown completely sent off, done");
|
||||
*done = TRUE;
|
||||
result = CURLE_OK;
|
||||
}
|
||||
out:
|
||||
CF_DATA_RESTORE(cf, save);
|
||||
return result;
|
||||
}
|
||||
|
||||
static void cf_ngtcp2_conn_close(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data)
|
||||
{
|
||||
struct cf_ngtcp2_ctx *ctx = cf->ctx;
|
||||
if(ctx && ctx->qconn && !ctx->conn_closed) {
|
||||
char buffer[NGTCP2_MAX_UDP_PAYLOAD_SIZE];
|
||||
struct pkt_io_ctx pktx;
|
||||
ngtcp2_ssize rc;
|
||||
|
||||
ctx->conn_closed = TRUE;
|
||||
pktx_init(&pktx, cf, data);
|
||||
rc = ngtcp2_conn_write_connection_close(ctx->qconn, NULL, /* path */
|
||||
NULL, /* pkt_info */
|
||||
(uint8_t *)buffer, sizeof(buffer),
|
||||
&ctx->last_error, pktx.ts);
|
||||
CURL_TRC_CF(data, cf, "closing connection(err_type=%d, err_code=%"
|
||||
CURL_PRIu64 ") -> %d", ctx->last_error.type,
|
||||
(curl_uint64_t)ctx->last_error.error_code, (int)rc);
|
||||
if(rc > 0) {
|
||||
while((send(ctx->q.sockfd, buffer, (SEND_TYPE_ARG3)rc, 0) == -1) &&
|
||||
SOCKERRNO == EINTR);
|
||||
}
|
||||
}
|
||||
bool done;
|
||||
cf_ngtcp2_shutdown(cf, data, &done);
|
||||
}
|
||||
|
||||
static void cf_ngtcp2_close(struct Curl_cfilter *cf, struct Curl_easy *data)
|
||||
@ -2332,7 +2403,7 @@ static CURLcode cf_ngtcp2_query(struct Curl_cfilter *cf,
|
||||
* by callback. QUIC counts the number over the lifetime of the
|
||||
* connection, ever increasing.
|
||||
* We count the *open* transfers plus the budget for new ones. */
|
||||
if(!ctx->qconn || ctx->conn_closed) {
|
||||
if(!ctx->qconn || ctx->shutdown_started) {
|
||||
*pres1 = 0;
|
||||
}
|
||||
else if(ctx->max_bidi_streams) {
|
||||
@ -2390,7 +2461,7 @@ static bool cf_ngtcp2_conn_is_alive(struct Curl_cfilter *cf,
|
||||
|
||||
CF_DATA_SAVE(save, cf, data);
|
||||
*input_pending = FALSE;
|
||||
if(!ctx->qconn || ctx->conn_closed)
|
||||
if(!ctx->qconn || ctx->shutdown_started)
|
||||
goto out;
|
||||
|
||||
/* Both sides of the QUIC connection announce they max idle times in
|
||||
@ -2438,7 +2509,7 @@ struct Curl_cftype Curl_cft_http3 = {
|
||||
cf_ngtcp2_destroy,
|
||||
cf_ngtcp2_connect,
|
||||
cf_ngtcp2_close,
|
||||
Curl_cf_def_shutdown,
|
||||
cf_ngtcp2_shutdown,
|
||||
Curl_cf_def_get_host,
|
||||
cf_ngtcp2_adjust_pollset,
|
||||
cf_ngtcp2_data_pending,
|
||||
|
@ -294,10 +294,10 @@ struct cf_osslq_ctx {
|
||||
size_t max_stream_window; /* max flow window for one stream */
|
||||
uint64_t max_idle_ms; /* max idle time for QUIC connection */
|
||||
BIT(got_first_byte); /* if first byte was received */
|
||||
#ifdef USE_OPENSSL
|
||||
BIT(x509_store_setup); /* if x509 store has been set up */
|
||||
BIT(protocol_shutdown); /* QUIC connection is shut down */
|
||||
#endif
|
||||
BIT(need_recv); /* QUIC connection needs to receive */
|
||||
BIT(need_send); /* QUIC connection needs to send */
|
||||
};
|
||||
|
||||
static void cf_osslq_ctx_clear(struct cf_osslq_ctx *ctx)
|
||||
@ -316,6 +316,77 @@ static void cf_osslq_ctx_clear(struct cf_osslq_ctx *ctx)
|
||||
ctx->call_data = save;
|
||||
}
|
||||
|
||||
static CURLcode cf_osslq_shutdown(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data, bool *done)
|
||||
{
|
||||
struct cf_osslq_ctx *ctx = cf->ctx;
|
||||
struct cf_call_data save;
|
||||
CURLcode result = CURLE_OK;
|
||||
int rc;
|
||||
|
||||
CF_DATA_SAVE(save, cf, data);
|
||||
|
||||
if(cf->shutdown || ctx->protocol_shutdown) {
|
||||
*done = TRUE;
|
||||
return CURLE_OK;
|
||||
}
|
||||
|
||||
CF_DATA_SAVE(save, cf, data);
|
||||
*done = FALSE;
|
||||
ctx->need_send = FALSE;
|
||||
ctx->need_recv = FALSE;
|
||||
|
||||
rc = SSL_shutdown_ex(ctx->tls.ossl.ssl,
|
||||
SSL_SHUTDOWN_FLAG_NO_BLOCK, NULL, 0);
|
||||
if(rc == 0) { /* ongoing */
|
||||
CURL_TRC_CF(data, cf, "shutdown ongoing");
|
||||
ctx->need_recv = TRUE;
|
||||
goto out;
|
||||
}
|
||||
else if(rc == 1) { /* done */
|
||||
CURL_TRC_CF(data, cf, "shutdown finished");
|
||||
*done = TRUE;
|
||||
goto out;
|
||||
}
|
||||
else {
|
||||
long sslerr;
|
||||
char err_buffer[256];
|
||||
int err = SSL_get_error(ctx->tls.ossl.ssl, rc);
|
||||
|
||||
switch(err) {
|
||||
case SSL_ERROR_NONE:
|
||||
case SSL_ERROR_ZERO_RETURN:
|
||||
CURL_TRC_CF(data, cf, "shutdown not received, but closed");
|
||||
*done = TRUE;
|
||||
goto out;
|
||||
case SSL_ERROR_WANT_READ:
|
||||
/* SSL has send its notify and now wants to read the reply
|
||||
* from the server. We are not really interested in that. */
|
||||
CURL_TRC_CF(data, cf, "shutdown sent, want receive");
|
||||
ctx->need_recv = TRUE;
|
||||
goto out;
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
CURL_TRC_CF(data, cf, "shutdown send blocked");
|
||||
ctx->need_send = TRUE;
|
||||
goto out;
|
||||
default:
|
||||
/* We give up on this. */
|
||||
sslerr = ERR_get_error();
|
||||
CURL_TRC_CF(data, cf, "shutdown, ignore recv error: '%s', errno %d",
|
||||
(sslerr ?
|
||||
osslq_strerror(sslerr, err_buffer, sizeof(err_buffer)) :
|
||||
osslq_SSL_ERROR_to_str(err)),
|
||||
SOCKERRNO);
|
||||
*done = TRUE;
|
||||
result = CURLE_OK;
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
out:
|
||||
CF_DATA_RESTORE(cf, save);
|
||||
return result;
|
||||
}
|
||||
|
||||
static void cf_osslq_close(struct Curl_cfilter *cf, struct Curl_easy *data)
|
||||
{
|
||||
struct cf_osslq_ctx *ctx = cf->ctx;
|
||||
@ -323,8 +394,13 @@ static void cf_osslq_close(struct Curl_cfilter *cf, struct Curl_easy *data)
|
||||
|
||||
CF_DATA_SAVE(save, cf, data);
|
||||
if(ctx && ctx->tls.ossl.ssl) {
|
||||
/* TODO: send connection close */
|
||||
CURL_TRC_CF(data, cf, "cf_osslq_close()");
|
||||
if(!cf->shutdown && !ctx->protocol_shutdown) {
|
||||
/* last best effort, which OpenSSL calls a "rapid" shutdown. */
|
||||
SSL_shutdown_ex(ctx->tls.ossl.ssl,
|
||||
(SSL_SHUTDOWN_FLAG_NO_BLOCK | SSL_SHUTDOWN_FLAG_RAPID),
|
||||
NULL, 0);
|
||||
}
|
||||
cf_osslq_ctx_clear(ctx);
|
||||
}
|
||||
|
||||
@ -2182,6 +2258,10 @@ static void cf_osslq_adjust_pollset(struct Curl_cfilter *cf,
|
||||
SSL_net_read_desired(ctx->tls.ossl.ssl),
|
||||
SSL_net_write_desired(ctx->tls.ossl.ssl));
|
||||
}
|
||||
else if(ctx->need_recv || ctx->need_send) {
|
||||
Curl_pollset_set(data, ps, ctx->q.sockfd,
|
||||
ctx->need_recv, ctx->need_send);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -2245,7 +2325,7 @@ struct Curl_cftype Curl_cft_http3 = {
|
||||
cf_osslq_destroy,
|
||||
cf_osslq_connect,
|
||||
cf_osslq_close,
|
||||
Curl_cf_def_shutdown,
|
||||
cf_osslq_shutdown,
|
||||
Curl_cf_def_get_host,
|
||||
cf_osslq_adjust_pollset,
|
||||
cf_osslq_data_pending,
|
||||
|
@ -103,6 +103,7 @@ struct cf_quiche_ctx {
|
||||
curl_off_t data_recvd;
|
||||
BIT(goaway); /* got GOAWAY from server */
|
||||
BIT(x509_store_setup); /* if x509 store has been set up */
|
||||
BIT(shutdown_started); /* queued shutdown packets */
|
||||
};
|
||||
|
||||
#ifdef DEBUG_QUICHE
|
||||
@ -1464,18 +1465,60 @@ out:
|
||||
return result;
|
||||
}
|
||||
|
||||
static CURLcode cf_quiche_shutdown(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data, bool *done)
|
||||
{
|
||||
struct cf_quiche_ctx *ctx = cf->ctx;
|
||||
CURLcode result = CURLE_OK;
|
||||
|
||||
if(cf->shutdown || !ctx || !ctx->qconn) {
|
||||
*done = TRUE;
|
||||
return CURLE_OK;
|
||||
}
|
||||
|
||||
*done = FALSE;
|
||||
if(!ctx->shutdown_started) {
|
||||
int err;
|
||||
|
||||
ctx->shutdown_started = TRUE;
|
||||
vquic_ctx_update_time(&ctx->q);
|
||||
err = quiche_conn_close(ctx->qconn, TRUE, 0, NULL, 0);
|
||||
if(err) {
|
||||
CURL_TRC_CF(data, cf, "error %d adding shutdown packet, "
|
||||
"aborting shutdown", err);
|
||||
result = CURLE_SEND_ERROR;
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
|
||||
if(!Curl_bufq_is_empty(&ctx->q.sendbuf)) {
|
||||
CURL_TRC_CF(data, cf, "shutdown, flushing sendbuf");
|
||||
result = cf_flush_egress(cf, data);
|
||||
if(result)
|
||||
goto out;
|
||||
}
|
||||
|
||||
if(Curl_bufq_is_empty(&ctx->q.sendbuf)) {
|
||||
/* sent everything, quiche does not seem to support a graceful
|
||||
* shutdown waiting for a reply, so ware done. */
|
||||
CURL_TRC_CF(data, cf, "shutdown completely sent off, done");
|
||||
*done = TRUE;
|
||||
}
|
||||
else {
|
||||
CURL_TRC_CF(data, cf, "shutdown sending blocked");
|
||||
}
|
||||
|
||||
out:
|
||||
return result;
|
||||
}
|
||||
|
||||
static void cf_quiche_close(struct Curl_cfilter *cf, struct Curl_easy *data)
|
||||
{
|
||||
struct cf_quiche_ctx *ctx = cf->ctx;
|
||||
|
||||
if(ctx) {
|
||||
if(ctx->qconn) {
|
||||
vquic_ctx_update_time(&ctx->q);
|
||||
(void)quiche_conn_close(ctx->qconn, TRUE, 0, NULL, 0);
|
||||
/* flushing the egress is not a failsafe way to deliver all the
|
||||
outstanding packets, but we also don't want to get stuck here... */
|
||||
(void)cf_flush_egress(cf, data);
|
||||
}
|
||||
bool done;
|
||||
(void)cf_quiche_shutdown(cf, data, &done);
|
||||
cf_quiche_ctx_clear(ctx);
|
||||
}
|
||||
}
|
||||
@ -1580,7 +1623,7 @@ struct Curl_cftype Curl_cft_http3 = {
|
||||
cf_quiche_destroy,
|
||||
cf_quiche_connect,
|
||||
cf_quiche_close,
|
||||
Curl_cf_def_shutdown,
|
||||
cf_quiche_shutdown,
|
||||
Curl_cf_def_get_host,
|
||||
cf_quiche_adjust_pollset,
|
||||
cf_quiche_data_pending,
|
||||
|
@ -153,4 +153,24 @@ class TestShutdown:
|
||||
removes = [l for l in r.trace_lines if re.match(r'.*socket cb: socket \d+ REMOVED', l)]
|
||||
assert len(removes) == count, f'{removes}'
|
||||
|
||||
# check graceful shutdown on multiplexed http
|
||||
@pytest.mark.parametrize("proto", ['h2', 'h3'])
|
||||
def test_19_06_check_shutdown(self, env: Env, httpd, nghttpx, repeat, proto):
|
||||
if proto == 'h3' and not env.have_h3():
|
||||
pytest.skip("h3 not supported")
|
||||
if not env.curl_is_debug():
|
||||
pytest.skip('only works for curl debug builds')
|
||||
curl = CurlClient(env=env, run_env={
|
||||
'CURL_GRACEFUL_SHUTDOWN': '2000',
|
||||
'CURL_DEBUG': 'all'
|
||||
})
|
||||
url = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-1]'
|
||||
r = curl.http_download(urls=[url], alpn_proto=proto, with_tcpdump=True, extra_args=[
|
||||
'--parallel'
|
||||
])
|
||||
r.check_response(http_status=200, count=2)
|
||||
# check connection cache closings
|
||||
shutdowns = [l for l in r.trace_lines if re.match(r'.*CCACHE\] shutdown #\d+, done=1', l)]
|
||||
assert len(shutdowns) == 1, f'{shutdowns}'
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user