2
0
mirror of https://github.com/curl/curl.git synced 2025-04-12 16:20:35 +08:00

http2: do flow window accounting for cancelled streams

- nghttp2 does not free connection level window flow for
  aborted streams
- when closing transfers, make sure that any buffered
  response data is "given back" to the flow control window
- add tests test_02_22 and test_02_23 to reproduce

Closes 
This commit is contained in:
Stefan Eissing 2023-04-28 11:27:25 +02:00 committed by Daniel Stenberg
parent b0edf0b7da
commit a9b7f72bc9
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
3 changed files with 162 additions and 33 deletions

@ -160,6 +160,9 @@ static void cf_h2_ctx_free(struct cf_h2_ctx *ctx)
}
}
static CURLcode h2_progress_egress(struct Curl_cfilter *cf,
struct Curl_easy *data);
/**
* All about the H3 internals of a stream
*/
@ -272,6 +275,16 @@ static void http2_data_done(struct Curl_cfilter *cf,
stream->id, NGHTTP2_STREAM_CLOSED))
(void)nghttp2_session_send(ctx->h2);
}
if(!Curl_bufq_is_empty(&stream->recvbuf)) {
/* Anything in the recvbuf is still being counted
* in stream and connection window flow control. Need
* to free that space or the connection window might get
* exhausted eventually. */
nghttp2_session_consume(ctx->h2, stream->id,
Curl_bufq_len(&stream->recvbuf));
/* give WINDOW_UPATE a chance to be sent */
h2_progress_egress(cf, data);
}
/* -1 means unassigned and 0 means cleared */
if(nghttp2_session_get_stream_user_data(ctx->h2, stream->id)) {
@ -1825,7 +1838,7 @@ out:
ctx->h2, stream->id),
nghttp2_session_get_stream_effective_local_window_size(
ctx->h2, stream->id),
nghttp2_session_get_effective_local_window_size(ctx->h2),
nghttp2_session_get_local_window_size(ctx->h2),
HTTP2_HUGE_WINDOW_SIZE));
CF_DATA_RESTORE(cf, save);

@ -90,12 +90,13 @@ struct transfer {
FILE *out;
curl_off_t recv_size;
curl_off_t pause_at;
int started;
int paused;
int resumed;
int done;
};
static size_t transfer_count;
static size_t transfer_count = 1;
static struct transfer *transfers;
static struct transfer *get_transfer_for_easy(CURL *easy)
@ -117,7 +118,7 @@ static size_t my_write_cb(char *buf, size_t nitems, size_t buflen,
if(!t->resumed &&
t->recv_size < t->pause_at &&
((curl_off_t)(t->recv_size + (nitems * buflen)) >= t->pause_at)) {
fprintf(stderr, "transfer %d: PAUSE\n", t->idx);
fprintf(stderr, "[t-%d] PAUSE\n", t->idx);
t->paused = 1;
return CURL_WRITEFUNC_PAUSE;
}
@ -132,7 +133,7 @@ static size_t my_write_cb(char *buf, size_t nitems, size_t buflen,
nwritten = fwrite(buf, nitems, buflen, t->out);
if(nwritten < 0) {
fprintf(stderr, "transfer %d: write failure\n", t->idx);
fprintf(stderr, "[t-%d] write failure\n", t->idx);
return 0;
}
t->recv_size += nwritten;
@ -162,27 +163,65 @@ static int setup(CURL *hnd, const char *url, struct transfer *t)
return 0; /* all is good */
}
static void usage(const char *msg)
{
if(msg)
fprintf(stderr, "%s\n", msg);
fprintf(stderr,
"usage: [options] url\n"
" download a url with following options:\n"
" -m number max parallel downloads\n"
" -n number total downloads\n"
" -p number pause transfer after `number` response bytes\n"
);
}
/*
* Download a file over HTTP/2, take care of server push.
*/
int main(int argc, char *argv[])
{
CURLM *multi_handle;
int active_transfers;
struct CURLMsg *m;
const char *url;
size_t i;
long pause_offset;
size_t i, n, max_parallel = 1;
size_t active_transfers;
long pause_offset = 0;
int abort_paused = 0;
struct transfer *t;
int ch;
if(argc != 4) {
fprintf(stderr, "usage: h2-download count pause-offset url\n");
while((ch = getopt(argc, argv, "ahm:n:P:")) != -1) {
switch(ch) {
case 'h':
usage(NULL);
return 2;
break;
case 'a':
abort_paused = 1;
break;
case 'm':
max_parallel = (size_t)strtol(optarg, NULL, 10);
break;
case 'n':
transfer_count = (size_t)strtol(optarg, NULL, 10);
break;
case 'P':
pause_offset = strtol(optarg, NULL, 10);
break;
default:
usage("invalid option");
return 1;
}
}
argc -= optind;
argv += optind;
if(argc != 1) {
usage("not enough arguments");
return 2;
}
transfer_count = (size_t)strtol(argv[1], NULL, 10);
pause_offset = strtol(argv[2], NULL, 10);
url = argv[3];
url = argv[0];
transfers = calloc(transfer_count, sizeof(*transfers));
if(!transfers) {
@ -198,13 +237,20 @@ int main(int argc, char *argv[])
t = &transfers[i];
t->idx = (int)i;
t->pause_at = (curl_off_t)pause_offset * i;
}
n = (max_parallel < transfer_count)? max_parallel : transfer_count;
for(i = 0; i < n; ++i) {
t = &transfers[i];
t->easy = curl_easy_init();
if(!t->easy || setup(t->easy, url, t)) {
fprintf(stderr, "setup of transfer #%d failed\n", (int)i);
fprintf(stderr, "[t-%d] FAILED setup\n", (int)i);
return 1;
}
curl_multi_add_handle(multi_handle, t->easy);
t->started = 1;
++active_transfers;
fprintf(stderr, "[t-%d] STARTED\n", t->idx);
}
do {
@ -220,11 +266,6 @@ int main(int argc, char *argv[])
if(mc)
break;
/*
* A little caution when doing server push is that libcurl itself has
* created and added one or more easy handles but we need to clean them up
* when we are done.
*/
do {
int msgq = 0;
m = curl_multi_info_read(multi_handle, &msgq);
@ -240,18 +281,53 @@ int main(int argc, char *argv[])
curl_easy_cleanup(e);
}
else {
/* nothing happending, resume one paused transfer if there is one */
for(i = 0; i < transfer_count; ++i) {
t = &transfers[i];
if(!t->done && t->paused) {
t->resumed = 1;
t->paused = 0;
curl_easy_pause(t->easy, CURLPAUSE_CONT);
fprintf(stderr, "transfer %d: RESUME\n", t->idx);
break;
/* nothing happening, maintenance */
if(abort_paused) {
/* abort paused transfers */
for(i = 0; i < transfer_count; ++i) {
t = &transfers[i];
if(!t->done && t->paused && t->easy) {
curl_multi_remove_handle(multi_handle, t->easy);
t->done = 1;
active_transfers--;
fprintf(stderr, "[t-%d] ABORTED\n", t->idx);
}
}
}
else {
/* resume one paused transfer */
for(i = 0; i < transfer_count; ++i) {
t = &transfers[i];
if(!t->done && t->paused) {
t->resumed = 1;
t->paused = 0;
curl_easy_pause(t->easy, CURLPAUSE_CONT);
fprintf(stderr, "[t-%d] RESUMED\n", t->idx);
break;
}
}
}
while(active_transfers < max_parallel) {
for(i = 0; i < transfer_count; ++i) {
t = &transfers[i];
if(!t->started) {
t->easy = curl_easy_init();
if(!t->easy || setup(t->easy, url, t)) {
fprintf(stderr, "[t-%d] FAILEED setup\n", (int)i);
return 1;
}
curl_multi_add_handle(multi_handle, t->easy);
t->started = 1;
++active_transfers;
fprintf(stderr, "[t-%d] STARTED\n", t->idx);
break;
}
}
/* all started */
if(i == transfer_count)
break;
}
}
} while(m);

@ -281,25 +281,65 @@ class TestDownload:
assert httpd.stop()
assert httpd.start()
# download via lib client, pause/resume at different offsets
# download via lib client, 1 at a time, pause/resume at different offsets
@pytest.mark.parametrize("pause_offset", [0, 10*1024, 100*1023, 640000])
def test_02_21_h2_lib_download(self, env: Env, httpd, nghttpx, pause_offset, repeat):
def test_02_21_h2_lib_serial(self, env: Env, httpd, nghttpx, pause_offset, repeat):
count = 10
docname = 'data-10m'
url = f'https://localhost:{env.https_port}/{docname}'
client = LocalClient(name='h2-download', env=env)
if not client.exists():
pytest.skip(f'example client not built: {client.name}')
r = client.run(args=[str(count), str(pause_offset), url])
r = client.run(args=[
'-n', f'{count}', '-P', f'{pause_offset}', url
])
r.check_exit_code(0)
srcfile = os.path.join(httpd.docs_dir, docname)
self.check_downloads(client, srcfile, count)
def check_downloads(self, client, srcfile: str, count: int):
# download via lib client, several at a time, pause/resume
@pytest.mark.parametrize("pause_offset", [100*1023])
def test_02_22_h2_lib_parallel_resume(self, env: Env, httpd, nghttpx, pause_offset, repeat):
count = 10
max_parallel = 5
docname = 'data-10m'
url = f'https://localhost:{env.https_port}/{docname}'
client = LocalClient(name='h2-download', env=env)
if not client.exists():
pytest.skip(f'example client not built: {client.name}')
r = client.run(args=[
'-n', f'{count}', '-m', f'{max_parallel}',
'-P', f'{pause_offset}', url
])
r.check_exit_code(0)
srcfile = os.path.join(httpd.docs_dir, docname)
self.check_downloads(client, srcfile, count)
# download, several at a time, pause and abort paused
@pytest.mark.parametrize("pause_offset", [100*1023])
def test_02_23_h2_lib_parallel_abort(self, env: Env, httpd, nghttpx, pause_offset, repeat):
count = 200
max_parallel = 100
docname = 'data-10m'
url = f'https://localhost:{env.https_port}/{docname}'
client = LocalClient(name='h2-download', env=env)
if not client.exists():
pytest.skip(f'example client not built: {client.name}')
r = client.run(args=[
'-n', f'{count}', '-m', f'{max_parallel}', '-a',
'-P', f'{pause_offset}', url
])
r.check_exit_code(0)
srcfile = os.path.join(httpd.docs_dir, docname)
# downloads should be there, but not necessarily complete
self.check_downloads(client, srcfile, count, complete=False)
def check_downloads(self, client, srcfile: str, count: int,
complete: bool = True):
for i in range(count):
dfile = client.download_file(i)
assert os.path.exists(dfile)
if not filecmp.cmp(srcfile, dfile, shallow=False):
if complete and not filecmp.cmp(srcfile, dfile, shallow=False):
diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
b=open(dfile).readlines(),
fromfile=srcfile,