diff --git a/lib/bufq.c b/lib/bufq.c index e0d7726613..f0ab6bb75c 100644 --- a/lib/bufq.c +++ b/lib/bufq.c @@ -287,12 +287,6 @@ bool Curl_bufq_is_full(const struct bufq *q) return chunk_is_full(q->tail); } -static size_t data_pass_size(struct bufq *q) -{ - (void)q; - return 4*1024; -} - static struct buf_chunk *get_spare(struct bufq *q) { struct buf_chunk *chunk = NULL; @@ -426,9 +420,12 @@ ssize_t Curl_bufq_read(struct bufq *q, unsigned char *buf, size_t len, return nread; } -bool Curl_bufq_peek(const struct bufq *q, +bool Curl_bufq_peek(struct bufq *q, const unsigned char **pbuf, size_t *plen) { + if(q->head && chunk_is_empty(q->head)) { + prune_head(q); + } if(q->head && !chunk_is_empty(q->head)) { chunk_peek(q->head, pbuf, plen); return TRUE; @@ -438,7 +435,7 @@ bool Curl_bufq_peek(const struct bufq *q, return FALSE; } -bool Curl_bufq_peek_at(const struct bufq *q, size_t offset, +bool Curl_bufq_peek_at(struct bufq *q, size_t offset, const unsigned char **pbuf, size_t *plen) { struct buf_chunk *c = q->head; @@ -502,13 +499,11 @@ ssize_t Curl_bufq_write_pass(struct bufq *q, CURLcode *err) { ssize_t nwritten = 0, n; - bool prefer_direct = (len >= data_pass_size(q)); *err = CURLE_OK; while(len) { - if(Curl_bufq_is_full(q) || (!Curl_bufq_is_empty(q) && prefer_direct)) { - /* try to make room in case we are full - * or empty the buffer when adding "large" data */ + if(Curl_bufq_is_full(q)) { + /* try to make room in case we are full */ n = Curl_bufq_pass(q, writer, writer_ctx, err); if(n < 0) { if(*err != CURLE_AGAIN) { @@ -519,22 +514,6 @@ ssize_t Curl_bufq_write_pass(struct bufq *q, } } - if(Curl_bufq_is_empty(q) && prefer_direct) { - /* empty and `data` is "large", try passing directly */ - n = writer(writer_ctx, buf, len, err); - if(n < 0) { - if(*err != CURLE_AGAIN) { - /* real error, fail */ - return -1; - } - /* passing would block */ - n = 0; - } - buf += (size_t)n; - len -= (size_t)n; - nwritten += (size_t)n; - } - if(len) { /* Add whatever is remaining now to bufq */ n = Curl_bufq_write(q, buf, len, err); diff --git a/lib/bufq.h b/lib/bufq.h index a4ca21ecea..09af226a9e 100644 --- a/lib/bufq.h +++ b/lib/bufq.h @@ -201,10 +201,10 @@ ssize_t Curl_bufq_read(struct bufq *q, unsigned char *buf, size_t len, * Repeated calls return the same information until the buffer queue * is modified, see `Curl_bufq_skip()`` */ -bool Curl_bufq_peek(const struct bufq *q, +bool Curl_bufq_peek(struct bufq *q, const unsigned char **pbuf, size_t *plen); -bool Curl_bufq_peek_at(const struct bufq *q, size_t offset, +bool Curl_bufq_peek_at(struct bufq *q, size_t offset, const unsigned char **pbuf, size_t *plen); /** diff --git a/lib/http.c b/lib/http.c index bcaa794871..cc2f5f0573 100644 --- a/lib/http.c +++ b/lib/http.c @@ -4556,7 +4556,8 @@ CURLcode Curl_http_req_make(struct http_req **preq, if(!req->path) goto out; } - Curl_dynhds_init(&req->headers, 128, DYN_H2_HEADERS); + Curl_dynhds_init(&req->headers, 0, DYN_H2_HEADERS); + Curl_dynhds_init(&req->trailers, 0, DYN_H2_TRAILERS); result = CURLE_OK; out: @@ -4573,6 +4574,7 @@ void Curl_http_req_free(struct http_req *req) free(req->authority); free(req->path); Curl_dynhds_free(&req->headers); + Curl_dynhds_free(&req->trailers); free(req); } } @@ -4594,7 +4596,8 @@ CURLcode Curl_http_resp_make(struct http_resp **presp, if(!resp->description) goto out; } - Curl_dynhds_init(&resp->headers, 128, DYN_H2_HEADERS); + Curl_dynhds_init(&resp->headers, 0, DYN_H2_HEADERS); + Curl_dynhds_init(&resp->trailers, 0, DYN_H2_TRAILERS); result = CURLE_OK; out: @@ -4609,6 +4612,7 @@ void Curl_http_resp_free(struct http_resp *resp) if(resp) { free(resp->description); Curl_dynhds_free(&resp->headers); + Curl_dynhds_free(&resp->trailers); if(resp->prev) Curl_http_resp_free(resp->prev); free(resp); diff --git a/lib/http.h b/lib/http.h index 5f4fcb9043..b9a2e61492 100644 --- a/lib/http.h +++ b/lib/http.h @@ -29,6 +29,7 @@ #include #endif +#include "bufq.h" #include "dynhds.h" #include "ws.h" @@ -227,14 +228,12 @@ struct HTTP { #ifdef USE_NGHTTP2 /*********** for HTTP/2 we store stream-local data here *************/ int32_t stream_id; /* stream we are interested in */ - - /* We store non-final and final response headers here, per-stream */ - struct dynbuf header_recvbuf; - size_t nread_header_recvbuf; /* number of bytes in header_recvbuf fed into - upper layer */ - struct dynbuf trailer_recvbuf; - const uint8_t *pausedata; /* pointer to data received in on_data_chunk */ - size_t pauselen; /* the number of bytes left in data */ + struct bufq h2_sendbuf; /* request body data buffere for sending */ + size_t h2_send_hds_len; /* amount of bytes in first cf_send() that + are header bytes. Or 0 if not known. */ + struct bufq h2_recvbuf; + size_t h2_recv_hds_len; /* how many bytes in recvbuf are headers */ + struct dynhds resp_trailers; bool close_handled; /* TRUE if stream closure is handled by libcurl */ char **push_headers; /* allocated array */ @@ -346,6 +345,7 @@ struct http_req { char *authority; char *path; struct dynhds headers; + struct dynhds trailers; }; /** @@ -366,6 +366,7 @@ struct http_resp { int status; char *description; struct dynhds headers; + struct dynhds trailers; struct http_resp *prev; }; diff --git a/lib/http2.c b/lib/http2.c index b0ce87d987..f43462d705 100644 --- a/lib/http2.c +++ b/lib/http2.c @@ -27,6 +27,7 @@ #ifdef USE_NGHTTP2 #include #include "urldata.h" +#include "bufq.h" #include "http2.h" #include "http.h" #include "sendf.h" @@ -48,8 +49,6 @@ #include "curl_memory.h" #include "memdebug.h" -#define H2_BUFSIZE 32768 - #if (NGHTTP2_VERSION_NUM < 0x010c00) #error too old nghttp2 version, upgrade! #endif @@ -62,8 +61,23 @@ #define NGHTTP2_HAS_SET_LOCAL_WINDOW_SIZE 1 #endif -#define HTTP2_HUGE_WINDOW_SIZE (32 * 1024 * 1024) /* 32 MB */ +/* buffer dimensioning: + * use 16K as chunk size, as that fits H2 DATA frames well */ +#define H2_CHUNK_SIZE (16 * 1024) +/* this is how much we want "in flight" for a stream */ +#define H2_STREAM_WINDOW_SIZE (512 * 1024) +/* on receving from TLS, we prep for holding a full stream window */ +#define H2_NW_RECV_CHUNKS (H2_STREAM_WINDOW_SIZE / H2_CHUNK_SIZE) +/* on send into TLS, we just want to accumulate small frames */ +#define H2_NW_SEND_CHUNKS 1 +/* stream recv/send chunks are a result of window / chunk sizes */ +#define H2_STREAM_RECV_CHUNKS (H2_STREAM_WINDOW_SIZE / H2_CHUNK_SIZE) +#define H2_STREAM_SEND_CHUNKS (H2_STREAM_WINDOW_SIZE / H2_CHUNK_SIZE) +/* spare chunks we keep for a full window */ +#define H2_STREAM_POOL_SPARES (H2_STREAM_WINDOW_SIZE / H2_CHUNK_SIZE) + +#define HTTP2_HUGE_WINDOW_SIZE (16 * H2_STREAM_WINDOW_SIZE) #define H2_SETTINGS_IV_LEN 3 #define H2_BINSETTINGS_LEN 80 @@ -75,7 +89,7 @@ static int populate_settings(nghttp2_settings_entry *iv, iv[0].value = Curl_multi_max_concurrent_streams(data->multi); iv[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; - iv[1].value = HTTP2_HUGE_WINDOW_SIZE; + iv[1].value = H2_STREAM_WINDOW_SIZE; iv[2].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH; iv[2].value = data->multi->push_cb != NULL; @@ -101,22 +115,14 @@ struct cf_h2_ctx { /* The easy handle used in the current filter call, cleared at return */ struct cf_call_data call_data; - char *inbuf; /* buffer to receive data from underlying socket */ - size_t inbuflen; /* number of bytes filled in inbuf */ - size_t nread_inbuf; /* number of bytes read from in inbuf */ + struct bufq inbufq; /* network input */ + struct bufq outbufq; /* network output */ + struct bufc_pool stream_bufcp; /* spares for stream buffers */ - struct dynbuf outbuf; - - /* We need separate buffer for transmission and reception because we - may call nghttp2_session_send() after the - nghttp2_session_mem_recv() but mem buffer is still not full. In - this case, we wrongly sends the content of mem buffer if we share - them for both cases. */ - int32_t pause_stream_id; /* stream ID which paused - nghttp2_session_mem_recv */ size_t drain_total; /* sum of all stream's UrlState.drain */ int32_t goaway_error; int32_t last_stream_id; + BIT(conn_closed); BIT(goaway); BIT(enable_push); }; @@ -133,8 +139,9 @@ static void cf_h2_ctx_clear(struct cf_h2_ctx *ctx) if(ctx->h2) { nghttp2_session_del(ctx->h2); } - free(ctx->inbuf); - Curl_dyn_free(&ctx->outbuf); + Curl_bufq_free(&ctx->inbufq); + Curl_bufq_free(&ctx->outbufq); + Curl_bufcp_free(&ctx->stream_bufcp); memset(ctx, 0, sizeof(*ctx)); ctx->call_data = save; } @@ -151,22 +158,42 @@ static int h2_client_new(struct Curl_cfilter *cf, nghttp2_session_callbacks *cbs) { struct cf_h2_ctx *ctx = cf->ctx; - -#if NGHTTP2_VERSION_NUM < 0x013200 - /* before 1.50.0 */ - return nghttp2_session_client_new(&ctx->h2, cbs, cf); -#else nghttp2_option *o; + int rc = nghttp2_option_new(&o); if(rc) return rc; + /* We handle window updates ourself to enfore buffer limits */ + nghttp2_option_set_no_auto_window_update(o, 1); +#if NGHTTP2_VERSION_NUM >= 0x013200 + /* with 1.50.0 */ /* turn off RFC 9113 leading and trailing white spaces validation against HTTP field value. */ nghttp2_option_set_no_rfc9113_leading_and_trailing_ws_validation(o, 1); +#endif rc = nghttp2_session_client_new2(&ctx->h2, cbs, cf, o); nghttp2_option_del(o); return rc; -#endif +} + +static ssize_t nw_in_reader(void *reader_ctx, + unsigned char *buf, size_t buflen, + CURLcode *err) +{ + struct Curl_cfilter *cf = reader_ctx; + struct Curl_easy *data = CF_DATA_CURRENT(cf); + + return Curl_conn_cf_recv(cf->next, data, (char *)buf, buflen, err); +} + +static ssize_t nw_out_writer(void *writer_ctx, + const unsigned char *buf, size_t buflen, + CURLcode *err) +{ + struct Curl_cfilter *cf = writer_ctx; + struct Curl_easy *data = CF_DATA_CURRENT(cf); + + return Curl_conn_cf_send(cf->next, data, (const char *)buf, buflen, err); } static ssize_t send_callback(nghttp2_session *h2, @@ -204,6 +231,7 @@ static void multi_connchanged(struct Curl_multi *multi) static CURLcode http2_data_setup(struct Curl_cfilter *cf, struct Curl_easy *data) { + struct cf_h2_ctx *ctx = cf->ctx; struct HTTP *stream = data->req.p.http; (void)cf; @@ -212,22 +240,19 @@ static CURLcode http2_data_setup(struct Curl_cfilter *cf, stream->stream_id = -1; - Curl_dyn_init(&stream->header_recvbuf, DYN_H2_HEADERS); - Curl_dyn_init(&stream->trailer_recvbuf, DYN_H2_TRAILERS); - + Curl_bufq_initp(&stream->h2_sendbuf, &ctx->stream_bufcp, + H2_STREAM_SEND_CHUNKS, BUFQ_OPT_NONE); + Curl_bufq_initp(&stream->h2_recvbuf, &ctx->stream_bufcp, + H2_STREAM_RECV_CHUNKS, BUFQ_OPT_SOFT_LIMIT); + Curl_dynhds_init(&stream->resp_trailers, 0, DYN_H2_TRAILERS); + stream->h2_send_hds_len = 0; + stream->h2_recv_hds_len = 0; stream->bodystarted = FALSE; stream->status_code = -1; - stream->pausedata = NULL; - stream->pauselen = 0; stream->closed = FALSE; stream->close_handled = FALSE; - stream->memlen = 0; stream->error = NGHTTP2_NO_ERROR; stream->upload_left = 0; - stream->upload_mem = NULL; - stream->upload_len = 0; - stream->mem = data->state.buffer; - stream->len = data->set.buffer_size; return CURLE_OK; } @@ -246,11 +271,10 @@ static CURLcode cf_h2_ctx_init(struct Curl_cfilter *cf, nghttp2_session_callbacks *cbs = NULL; DEBUGASSERT(!ctx->h2); - ctx->inbuf = malloc(H2_BUFSIZE); - if(!ctx->inbuf) - goto out; - /* we want to aggregate small frames, SETTINGS, PRIO, UPDATES */ - Curl_dyn_init(&ctx->outbuf, 4*1024); + Curl_bufcp_init(&ctx->stream_bufcp, H2_CHUNK_SIZE, H2_STREAM_POOL_SPARES); + Curl_bufq_initp(&ctx->inbufq, &ctx->stream_bufcp, H2_NW_RECV_CHUNKS, 0); + Curl_bufq_initp(&ctx->outbufq, &ctx->stream_bufcp, H2_NW_SEND_CHUNKS, 0); + ctx->last_stream_id = 2147483647; rc = nghttp2_session_callbacks_new(&cbs); if(rc) { @@ -345,19 +369,14 @@ out: return result; } -static CURLcode h2_session_send(struct Curl_cfilter *cf, - struct Curl_easy *data); -static int h2_process_pending_input(struct Curl_cfilter *cf, - struct Curl_easy *data, - CURLcode *err); - /* * http2_stream_free() free HTTP2 stream related data */ static void http2_stream_free(struct HTTP *stream) { if(stream) { - Curl_dyn_free(&stream->header_recvbuf); + Curl_bufq_free(&stream->h2_sendbuf); + Curl_bufq_free(&stream->h2_recvbuf); for(; stream->push_headers_used > 0; --stream->push_headers_used) { free(stream->push_headers[stream->push_headers_used - 1]); } @@ -375,6 +394,54 @@ static int should_close_session(struct cf_h2_ctx *ctx) !nghttp2_session_want_write(ctx->h2); } +/* + * Processes pending input left in network input buffer. + * This function returns 0 if it succeeds, or -1 and error code will + * be assigned to *err. + */ +static int h2_process_pending_input(struct Curl_cfilter *cf, + struct Curl_easy *data, + CURLcode *err) +{ + struct cf_h2_ctx *ctx = cf->ctx; + const unsigned char *buf; + size_t blen; + ssize_t rv; + + while(Curl_bufq_peek(&ctx->inbufq, &buf, &blen)) { + + rv = nghttp2_session_mem_recv(ctx->h2, (const uint8_t *)buf, blen); + DEBUGF(LOG_CF(data, cf, + "fed %zu bytes from nw to nghttp2 -> %zd", blen, rv)); + if(rv < 0) { + failf(data, + "process_pending_input: nghttp2_session_mem_recv() returned " + "%zd:%s", rv, nghttp2_strerror((int)rv)); + *err = CURLE_RECV_ERROR; + return -1; + } + Curl_bufq_skip(&ctx->inbufq, (size_t)rv); + if(Curl_bufq_is_empty(&ctx->inbufq)) { + DEBUGF(LOG_CF(data, cf, "all data in connection buffer processed")); + break; + } + else { + DEBUGF(LOG_CF(data, cf, "process_pending_input: %zu bytes left " + "in connection buffer", Curl_bufq_len(&ctx->inbufq))); + } + } + + if(nghttp2_session_check_request_allowed(ctx->h2) == 0) { + /* No more requests are allowed in the current session, so + the connection may not be reused. This is set when a + GOAWAY frame has been received or when the limit of stream + identifiers has been reached. */ + connclose(cf->conn, "http/2: No new requests allowed"); + } + + return 0; +} + /* * The server may send us data at any point (e.g. PING frames). Therefore, * we cannot assume that an HTTP/2 socket is dead just because it is readable. @@ -401,13 +468,10 @@ static bool http2_connisalive(struct Curl_cfilter *cf, struct Curl_easy *data, *input_pending = FALSE; Curl_attach_connection(data, cf->conn); - nread = Curl_conn_cf_recv(cf->next, data, - ctx->inbuf, H2_BUFSIZE, &result); + nread = Curl_bufq_slurp(&ctx->inbufq, nw_in_reader, cf, &result); if(nread != -1) { - DEBUGF(LOG_CF(data, cf, "%d bytes stray data read before trying " - "h2 connection", (int)nread)); - ctx->nread_inbuf = 0; - ctx->inbuflen = nread; + DEBUGF(LOG_CF(data, cf, "%zd bytes stray data read before trying " + "h2 connection", nread)); if(h2_process_pending_input(cf, data, &result) < 0) /* immediate error, considered dead */ alive = FALSE; @@ -456,30 +520,23 @@ void Curl_http2_ver(char *p, size_t len) (void)msnprintf(p, len, "nghttp2/%s", h2->version_str); } -static CURLcode flush_output(struct Curl_cfilter *cf, +static CURLcode nw_out_flush(struct Curl_cfilter *cf, struct Curl_easy *data) { struct cf_h2_ctx *ctx = cf->ctx; - size_t buflen = Curl_dyn_len(&ctx->outbuf); - ssize_t written; + ssize_t nwritten; CURLcode result; - if(!buflen) + (void)data; + if(Curl_bufq_is_empty(&ctx->outbufq)) return CURLE_OK; - DEBUGF(LOG_CF(data, cf, "h2 conn flush %zu bytes", buflen)); - written = Curl_conn_cf_send(cf->next, data, Curl_dyn_ptr(&ctx->outbuf), - buflen, &result); - if(written < 0) { + DEBUGF(LOG_CF(data, cf, "h2 conn flush %zu bytes", + Curl_bufq_len(&ctx->outbufq))); + nwritten = Curl_bufq_pass(&ctx->outbufq, nw_out_writer, cf, &result); + if(nwritten < 0 && result != CURLE_AGAIN) { return result; } - if((size_t)written < buflen) { - Curl_dyn_tail(&ctx->outbuf, buflen - (size_t)written); - return CURLE_AGAIN; - } - else { - Curl_dyn_reset(&ctx->outbuf); - } return CURLE_OK; } @@ -495,49 +552,27 @@ static ssize_t send_callback(nghttp2_session *h2, struct Curl_cfilter *cf = userp; struct cf_h2_ctx *ctx = cf->ctx; struct Curl_easy *data = CF_DATA_CURRENT(cf); - ssize_t written; + ssize_t nwritten; CURLcode result = CURLE_OK; - size_t buflen = Curl_dyn_len(&ctx->outbuf); (void)h2; (void)flags; DEBUGASSERT(data); - if(blen < 1024 && (buflen + blen + 1 < ctx->outbuf.toobig)) { - result = Curl_dyn_addn(&ctx->outbuf, buf, blen); - if(result) { - failf(data, "Failed to add data to output buffer"); - return NGHTTP2_ERR_CALLBACK_FAILURE; + nwritten = Curl_bufq_write_pass(&ctx->outbufq, buf, blen, + nw_out_writer, cf, &result); + if(nwritten < 0) { + if(result == CURLE_AGAIN) { + return NGHTTP2_ERR_WOULDBLOCK; } - return blen; - } - if(buflen) { - /* not adding, flush buffer */ - result = flush_output(cf, data); - if(result) { - if(result == CURLE_AGAIN) { - return NGHTTP2_ERR_WOULDBLOCK; - } - failf(data, "Failed sending HTTP2 data"); - return NGHTTP2_ERR_CALLBACK_FAILURE; - } - } - - DEBUGF(LOG_CF(data, cf, "h2 conn send %zu bytes", blen)); - written = Curl_conn_cf_send(cf->next, data, buf, blen, &result); - if(result == CURLE_AGAIN) { - return NGHTTP2_ERR_WOULDBLOCK; - } - - if(written == -1) { failf(data, "Failed sending HTTP2 data"); return NGHTTP2_ERR_CALLBACK_FAILURE; } - if(!written) + if(!nwritten) return NGHTTP2_ERR_WOULDBLOCK; - return written; + return nwritten; } @@ -779,17 +814,21 @@ static int push_promise(struct Curl_cfilter *cf, } rv = nghttp2_session_set_stream_user_data(ctx->h2, - frame->promised_stream_id, + newstream->stream_id, newhandle); if(rv) { infof(data, "failed to set user_data for stream %u", - frame->promised_stream_id); + newstream->stream_id); DEBUGASSERT(0); rv = CURL_PUSH_DENY; goto fail; } - Curl_dyn_init(&newstream->header_recvbuf, DYN_H2_HEADERS); - Curl_dyn_init(&newstream->trailer_recvbuf, DYN_H2_TRAILERS); + Curl_bufq_initp(&newstream->h2_sendbuf, &ctx->stream_bufcp, + H2_STREAM_SEND_CHUNKS, BUFQ_OPT_NONE); + Curl_bufq_initp(&newstream->h2_recvbuf, &ctx->stream_bufcp, + H2_STREAM_RECV_CHUNKS, BUFQ_OPT_SOFT_LIMIT); + newstream->h2_send_hds_len = 0; + Curl_dynhds_init(&newstream->resp_trailers, 0, DYN_H2_TRAILERS); } else { DEBUGF(LOG_CF(data, cf, "Got PUSH_PROMISE, ignore it")); @@ -799,6 +838,25 @@ static int push_promise(struct Curl_cfilter *cf, return rv; } +static CURLcode recvbuf_write_hds(struct Curl_cfilter *cf, + struct Curl_easy *data, + const char *buf, size_t blen) +{ + struct HTTP *stream = data->req.p.http; + ssize_t nwritten; + CURLcode result; + + (void)cf; + nwritten = Curl_bufq_write(&stream->h2_recvbuf, + (const unsigned char *)buf, blen, &result); + if(nwritten < 0) + return result; + stream->h2_recv_hds_len += (size_t)nwritten; + /* TODO: make sure recvbuf is more flexible with overflow */ + DEBUGASSERT((size_t)nwritten == blen); + return CURLE_OK; +} + static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, void *userp) { @@ -808,7 +866,6 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, struct HTTP *stream = NULL; struct Curl_easy *data = CF_DATA_CURRENT(cf); int rv; - size_t left, ncopy; int32_t stream_id = frame->hd.stream_id; CURLcode result; @@ -841,6 +898,8 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, ctx->goaway_error = frame->goaway.error_code; ctx->last_stream_id = frame->goaway.last_stream_id; if(data) { + DEBUGF(LOG_CF(data, cf, "recv GOAWAY, error=%d, last_stream=%u", + ctx->goaway_error, ctx->last_stream_id)); infof(data, "recveived GOAWAY, error=%d, last_stream=%u", ctx->goaway_error, ctx->last_stream_id); multi_connchanged(data->multi); @@ -882,7 +941,7 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { /* Stream has ended. If there is pending data, ensure that read will occur to consume it. */ - if(!data->state.drain && stream->memlen) { + if(!data->state.drain && !Curl_bufq_is_empty(&stream->h2_recvbuf)) { drain_this(cf, data_s); Curl_expire(data, 0, EXPIRE_RUN_NOW); } @@ -908,29 +967,16 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, stream->status_code = -1; } - result = Curl_dyn_addn(&stream->header_recvbuf, STRCONST("\r\n")); + result = recvbuf_write_hds(cf, data_s, STRCONST("\r\n")); if(result) return NGHTTP2_ERR_CALLBACK_FAILURE; - left = Curl_dyn_len(&stream->header_recvbuf) - - stream->nread_header_recvbuf; - ncopy = CURLMIN(stream->len, left); - - memcpy(&stream->mem[stream->memlen], - Curl_dyn_ptr(&stream->header_recvbuf) + - stream->nread_header_recvbuf, - ncopy); - stream->nread_header_recvbuf += ncopy; - - DEBUGASSERT(stream->mem); - DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] %zu header bytes, at %p", - stream_id, ncopy, (void *)stream->mem)); - - stream->len -= ncopy; - stream->memlen += ncopy; - - drain_this(cf, data_s); - Curl_expire(data_s, 0, EXPIRE_RUN_NOW); + DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] %zu header bytes", + stream_id, Curl_bufq_len(&stream->h2_recvbuf))); + if(CF_DATA_CURRENT(cf) != data_s) { + drain_this(cf, data_s); + Curl_expire(data_s, 0, EXPIRE_RUN_NOW); + } break; case NGHTTP2_PUSH_PROMISE: DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] recv PUSH_PROMISE", stream_id)); @@ -980,10 +1026,10 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags, const uint8_t *mem, size_t len, void *userp) { struct Curl_cfilter *cf = userp; - struct cf_h2_ctx *ctx = cf->ctx; struct HTTP *stream; struct Curl_easy *data_s; - size_t nread; + ssize_t nwritten; + CURLcode result; (void)flags; DEBUGASSERT(stream_id); /* should never be a zero stream ID here */ @@ -997,6 +1043,8 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags, in the pipeline. Silently ignore. */ DEBUGF(LOG_CF(CF_DATA_CURRENT(cf), cf, "[h2sid=%u] Data for unknown", stream_id)); + /* consumed explicitly as no one will read it */ + nghttp2_session_consume(session, stream_id, len); return 0; } @@ -1004,11 +1052,13 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags, if(!stream) return NGHTTP2_ERR_CALLBACK_FAILURE; - nread = CURLMIN(stream->len, len); - memcpy(&stream->mem[stream->memlen], mem, nread); + nwritten = Curl_bufq_write(&stream->h2_recvbuf, mem, len, &result); + if(nwritten < 0) { + if(result != CURLE_AGAIN) + return NGHTTP2_ERR_CALLBACK_FAILURE; - stream->len -= nread; - stream->memlen += nread; + nwritten = 0; + } /* if we receive data for another handle, wake that up */ if(CF_DATA_CURRENT(cf) != data_s) { @@ -1016,20 +1066,10 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags, Curl_expire(data_s, 0, EXPIRE_RUN_NOW); } - DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] %zu DATA recvd, " - "(buffer now holds %zu, %zu still free in %p)", - stream_id, nread, - stream->memlen, stream->len, (void *)stream->mem)); - - if(nread < len) { - stream->pausedata = mem + nread; - stream->pauselen = len - nread; - DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] %zu not recvd -> NGHTTP2_ERR_PAUSE", - stream_id, len - nread)); - ctx->pause_stream_id = stream_id; - drain_this(cf, data_s); - return NGHTTP2_ERR_PAUSE; - } + DEBUGASSERT((size_t)nwritten == len); + DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] %zd/%zu DATA recvd, " + "(buffer now holds %zu)", + stream_id, nwritten, len, Curl_bufq_len(&stream->h2_recvbuf))); return 0; } @@ -1038,7 +1078,6 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *userp) { struct Curl_cfilter *cf = userp; - struct cf_h2_ctx *ctx = cf->ctx; struct Curl_easy *data_s; struct HTTP *stream; int rv; @@ -1074,11 +1113,6 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id, stream_id); DEBUGASSERT(0); } - if(stream_id == ctx->pause_stream_id) { - DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] closed the pause stream", - stream_id)); - ctx->pause_stream_id = 0; - } DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] closed now", stream_id)); return 0; } @@ -1110,33 +1144,6 @@ static int on_begin_headers(nghttp2_session *session, return 0; } -/* Decode HTTP status code. Returns -1 if no valid status code was - decoded. */ -static int decode_status_code(const uint8_t *value, size_t len) -{ - int i; - int res; - - if(len != 3) { - return -1; - } - - res = 0; - - for(i = 0; i < 3; ++i) { - char c = value[i]; - - if(c < '0' || c > '9') { - return -1; - } - - res *= 10; - res += c - '0'; - } - - return res; -} - /* frame->hd.type is either NGHTTP2_HEADERS or NGHTTP2_PUSH_PROMISE */ static int on_header(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, @@ -1234,9 +1241,9 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame, stream->stream_id, (int)namelen, name, (int)valuelen, value)); - result = Curl_dyn_addf(&stream->trailer_recvbuf, - "%.*s: %.*s\r\n", (int)namelen, name, - (int)valuelen, value); + result = Curl_dynhds_add(&stream->resp_trailers, + (const char *)name, namelen, + (const char *)value, valuelen); if(result) return NGHTTP2_ERR_CALLBACK_FAILURE; @@ -1245,25 +1252,25 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame, if(namelen == sizeof(H2H3_PSEUDO_STATUS) - 1 && memcmp(H2H3_PSEUDO_STATUS, name, namelen) == 0) { - /* nghttp2 guarantees :status is received first and only once, and - value is 3 digits status code, and decode_status_code always - succeeds. */ + /* nghttp2 guarantees :status is received first and only once. */ char buffer[32]; - stream->status_code = decode_status_code(value, valuelen); - DEBUGASSERT(stream->status_code != -1); + result = Curl_http_decode_status(&stream->status_code, + (const char *)value, valuelen); + if(result) + return NGHTTP2_ERR_CALLBACK_FAILURE; msnprintf(buffer, sizeof(buffer), H2H3_PSEUDO_STATUS ":%u\r", stream->status_code); result = Curl_headers_push(data_s, buffer, CURLH_PSEUDO); if(result) return NGHTTP2_ERR_CALLBACK_FAILURE; - result = Curl_dyn_addn(&stream->header_recvbuf, STRCONST("HTTP/2 ")); + result = recvbuf_write_hds(cf, data_s, STRCONST("HTTP/2 ")); if(result) return NGHTTP2_ERR_CALLBACK_FAILURE; - result = Curl_dyn_addn(&stream->header_recvbuf, value, valuelen); + result = recvbuf_write_hds(cf, data_s, (const char *)value, valuelen); if(result) return NGHTTP2_ERR_CALLBACK_FAILURE; /* the space character after the status code is mandatory */ - result = Curl_dyn_addn(&stream->header_recvbuf, STRCONST(" \r\n")); + result = recvbuf_write_hds(cf, data_s, STRCONST(" \r\n")); if(result) return NGHTTP2_ERR_CALLBACK_FAILURE; /* if we receive data for another handle, wake that up */ @@ -1278,16 +1285,16 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame, /* nghttp2 guarantees that namelen > 0, and :status was already received, and this is not pseudo-header field . */ /* convert to an HTTP1-style header */ - result = Curl_dyn_addn(&stream->header_recvbuf, name, namelen); + result = recvbuf_write_hds(cf, data_s, (const char *)name, namelen); if(result) return NGHTTP2_ERR_CALLBACK_FAILURE; - result = Curl_dyn_addn(&stream->header_recvbuf, STRCONST(": ")); + result = recvbuf_write_hds(cf, data_s, STRCONST(": ")); if(result) return NGHTTP2_ERR_CALLBACK_FAILURE; - result = Curl_dyn_addn(&stream->header_recvbuf, value, valuelen); + result = recvbuf_write_hds(cf, data_s, (const char *)value, valuelen); if(result) return NGHTTP2_ERR_CALLBACK_FAILURE; - result = Curl_dyn_addn(&stream->header_recvbuf, STRCONST("\r\n")); + result = recvbuf_write_hds(cf, data_s, STRCONST("\r\n")); if(result) return NGHTTP2_ERR_CALLBACK_FAILURE; /* if we receive data for another handle, wake that up */ @@ -1302,17 +1309,18 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame, return 0; /* 0 is successful */ } -static ssize_t data_source_read_callback(nghttp2_session *session, - int32_t stream_id, - uint8_t *buf, size_t length, - uint32_t *data_flags, - nghttp2_data_source *source, - void *userp) +static ssize_t req_body_read_callback(nghttp2_session *session, + int32_t stream_id, + uint8_t *buf, size_t length, + uint32_t *data_flags, + nghttp2_data_source *source, + void *userp) { struct Curl_cfilter *cf = userp; struct Curl_easy *data_s; struct HTTP *stream = NULL; - size_t nread; + CURLcode result; + ssize_t nread; (void)source; (void)cf; @@ -1332,23 +1340,25 @@ static ssize_t data_source_read_callback(nghttp2_session *session, else return NGHTTP2_ERR_INVALID_ARGUMENT; - nread = CURLMIN(stream->upload_len, length); - if(nread > 0) { - memcpy(buf, stream->upload_mem, nread); - stream->upload_mem += nread; - stream->upload_len -= nread; - if(data_s->state.infilesize != -1) - stream->upload_left -= nread; + nread = Curl_bufq_read(&stream->h2_sendbuf, buf, length, &result); + if(nread < 0) { + if(result != CURLE_AGAIN) + return NGHTTP2_ERR_CALLBACK_FAILURE; + nread = 0; } + if(nread > 0 && data_s->state.infilesize != -1) + stream->upload_left -= nread; + + DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] req_body_read(len=%zu) left=%zd" + " -> %zd, %d", + stream_id, length, stream->upload_left, nread, result)); + if(stream->upload_left == 0) *data_flags = NGHTTP2_DATA_FLAG_EOF; else if(nread == 0) return NGHTTP2_ERR_DEFERRED; - DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] data_source_read_callback: " - "returns %zu bytes", stream_id, nread)); - return nread; } @@ -1374,8 +1384,9 @@ static void http2_data_done(struct Curl_cfilter *cf, /* there might be allocated resources done before this got the 'h2' pointer setup */ - Curl_dyn_free(&stream->header_recvbuf); - Curl_dyn_free(&stream->trailer_recvbuf); + Curl_bufq_free(&stream->h2_sendbuf); + Curl_bufq_free(&stream->h2_recvbuf); + Curl_dynhds_free(&stream->resp_trailers); if(stream->push_headers) { /* if they weren't used and then freed before */ for(; stream->push_headers_used > 0; --stream->push_headers_used) { @@ -1388,24 +1399,17 @@ static void http2_data_done(struct Curl_cfilter *cf, if(!ctx || !ctx->h2) return; - /* do this before the reset handling, as that might clear ->stream_id */ - if(stream->stream_id && stream->stream_id == ctx->pause_stream_id) { - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] DONE, the pause stream", - stream->stream_id)); - ctx->pause_stream_id = 0; - } - (void)premature; if(!stream->closed && stream->stream_id) { /* RST_STREAM */ - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] RST", stream->stream_id)); + DEBUGF(LOG_CF(data, cf, "[h2sid=%u] premature DATA_DONE, RST stream", + stream->stream_id)); if(!nghttp2_submit_rst_stream(ctx->h2, NGHTTP2_FLAG_NONE, stream->stream_id, NGHTTP2_STREAM_CLOSED)) (void)nghttp2_session_send(ctx->h2); } - if(data->state.drain) - drained_transfer(cf, data); + drained_transfer(cf, data); /* -1 means unassigned and 0 means cleared */ if(nghttp2_session_get_stream_user_data(ctx->h2, stream->stream_id)) { @@ -1458,78 +1462,6 @@ CURLcode Curl_http2_request_upgrade(struct dynbuf *req, return result; } -/* - * h2_process_pending_input() processes pending input left in - * httpc->inbuf. Then, call h2_session_send() to send pending data. - * This function returns 0 if it succeeds, or -1 and error code will - * be assigned to *err. - */ -static int h2_process_pending_input(struct Curl_cfilter *cf, - struct Curl_easy *data, - CURLcode *err) -{ - struct cf_h2_ctx *ctx = cf->ctx; - ssize_t nread; - ssize_t rv; - - nread = ctx->inbuflen - ctx->nread_inbuf; - if(nread) { - char *inbuf = ctx->inbuf + ctx->nread_inbuf; - - rv = nghttp2_session_mem_recv(ctx->h2, (const uint8_t *)inbuf, nread); - if(rv < 0) { - failf(data, - "h2_process_pending_input: nghttp2_session_mem_recv() returned " - "%zd:%s", rv, nghttp2_strerror((int)rv)); - *err = CURLE_RECV_ERROR; - return -1; - } - - if(nread == rv) { - DEBUGF(LOG_CF(data, cf, "all data in connection buffer processed")); - ctx->inbuflen = 0; - ctx->nread_inbuf = 0; - } - else { - ctx->nread_inbuf += rv; - DEBUGF(LOG_CF(data, cf, "h2_process_pending_input: %zu bytes left " - "in connection buffer", - ctx->inbuflen - ctx->nread_inbuf)); - } - } - - rv = h2_session_send(cf, data); - if(rv) { - *err = CURLE_SEND_ERROR; - return -1; - } - - if(nghttp2_session_check_request_allowed(ctx->h2) == 0) { - /* No more requests are allowed in the current session, so - the connection may not be reused. This is set when a - GOAWAY frame has been received or when the limit of stream - identifiers has been reached. */ - connclose(cf->conn, "http/2: No new requests allowed"); - } - - if(should_close_session(ctx)) { - struct HTTP *stream = data->req.p.http; - DEBUGF(LOG_CF(data, cf, - "h2_process_pending_input: nothing to do in this session")); - if(stream->reset) - *err = CURLE_PARTIAL_FILE; - else if(stream->error) - *err = CURLE_HTTP2; - else { - /* not an error per se, but should still close the connection */ - connclose(cf->conn, "GOAWAY received"); - *err = CURLE_OK; - } - return -1; - } - return 0; -} - static CURLcode http2_data_done_send(struct Curl_cfilter *cf, struct Curl_easy *data) { @@ -1540,32 +1472,16 @@ static CURLcode http2_data_done_send(struct Curl_cfilter *cf, if(!ctx || !ctx->h2) goto out; + DEBUGF(LOG_CF(data, cf, "[h2sid=%u] data done", stream->stream_id)); if(stream->upload_left) { /* If the stream still thinks there's data left to upload. */ - stream->upload_left = 0; /* DONE! */ + if(stream->upload_left == -1) + stream->upload_left = 0; /* DONE! */ /* resume sending here to trigger the callback to get called again so that it can signal EOF to nghttp2 */ (void)nghttp2_session_resume_data(ctx->h2, stream->stream_id); - (void)h2_process_pending_input(cf, data, &result); - } - - /* If nghttp2 still has pending frames unsent */ - if(nghttp2_session_want_write(ctx->h2)) { - struct SingleRequest *k = &data->req; - int rv; - - DEBUGF(LOG_CF(data, cf, "HTTP/2 still wants to send data")); - - /* and attempt to send the pending frames */ - rv = h2_session_send(cf, data); - if(rv) - result = CURLE_SEND_ERROR; - - if(nghttp2_session_want_write(ctx->h2)) { - /* re-set KEEP_SEND to make sure we are called again */ - k->keepon |= KEEP_SEND; - } + drain_this(cf, data); } out: @@ -1576,20 +1492,10 @@ static ssize_t http2_handle_stream_close(struct Curl_cfilter *cf, struct Curl_easy *data, struct HTTP *stream, CURLcode *err) { - struct cf_h2_ctx *ctx = cf->ctx; - - if(ctx->pause_stream_id == stream->stream_id) { - ctx->pause_stream_id = 0; - } + ssize_t rv = 0; drained_transfer(cf, data); - if(ctx->pause_stream_id == 0) { - if(h2_process_pending_input(cf, data, err) != 0) { - return -1; - } - } - if(stream->error == NGHTTP2_REFUSED_STREAM) { DEBUGF(LOG_CF(data, cf, "[h2sid=%u] REFUSED_STREAM, try again on a new " "connection", stream->stream_id)); @@ -1619,34 +1525,42 @@ static ssize_t http2_handle_stream_close(struct Curl_cfilter *cf, return -1; } - if(Curl_dyn_len(&stream->trailer_recvbuf)) { - char *trailp = Curl_dyn_ptr(&stream->trailer_recvbuf); - char *lf; + if(Curl_dynhds_count(&stream->resp_trailers)) { + struct dynhds_entry *e; + struct dynbuf dbuf; + size_t i; - do { - size_t len = 0; - CURLcode result; - /* each trailer line ends with a newline */ - lf = strchr(trailp, '\n'); - if(!lf) + *err = CURLE_OK; + Curl_dyn_init(&dbuf, DYN_TRAILERS); + for(i = 0; i < Curl_dynhds_count(&stream->resp_trailers); ++i) { + e = Curl_dynhds_getn(&stream->resp_trailers, i); + if(!e) break; - len = lf + 1 - trailp; - - Curl_debug(data, CURLINFO_HEADER_IN, trailp, len); - /* pass the trailers one by one to the callback */ - result = Curl_client_write(data, CLIENTWRITE_HEADER, trailp, len); - if(result) { - *err = result; - return -1; - } - trailp = ++lf; - } while(lf); + Curl_dyn_reset(&dbuf); + *err = Curl_dyn_addf(&dbuf, "%.*s: %.*s\x0d\x0a", + (int)e->namelen, e->name, + (int)e->valuelen, e->value); + if(*err) + break; + Curl_debug(data, CURLINFO_HEADER_IN, Curl_dyn_ptr(&dbuf), + Curl_dyn_len(&dbuf)); + *err = Curl_client_write(data, CLIENTWRITE_HEADER|CLIENTWRITE_TRAILER, + Curl_dyn_ptr(&dbuf), Curl_dyn_len(&dbuf)); + if(*err) + break; + } + Curl_dyn_free(&dbuf); + if(*err) + goto out; } stream->close_handled = TRUE; + *err = CURLE_OK; + rv = 0; - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] closed cleanly", stream->stream_id)); - return 0; +out: + DEBUGF(LOG_CF(data, cf, "handle_stream_close -> %zd, %d", rv, *err)); + return rv; } static int sweight_wanted(const struct Curl_easy *data) @@ -1683,12 +1597,13 @@ static void h2_pri_spec(struct Curl_easy *data, } /* - * h2_session_send() checks if there's been an update in the priority / + * Check if there's been an update in the priority / * dependency settings and if so it submits a PRIORITY frame with the updated * info. + * Flush any out data pending in the network buffer. */ -static CURLcode h2_session_send(struct Curl_cfilter *cf, - struct Curl_easy *data) +static CURLcode h2_progress_egress(struct Curl_cfilter *cf, + struct Curl_easy *data) { struct cf_h2_ctx *ctx = cf->ctx; struct HTTP *stream = data->req.p.http; @@ -1717,7 +1632,100 @@ out: nghttp2_strerror(rv), rv)); return CURLE_SEND_ERROR; } - return flush_output(cf, data); + return nw_out_flush(cf, data); +} + +static ssize_t stream_recv(struct Curl_cfilter *cf, struct Curl_easy *data, + char *buf, size_t len, CURLcode *err) +{ + struct cf_h2_ctx *ctx = cf->ctx; + struct HTTP *stream = data->req.p.http; + ssize_t nread = -1; + + *err = CURLE_AGAIN; + if(!Curl_bufq_is_empty(&stream->h2_recvbuf)) { + nread = Curl_bufq_read(&stream->h2_recvbuf, + (unsigned char *)buf, len, err); + DEBUGF(LOG_CF(data, cf, "recvbuf read(len=%zu) -> %zd, %d", + len, nread, *err)); + if(nread < 0) + goto out; + DEBUGASSERT(nread > 0); + } + + if(nread < 0) { + if(stream->closed) { + nread = http2_handle_stream_close(cf, data, stream, err); + } + else if(stream->reset || + (ctx->conn_closed && Curl_bufq_is_empty(&ctx->inbufq)) || + (ctx->goaway && ctx->last_stream_id < stream->stream_id)) { + *err = stream->bodystarted? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR; + nread = -1; + } + } + else if(nread == 0) { + *err = CURLE_AGAIN; + nread = -1; + } + +out: + DEBUGF(LOG_CF(data, cf, "stream_recv(len=%zu) -> %zd, %d", + len, nread, *err)); + return nread; +} + +static CURLcode h2_progress_ingress(struct Curl_cfilter *cf, + struct Curl_easy *data) +{ + struct cf_h2_ctx *ctx = cf->ctx; + struct HTTP *stream = data->req.p.http; + CURLcode result = CURLE_OK; + ssize_t nread; + bool keep_reading = TRUE; + + /* Process network input buffer fist */ + if(!Curl_bufq_is_empty(&ctx->inbufq)) { + DEBUGF(LOG_CF(data, cf, "Process %zd bytes in connection buffer", + Curl_bufq_len(&ctx->inbufq))); + if(h2_process_pending_input(cf, data, &result) < 0) + return result; + } + + /* Receive data from the "lower" filters, e.g. network until + * it is time to stop or we have enough data for this stream */ + while(keep_reading && + !ctx->conn_closed && /* not closed the connection */ + !stream->closed && /* nor the stream */ + Curl_bufq_is_empty(&ctx->inbufq) && /* and we consumed our input */ + !Curl_bufq_is_full(&stream->h2_recvbuf) && /* enough? */ + Curl_bufq_len(&stream->h2_recvbuf) < data->set.buffer_size) { + + nread = Curl_bufq_slurp(&ctx->inbufq, nw_in_reader, cf, &result); + DEBUGF(LOG_CF(data, cf, "read %zd bytes nw data -> %zd, %d", + Curl_bufq_len(&ctx->inbufq), nread, result)); + if(nread < 0) { + if(result != CURLE_AGAIN) { + failf(data, "Failed receiving HTTP2 data"); + return result; + } + break; + } + else if(nread == 0) { + ctx->conn_closed = TRUE; + break; + } + + keep_reading = Curl_bufq_is_full(&ctx->inbufq); + if(h2_process_pending_input(cf, data, &result)) + return result; + } + + if(ctx->conn_closed && Curl_bufq_is_empty(&ctx->inbufq)) { + connclose(cf->conn, "GOAWAY received"); + } + + return CURLE_OK; } static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data, @@ -1726,250 +1734,67 @@ static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data, struct cf_h2_ctx *ctx = cf->ctx; struct HTTP *stream = data->req.p.http; ssize_t nread = -1; + CURLcode result; struct cf_call_data save; - bool conn_is_closed = FALSE; CF_DATA_SAVE(save, cf, data); - /* If the h2 session has told us to GOAWAY with an error AND - * indicated the highest stream id it has processes AND - * the stream we are trying to read has a higher id, this - * means we will most likely not receive any more for it. - * Treat this as if the server explicitly had RST the stream */ - if((ctx->goaway && ctx->goaway_error && - ctx->last_stream_id > 0 && - ctx->last_stream_id < stream->stream_id)) { - stream->reset = TRUE; - } - - /* If a stream is RST, it does not matter what state the h2 session - * is in, our answer to receiving data is always the same. */ - if(stream->reset) { - *err = stream->bodystarted? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR; - nread = -1; - goto out; - } - - if(should_close_session(ctx)) { - DEBUGF(LOG_CF(data, cf, "http2_recv: nothing to do in this session")); - if(cf->conn->bits.close) { - /* already marked for closure, return OK and we're done */ - drained_transfer(cf, data); - *err = CURLE_OK; - nread = 0; - goto out; - } - *err = CURLE_HTTP2; - nread = -1; - goto out; - } - - /* Nullify here because we call nghttp2_session_send() and they - might refer to the old buffer. */ - stream->upload_mem = NULL; - stream->upload_len = 0; - - /* - * At this point 'stream' is just in the Curl_easy the connection - * identifies as its owner at this time. - */ - - if(stream->bodystarted && - stream->nread_header_recvbuf < Curl_dyn_len(&stream->header_recvbuf)) { - /* If there is header data pending for this stream to return, do that */ - size_t left = - Curl_dyn_len(&stream->header_recvbuf) - stream->nread_header_recvbuf; - size_t ncopy = CURLMIN(len, left); - memcpy(buf, Curl_dyn_ptr(&stream->header_recvbuf) + - stream->nread_header_recvbuf, ncopy); - stream->nread_header_recvbuf += ncopy; - - DEBUGF(LOG_CF(data, cf, "recv: Got %d bytes from header_recvbuf", - (int)ncopy)); - nread = ncopy; - goto out; - } - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_recv: win %u/%u", stream->stream_id, nghttp2_session_get_local_window_size(ctx->h2), nghttp2_session_get_stream_local_window_size(ctx->h2, stream->stream_id) )); - - if(stream->memlen) { - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] recv: DRAIN %zu bytes (%p => %p)", - stream->stream_id, stream->memlen, - (void *)stream->mem, (void *)buf)); - if(buf != stream->mem) { - /* if we didn't get the same buffer this time, we must move the data to - the beginning */ - memmove(buf, stream->mem, stream->memlen); - stream->len = len - stream->memlen; - stream->mem = buf; - } - - if(ctx->pause_stream_id == stream->stream_id && !stream->pausedata) { - /* We have paused nghttp2, but we have no pause data (see - on_data_chunk_recv). */ - ctx->pause_stream_id = 0; - if(h2_process_pending_input(cf, data, err) != 0) { - nread = -1; - goto out; - } - } - } - else if(stream->pausedata) { - DEBUGASSERT(ctx->pause_stream_id == stream->stream_id); - nread = CURLMIN(len, stream->pauselen); - memcpy(buf, stream->pausedata, nread); - - stream->pausedata += nread; - stream->pauselen -= nread; - drain_this(cf, data); - - if(stream->pauselen == 0) { - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] Unpaused", stream->stream_id)); - DEBUGASSERT(ctx->pause_stream_id == stream->stream_id); - ctx->pause_stream_id = 0; - - stream->pausedata = NULL; - stream->pauselen = 0; - } - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] recv: returns unpaused %zd bytes", - stream->stream_id, nread)); + nread = stream_recv(cf, data, buf, len, err); + if(nread < 0 && *err != CURLE_AGAIN) goto out; - } - else if(ctx->pause_stream_id) { - /* If a stream paused nghttp2_session_mem_recv previously, and has - not processed all data, it still refers to the buffer in - nghttp2_session. If we call nghttp2_session_mem_recv(), we may - overwrite that buffer. To avoid that situation, just return - here with CURLE_AGAIN. This could be busy loop since data in - socket is not read. But it seems that usually streams are - notified with its drain property, and socket is read again - quickly. */ - if(stream->closed) { - /* closed overrides paused */ - drained_transfer(cf, data); - nread = 0; + + if(nread < 0) { + *err = h2_progress_ingress(cf, data); + if(*err) goto out; - } - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] is paused, pause h2sid: %u", - stream->stream_id, ctx->pause_stream_id)); - *err = CURLE_AGAIN; - nread = -1; - goto out; - } - else { - /* We have nothing buffered for `data` and no other stream paused - * the processing of incoming data, we can therefore read new data - * from the network. - * If DATA is coming for this stream, we want to store it ad the - * `buf` passed in right away - saving us a copy. - */ - stream->mem = buf; - stream->len = len; - stream->memlen = 0; - if(ctx->inbuflen > 0) { - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] %zd bytes in inbuf", - stream->stream_id, ctx->inbuflen - ctx->nread_inbuf)); - if(h2_process_pending_input(cf, data, err)) - return -1; + nread = stream_recv(cf, data, buf, len, err); + if(Curl_bufq_is_empty(&stream->h2_recvbuf)) { + drained_transfer(cf, data); } - - while(stream->memlen == 0 && /* have no data for this stream */ - !stream->closed && /* and it is not closed/reset */ - !ctx->pause_stream_id && /* we are not paused either */ - ctx->inbuflen == 0 && /* and out input buffer is empty */ - !conn_is_closed) { /* and connection is not closed */ - /* Receive data from the "lower" filters */ - nread = Curl_conn_cf_recv(cf->next, data, ctx->inbuf, H2_BUFSIZE, err); - if(nread < 0) { - DEBUGASSERT(*err); - if(*err == CURLE_AGAIN) { - break; - } - failf(data, "Failed receiving HTTP2 data"); - conn_is_closed = TRUE; - } - else if(nread == 0) { - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] underlying connection is closed", - stream->stream_id)); - conn_is_closed = TRUE; - } - else { - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] read %zd from connection", - stream->stream_id, nread)); - ctx->inbuflen = nread; - DEBUGASSERT(ctx->nread_inbuf == 0); - if(h2_process_pending_input(cf, data, err)) - return -1; - } - } - } - if(stream->memlen) { - ssize_t retlen = stream->memlen; - - /* TODO: all this buffer handling is very brittle */ - stream->len += stream->memlen; - stream->memlen = 0; - - if(ctx->pause_stream_id == stream->stream_id) { - /* data for this stream is returned now, but this stream caused a pause - already so we need it called again asap */ - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] Data returned for PAUSED stream", - stream->stream_id)); - drain_this(cf, data); - Curl_expire(data, 0, EXPIRE_RUN_NOW); - } - else if(stream->closed) { - if(stream->reset || stream->error) { - nread = http2_handle_stream_close(cf, data, stream, err); - goto out; - } - /* this stream is closed, trigger a another read ASAP to detect that */ - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] is closed now, run again", - stream->stream_id)); - drain_this(cf, data); - Curl_expire(data, 0, EXPIRE_RUN_NOW); + if(nread > 0) { + size_t data_consumed = (size_t)nread; + /* Now that we transferred this to the upper layer, we report + * the actual amount of DATA consumed to the H2 session, so + * that it adjusts stream flow control */ + if(stream->h2_recv_hds_len >= data_consumed) { + stream->h2_recv_hds_len -= data_consumed; /* no DATA */ } else { - drained_transfer(cf, data); + if(stream->h2_recv_hds_len) { + data_consumed -= stream->h2_recv_hds_len; + stream->h2_recv_hds_len = 0; + } + if(data_consumed) { + DEBUGF(LOG_CF(data, cf, "[h2sid=%u] increase window by %zu", + stream->stream_id, data_consumed)); + nghttp2_session_consume(ctx->h2, stream->stream_id, data_consumed); + } } - *err = CURLE_OK; - nread = retlen; - goto out; + if(stream->closed) { + DEBUGF(LOG_CF(data, cf, "[h2sid=%u] closed stream, set drain", + stream->stream_id)); + drain_this(cf, data); + } } - if(conn_is_closed && !stream->closed) { - /* underlying connection is closed and we have nothing for the stream. - * Treat this as a RST */ - stream->closed = stream->reset = TRUE; - failf(data, "HTTP/2 stream %u was not closed cleanly before" - " end of the underlying connection", - stream->stream_id); - } - - if(stream->closed) { - nread = http2_handle_stream_close(cf, data, stream, err); - goto out; - } - - if(!data->state.drain && Curl_conn_cf_data_pending(cf->next, data)) { - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] pending data, set drain", - stream->stream_id)); - drain_this(cf, data); - } - *err = CURLE_AGAIN; - nread = -1; out: - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_recv -> %zd, %d", - stream->stream_id, nread, *err)); + result = h2_progress_egress(cf, data); + if(result) { + *err = result; + nread = -1; + } + DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_recv(len=%zu) -> %zd %d", + stream->stream_id, len, nread, *err)); CF_DATA_RESTORE(cf, save); return nread; } @@ -1996,9 +1821,15 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data, ssize_t nwritten; CF_DATA_SAVE(save, cf, data); - DEBUGF(LOG_CF(data, cf, "cf_send(len=%zu) start", len)); if(stream->stream_id != -1) { + DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send: win %u/%u", + stream->stream_id, + nghttp2_session_get_remote_window_size(ctx->h2), + nghttp2_session_get_stream_remote_window_size( + ctx->h2, stream->stream_id) + )); + if(stream->close_handled) { infof(data, "stream %u closed", stream->stream_id); *err = CURLE_HTTP2_STREAM; @@ -2011,24 +1842,35 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data, } /* If stream_id != -1, we have dispatched request HEADERS, and now are going to send or sending request body in DATA frame */ - stream->upload_mem = buf; - stream->upload_len = len; - rv = nghttp2_session_resume_data(ctx->h2, stream->stream_id); - if(nghttp2_is_fatal(rv)) { - *err = CURLE_SEND_ERROR; - nwritten = -1; - goto out; + nwritten = Curl_bufq_write(&stream->h2_sendbuf, buf, len, err); + if(nwritten < 0) { + if(*err != CURLE_AGAIN) + goto out; + nwritten = 0; } - result = h2_session_send(cf, data); + + if(!Curl_bufq_is_empty(&stream->h2_sendbuf)) { + rv = nghttp2_session_resume_data(ctx->h2, stream->stream_id); + if(nghttp2_is_fatal(rv)) { + *err = CURLE_SEND_ERROR; + nwritten = -1; + goto out; + } + } + + result = h2_progress_ingress(cf, data); if(result) { *err = result; nwritten = -1; goto out; } - nwritten = (ssize_t)len - (ssize_t)stream->upload_len; - stream->upload_mem = NULL; - stream->upload_len = 0; + result = h2_progress_egress(cf, data); + if(result) { + *err = result; + nwritten = -1; + goto out; + } if(should_close_session(ctx)) { DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session")); @@ -2037,14 +1879,6 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data, goto out; } - if(stream->upload_left) { - /* we are sure that we have more data to send here. Calling the - following API will make nghttp2_session_want_write() return - nonzero if remote window allows it, which then libcurl checks - socket is writable or not. See http2_perform_getsock(). */ - nghttp2_session_resume_data(ctx->h2, stream->stream_id); - } - if(!nwritten) { size_t rwin = nghttp2_session_get_stream_remote_window_size(ctx->h2, stream->stream_id); @@ -2060,18 +1894,23 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data, "window is exhausted", stream->stream_id)); } } - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send returns %zd ", - stream->stream_id, nwritten)); - /* handled writing BODY for open stream. */ goto out; } + + DEBUGF(LOG_CF(data, cf, "cf_send, submit %s", data->state.url)); + if(!stream->h2_send_hds_len) { + /* first invocation carries the HTTP/1.1 formatted request headers. + * we remember that in case we EAGAIN this call, because the next + * invocation may have added request body data into the buffer. */ + stream->h2_send_hds_len = len; + } + /* Stream has not been opened yet. `buf` is expected to contain - * request headers. */ - /* TODO: this assumes that the `buf` and `len` we are called with - * is *all* HEADERs and no body. We have no way to determine here - * if that is indeed the case. */ - result = Curl_pseudo_headers(data, buf, len, NULL, &hreq); + * `stream->h2_send_hds_len` bytes of request headers. */ + DEBUGASSERT(stream->h2_send_hds_len <= len); + result = Curl_pseudo_headers(data, buf, stream->h2_send_hds_len, + NULL, &hreq); if(result) { *err = result; nwritten = -1; @@ -2114,7 +1953,7 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data, /* data sending without specifying the data amount up front */ stream->upload_left = -1; /* unknown, but not zero */ - data_prd.read_callback = data_source_read_callback; + data_prd.read_callback = req_body_read_callback; data_prd.source.ptr = NULL; stream_id = nghttp2_submit_request(ctx->h2, &pri_spec, nva, nheader, &data_prd, data); @@ -2134,14 +1973,21 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data, goto out; } + DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send(len=%zu) submit %s", + stream_id, len, data->state.url)); infof(data, "Using Stream ID: %u (easy handle %p)", stream_id, (void *)data); stream->stream_id = stream_id; - /* See TODO above. We assume that the whole buf was consumed by - * generating the request headers. */ - nwritten = len; + nwritten = stream->h2_send_hds_len; - result = h2_session_send(cf, data); + result = h2_progress_ingress(cf, data); + if(result) { + *err = result; + nwritten = -1; + goto out; + } + + result = h2_progress_egress(cf, data); if(result) { *err = result; nwritten = -1; @@ -2155,16 +2001,9 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data, goto out; } - /* If whole HEADERS frame was sent off to the underlying socket, the nghttp2 - library calls data_source_read_callback. But only it found that no data - available, so it deferred the DATA transmission. Which means that - nghttp2_session_want_write() returns 0 on http2_perform_getsock(), which - results that no writable socket check is performed. To workaround this, - we issue nghttp2_session_resume_data() here to bring back DATA - transmission from deferred state. */ - nghttp2_session_resume_data(ctx->h2, stream->stream_id); - out: + DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send -> %zd, %d", + stream->stream_id, nwritten, *err)); CF_DATA_RESTORE(cf, save); return nwritten; } @@ -2182,7 +2021,7 @@ static int cf_h2_get_select_socks(struct Curl_cfilter *cf, CF_DATA_SAVE(save, cf, data); sock[0] = Curl_conn_cf_get_socket(cf, data); - if(!(k->keepon & KEEP_RECV_PAUSE)) + if(!(k->keepon & (KEEP_RECV_PAUSE|KEEP_RECV_HOLD))) /* Unless paused - in an HTTP/2 connection we can basically always get a frame so we should always be ready for one */ bitmap |= GETSOCK_READSOCK(0); @@ -2230,10 +2069,13 @@ static CURLcode cf_h2_connect(struct Curl_cfilter *cf, goto out; } - if(-1 == h2_process_pending_input(cf, data, &result)) { - result = CURLE_HTTP2; + result = h2_progress_ingress(cf, data); + if(result) + goto out; + + result = h2_progress_egress(cf, data); + if(result) goto out; - } *done = TRUE; cf->connected = TRUE; @@ -2278,7 +2120,7 @@ static CURLcode http2_data_pause(struct Curl_cfilter *cf, #ifdef NGHTTP2_HAS_SET_LOCAL_WINDOW_SIZE if(ctx && ctx->h2) { struct HTTP *stream = data->req.p.http; - uint32_t window = !pause * HTTP2_HUGE_WINDOW_SIZE; + uint32_t window = !pause * H2_STREAM_WINDOW_SIZE; CURLcode result; int rv = nghttp2_session_set_local_window_size(ctx->h2, @@ -2292,7 +2134,7 @@ static CURLcode http2_data_pause(struct Curl_cfilter *cf, } /* make sure the window update gets sent */ - result = h2_session_send(cf, data); + result = h2_progress_egress(cf, data); if(result) return result; @@ -2329,10 +2171,9 @@ static CURLcode cf_h2_cntrl(struct Curl_cfilter *cf, result = http2_data_setup(cf, data); break; } - case CF_CTRL_DATA_PAUSE: { + case CF_CTRL_DATA_PAUSE: result = http2_data_pause(cf, data, (arg1 != 0)); break; - } case CF_CTRL_DATA_DONE_SEND: { result = http2_data_done_send(cf, data); break; @@ -2352,7 +2193,10 @@ static bool cf_h2_data_pending(struct Curl_cfilter *cf, const struct Curl_easy *data) { struct cf_h2_ctx *ctx = cf->ctx; - if(ctx && ctx->inbuflen > 0 && ctx->nread_inbuf > ctx->inbuflen) + struct HTTP *stream = data->req.p.http; + + if(ctx && (!Curl_bufq_is_empty(&ctx->inbufq) + || (stream && !Curl_bufq_is_empty(&stream->h2_recvbuf)))) return TRUE; return cf->next? cf->next->cft->has_data_pending(cf->next, data) : FALSE; } @@ -2606,23 +2450,26 @@ CURLcode Curl_http2_upgrade(struct Curl_easy *data, if(result) return result; - if(nread) { - /* we are going to copy mem to httpc->inbuf. This is required since - mem is part of buffer pointed by stream->mem, and callbacks - called by nghttp2_session_mem_recv() will write stream specific - data into stream->mem, overwriting data already there. */ - if(H2_BUFSIZE < nread) { - failf(data, "connection buffer size is too small to store data " - "following HTTP Upgrade response header: buflen=%d, datalen=%zu", - H2_BUFSIZE, nread); + if(nread > 0) { + /* Remaining data from the protocol switch reply is already using + * the switched protocol, ie. HTTP/2. We add that to the network + * inbufq. */ + ssize_t copied; + + copied = Curl_bufq_write(&ctx->inbufq, + (const unsigned char *)mem, nread, &result); + if(copied < 0) { + failf(data, "error on copying HTTP Upgrade response: %d", result); + return CURLE_RECV_ERROR; + } + if((size_t)copied < nread) { + failf(data, "connection buffer size could not take all data " + "from HTTP Upgrade response header: copied=%zd, datalen=%zu", + copied, nread); return CURLE_HTTP2; } - - infof(data, "Copying HTTP/2 data in stream buffer to connection buffer" + infof(data, "Copied HTTP/2 data in stream buffer to connection buffer" " after upgrade: len=%zu", nread); - DEBUGASSERT(ctx->nread_inbuf == 0); - memcpy(ctx->inbuf, mem, nread); - ctx->inbuflen = nread; } conn->httpversion = 20; /* we know we're on HTTP/2 now */ diff --git a/tests/data/test1800 b/tests/data/test1800 index d7cc73af41..6b14bfb37a 100644 --- a/tests/data/test1800 +++ b/tests/data/test1800 @@ -49,7 +49,7 @@ User-Agent: curl/%VERSION Accept: */* Connection: Upgrade, HTTP2-Settings Upgrade: %H2CVER -HTTP2-Settings: AAMAAABkAAQCAAAAAAIAAAAA +HTTP2-Settings: AAMAAABkAAQACAAAAAIAAAAA diff --git a/tests/http/scorecard.py b/tests/http/scorecard.py index 271bf31fd0..294008d01c 100644 --- a/tests/http/scorecard.py +++ b/tests/http/scorecard.py @@ -59,17 +59,16 @@ class ScoreCard: def handshakes(self, proto: str) -> Dict[str, Any]: props = {} - sample_size = 10 - self.info(f'handshaking ') + sample_size = 5 + self.info(f'TLS Handshake\n') for authority in [ f'{self.env.authority_for(self.env.domain1, proto)}' ]: - self.info('localhost') + self.info(' localhost...') c_samples = [] hs_samples = [] errors = [] for i in range(sample_size): - self.info('.') curl = CurlClient(env=self.env, silent=True) url = f'https://{authority}/' r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True) @@ -79,20 +78,25 @@ class ScoreCard: else: errors.append(f'exit={r.exit_code}') props['localhost'] = { - 'connect': mean(c_samples), - 'handshake': mean(hs_samples), - 'errors': errors + 'ipv4-connect': mean(c_samples), + 'ipv4-handshake': mean(hs_samples), + 'ipv4-errors': errors, + 'ipv6-connect': 0, + 'ipv6-handshake': 0, + 'ipv6-errors': [], } + self.info('ok.\n') for authority in [ - 'curl.se', 'google.com', 'cloudflare.com', 'nghttp2.org', + 'curl.se', 'nghttp2.org', ]: + self.info(f' {authority}...') + props[authority] = {} for ipv in ['ipv4', 'ipv6']: - self.info(f'{authority}-{ipv}') + self.info(f'{ipv}...') c_samples = [] hs_samples = [] errors = [] for i in range(sample_size): - self.info('.') curl = CurlClient(env=self.env, silent=True) args = [ '--http3-only' if proto == 'h3' else '--http2', @@ -104,12 +108,10 @@ class ScoreCard: hs_samples.append(r.stats[0]['time_appconnect']) else: errors.append(f'exit={r.exit_code}') - props[f'{authority}-{ipv}'] = { - 'connect': mean(c_samples) if len(c_samples) else -1, - 'handshake': mean(hs_samples) if len(hs_samples) else -1, - 'errors': errors - } - self.info('\n') + props[authority][f'{ipv}-connect'] = mean(c_samples) if len(c_samples) else -1 + props[authority][f'{ipv}-handshake'] = mean(hs_samples) if len(hs_samples) else -1 + props[authority][f'{ipv}-errors'] = errors + self.info('ok.\n') return props def _make_docs_file(self, docs_dir: str, fname: str, fsize: int): @@ -138,16 +140,17 @@ class ScoreCard: count = 1 samples = [] errors = [] - self.info(f'{sample_size}x single') + self.info(f'single...') for i in range(sample_size): curl = CurlClient(env=self.env, silent=True) - r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True) + r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True, + with_headers=False) err = self._check_downloads(r, count) if err: errors.append(err) else: - samples.append(r.stats[0]['speed_download']) - self.info(f'.') + total_size = sum([s['size_download'] for s in r.stats]) + samples.append(total_size / r.duration.total_seconds()) return { 'count': count, 'samples': sample_size, @@ -160,17 +163,17 @@ class ScoreCard: samples = [] errors = [] url = f'{url}?[0-{count - 1}]' - self.info(f'{sample_size}x{count} serial') + self.info(f'serial...') for i in range(sample_size): curl = CurlClient(env=self.env, silent=True) - r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True) - self.info(f'.') + r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True, + with_headers=False) err = self._check_downloads(r, count) if err: errors.append(err) else: - for s in r.stats: - samples.append(s['speed_download']) + total_size = sum([s['size_download'] for s in r.stats]) + samples.append(total_size / r.duration.total_seconds()) return { 'count': count, 'samples': sample_size, @@ -183,19 +186,18 @@ class ScoreCard: samples = [] errors = [] url = f'{url}?[0-{count - 1}]' - self.info(f'{sample_size}x{count} parallel') + self.info(f'parallel...') for i in range(sample_size): curl = CurlClient(env=self.env, silent=True) - start = datetime.now() r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True, - extra_args=['--parallel']) + with_headers=False, + extra_args=['--parallel', '--parallel-max', str(count)]) err = self._check_downloads(r, count) if err: errors.append(err) else: - duration = datetime.now() - start total_size = sum([s['size_download'] for s in r.stats]) - samples.append(total_size / duration.total_seconds()) + samples.append(total_size / r.duration.total_seconds()) return { 'count': count, 'samples': sample_size, @@ -210,7 +212,7 @@ class ScoreCard: 'serial': self.transfer_serial(url=url, proto=proto, count=count), 'parallel': self.transfer_parallel(url=url, proto=proto, count=count), } - self.info(f'\n') + self.info(f'ok.\n') return props def downloads(self, proto: str, test_httpd: bool = True, @@ -234,9 +236,9 @@ class ScoreCard: url100 = f'https://{self.env.domain1}:{port}/score100.data' scores[via] = { 'description': descr, - '1MB-local': self.download_url(url=url1, proto=proto, count=50), - '10MB-local': self.download_url(url=url10, proto=proto, count=50), - '100MB-local': self.download_url(url=url100, proto=proto, count=50), + '1MB': self.download_url(url=url1, proto=proto, count=50), + '10MB': self.download_url(url=url10, proto=proto, count=50), + '100MB': self.download_url(url=url100, proto=proto, count=50), } if test_caddy and self.caddy: port = self.caddy.port @@ -251,15 +253,85 @@ class ScoreCard: url100 = f'https://{self.env.domain1}:{port}/score100.data' scores[via] = { 'description': descr, - '1MB-local': self.download_url(url=url1, proto=proto, count=50), - '10MB-local': self.download_url(url=url10, proto=proto, count=50), - '100MB-local': self.download_url(url=url100, proto=proto, count=50), + '1MB': self.download_url(url=url1, proto=proto, count=50), + '10MB': self.download_url(url=url10, proto=proto, count=50), + '100MB': self.download_url(url=url100, proto=proto, count=50), + } + return scores + + def do_requests(self, url: str, proto: str, count: int, max_parallel: int = 1): + sample_size = 1 + samples = [] + errors = [] + url = f'{url}?[0-{count - 1}]' + extra_args = ['--parallel', '--parallel-max', str(max_parallel)] if max_parallel > 1 else [] + self.info(f'{max_parallel}...') + for i in range(sample_size): + curl = CurlClient(env=self.env) + r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True, + with_headers=False, + extra_args=extra_args) + err = self._check_downloads(r, count) + if err: + errors.append(err) + else: + for s in r.stats: + samples.append(count / r.duration.total_seconds()) + return { + 'count': count, + 'samples': sample_size, + 'speed': mean(samples) if len(samples) else -1, + 'errors': errors + } + + def requests_url(self, url: str, proto: str, count: int): + self.info(f' {url}: ') + props = { + 'serial': self.do_requests(url=url, proto=proto, count=count), + 'par-6': self.do_requests(url=url, proto=proto, count=count, max_parallel=6), + 'par-25': self.do_requests(url=url, proto=proto, count=count, max_parallel=25), + 'par-50': self.do_requests(url=url, proto=proto, count=count, max_parallel=50), + 'par-100': self.do_requests(url=url, proto=proto, count=count, max_parallel=100), + } + self.info(f'ok.\n') + return props + + def requests(self, proto: str, test_httpd: bool = True, + test_caddy: bool = True) -> Dict[str, Any]: + scores = {} + if test_httpd: + if proto == 'h3': + port = self.env.h3_port + via = 'nghttpx' + descr = f'port {port}, proxying httpd' + else: + port = self.env.https_port + via = 'httpd' + descr = f'port {port}' + self.info(f'{via} requests\n') + self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='reqs10.data', fsize=10*1024) + url1 = f'https://{self.env.domain1}:{port}/reqs10.data' + scores[via] = { + 'description': descr, + '10KB': self.requests_url(url=url1, proto=proto, count=10000), + } + if test_caddy and self.caddy: + port = self.caddy.port + via = 'caddy' + descr = f'port {port}' + self.info('caddy requests\n') + self._make_docs_file(docs_dir=self.caddy.docs_dir, fname='req10.data', fsize=10 * 1024) + url1 = f'https://{self.env.domain1}:{port}/req10.data' + scores[via] = { + 'description': descr, + '10KB': self.requests_url(url=url1, proto=proto, count=5000), } return scores def score_proto(self, proto: str, handshakes: bool = True, downloads: bool = True, + requests: bool = True, test_httpd: bool = True, test_caddy: bool = True): self.info(f"scoring {proto}\n") @@ -280,6 +352,10 @@ class ScoreCard: if self.env.curl_uses_lib(lib): p['implementation'] = lib break + elif proto == 'h1' or proto == 'http/1.1': + proto = 'http/1.1' + p['name'] = proto + p['implementation'] = 'hyper' if self.env.curl_uses_lib('hyper') else 'native' else: raise ScoreCardException(f"unknown protocol: {proto}") @@ -298,6 +374,10 @@ class ScoreCard: score['downloads'] = self.downloads(proto=proto, test_httpd=test_httpd, test_caddy=test_caddy) + if requests: + score['requests'] = self.requests(proto=proto, + test_httpd=test_httpd, + test_caddy=test_caddy) self.info("\n") return score @@ -310,44 +390,86 @@ class ScoreCard: def fmt_mbs(self, val): return f'{val/(1024*1024):0.000f} MB/s' if val >= 0 else '--' + def fmt_reqs(self, val): + return f'{val:0.000f} r/s' if val >= 0 else '--' + def print_score(self, score): print(f'{score["protocol"]["name"].upper()} in curl {score["curl"]} ({score["os"]}) via ' f'{score["protocol"]["implementation"]}/{score["protocol"]["version"]} ') if 'handshakes' in score: - print('Handshakes') - print(f' {"Host":<25} {"Connect":>12} {"Handshake":>12} {"Errors":<20}') + print(f'{"Handshakes":<24} {"ipv4":25} {"ipv6":28}') + print(f' {"Host":<17} {"Connect":>12} {"Handshake":>12} ' + f'{"Connect":>12} {"Handshake":>12} {"Errors":<20}') for key, val in score["handshakes"].items(): - print(f' {key:<25} {self.fmt_ms(val["connect"]):>12} ''' - f'{self.fmt_ms(val["handshake"]):>12} {"/".join(val["errors"]):<20}') + print(f' {key:<17} {self.fmt_ms(val["ipv4-connect"]):>12} ' + f'{self.fmt_ms(val["ipv4-handshake"]):>12} ' + f'{self.fmt_ms(val["ipv6-connect"]):>12} ' + f'{self.fmt_ms(val["ipv6-handshake"]):>12} {"/".join(val["ipv4-errors"] + val["ipv6-errors"]):<20}' + ) if 'downloads' in score: print('Downloads') + print(f' {"Server":<8} {"Size":>8} ' + f'{"Single":>12} {"Serial":>12} {"Parallel":>12} {"Errors":<20}') + skeys = {} for dkey, dval in score["downloads"].items(): - print(f' {dkey}: {dval["description"]}') + for k in dval.keys(): + skeys[k] = True + for skey in skeys: + for dkey, dval in score["downloads"].items(): + if skey in dval: + sval = dval[skey] + if isinstance(sval, str): + continue + errors = [] + for key, val in sval.items(): + if 'errors' in val: + errors.extend(val['errors']) + print(f' {dkey:<8} {skey:>8} ' + f'{self.fmt_mbs(sval["single"]["speed"]):>12} ' + f'{self.fmt_mbs(sval["serial"]["speed"]):>12} ' + f'{self.fmt_mbs(sval["parallel"]["speed"]):>12} ' + f' {"/".join(errors):<20}') + if 'requests' in score: + print('Requests, max in parallel') + print(f' {"Server":<8} {"Size":>8} ' + f'{"1 ":>12} {"6 ":>12} {"25 ":>12} ' + f'{"50 ":>12} {"100 ":>12} {"Errors":<20}') + for dkey, dval in score["requests"].items(): for skey, sval in dval.items(): if isinstance(sval, str): continue - print(f' {skey:<13} {"Samples":>10} {"Count":>10} {"Speed":>17} {"Errors":<20}') + errors = [] for key, val in sval.items(): - print(f' {key:<11} {val["samples"]:>10} ''' - f'{val["count"]:>10} {self.fmt_mbs(val["speed"]):>17} ' - f'{"/".join(val["errors"]):<20}') + if 'errors' in val: + errors.extend(val['errors']) + print(f' {dkey:<8} {skey:>8} ' + f'{self.fmt_reqs(sval["serial"]["speed"]):>12} ' + f'{self.fmt_reqs(sval["par-6"]["speed"]):>12} ' + f'{self.fmt_reqs(sval["par-25"]["speed"]):>12} ' + f'{self.fmt_reqs(sval["par-50"]["speed"]):>12} ' + f'{self.fmt_reqs(sval["par-100"]["speed"]):>12} ' + f' {"/".join(errors):<20}') def main(self): parser = argparse.ArgumentParser(prog='scorecard', description=""" Run a range of tests to give a scorecard for a HTTP protocol 'h3' or 'h2' implementation in curl. """) - parser.add_argument("-v", "--verbose", action='count', default=0, + parser.add_argument("-v", "--verbose", action='count', default=1, help="log more output on stderr") - parser.add_argument("-t", "--text", action='store_true', default=False, - help="print text instead of json") + parser.add_argument("-j", "--json", action='store_true', default=False, + help="print json instead of text") + parser.add_argument("-H", "--handshakes", action='store_true', default=False, + help="evaluate handshakes only") parser.add_argument("-d", "--downloads", action='store_true', default=False, help="evaluate downloads only") + parser.add_argument("-r", "--requests", action='store_true', default=False, + help="evaluate requests only") parser.add_argument("--httpd", action='store_true', default=False, help="evaluate httpd server only") parser.add_argument("--caddy", action='store_true', default=False, help="evaluate caddy server only") - parser.add_argument("protocols", nargs='*', help="Name(s) of protocol to score") + parser.add_argument("protocol", default='h2', nargs='?', help="Name of protocol to score") args = parser.parse_args() self.verbose = args.verbose @@ -357,13 +479,21 @@ class ScoreCard: console.setFormatter(logging.Formatter(logging.BASIC_FORMAT)) logging.getLogger('').addHandler(console) - protocols = args.protocols if len(args.protocols) else ['h2', 'h3'] + protocol = args.protocol handshakes = True downloads = True - test_httpd = True + requests = True + test_httpd = protocol != 'h3' test_caddy = True + if args.handshakes: + downloads = False + requests = False if args.downloads: handshakes = False + requests = False + if args.requests: + handshakes = False + downloads = False if args.caddy: test_caddy = True test_httpd = False @@ -383,7 +513,7 @@ class ScoreCard: assert self.httpd.exists(), f'httpd not found: {self.env.httpd}' self.httpd.clear_logs() assert self.httpd.start() - if 'h3' in protocols: + if 'h3' == protocol: self.nghttpx = Nghttpx(env=self.env) self.nghttpx.clear_logs() assert self.nghttpx.start() @@ -392,15 +522,15 @@ class ScoreCard: self.caddy.clear_logs() assert self.caddy.start() - for p in protocols: - score = self.score_proto(proto=p, handshakes=handshakes, - downloads=downloads, - test_caddy=test_caddy, - test_httpd=test_httpd) - if args.text: - self.print_score(score) - else: - print(json.JSONEncoder(indent=2).encode(score)) + score = self.score_proto(proto=protocol, handshakes=handshakes, + downloads=downloads, + requests=requests, + test_caddy=test_caddy, + test_httpd=test_httpd) + if args.json: + print(json.JSONEncoder(indent=2).encode(score)) + else: + self.print_score(score) except ScoreCardException as ex: sys.stderr.write(f"ERROR: {str(ex)}\n") diff --git a/tests/http/test_07_upload.py b/tests/http/test_07_upload.py index c2c7e51971..40f178a7cc 100644 --- a/tests/http/test_07_upload.py +++ b/tests/http/test_07_upload.py @@ -24,6 +24,8 @@ # ########################################################################### # +import difflib +import filecmp import logging import os import pytest @@ -178,11 +180,7 @@ class TestUpload: extra_args=['--parallel']) r.check_exit_code(0) r.check_stats(count=count, exp_status=200) - indata = open(fdata).readlines() - r.check_stats(count=count, exp_status=200) - for i in range(count): - respdata = open(curl.response_file(i)).readlines() - assert respdata == indata + self.check_download(count, fdata, curl) # PUT 100k @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) @@ -222,3 +220,14 @@ class TestUpload: respdata = open(curl.response_file(i)).readlines() assert respdata == exp_data + def check_download(self, count, srcfile, curl): + for i in range(count): + dfile = curl.download_file(i) + assert os.path.exists(dfile) + if not filecmp.cmp(srcfile, dfile, shallow=False): + diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(), + b=open(dfile).readlines(), + fromfile=srcfile, + tofile=dfile, + n=1)) + assert False, f'download {dfile} differs:\n{diff}' diff --git a/tests/http/testenv/caddy.py b/tests/http/testenv/caddy.py index d789446f96..ea1343a950 100644 --- a/tests/http/testenv/caddy.py +++ b/tests/http/testenv/caddy.py @@ -126,10 +126,8 @@ class Caddy: r = curl.http_get(url=check_url) if r.exit_code == 0: return True - log.error(f'curl: {r}') - log.debug(f'waiting for caddy to become responsive: {r}') time.sleep(.1) - log.error(f"Server still not responding after {timeout}") + log.error(f"Caddy still not responding after {timeout}") return False def _rmf(self, path): diff --git a/tests/http/testenv/env.py b/tests/http/testenv/env.py index caf9249b1a..9d1a4255f8 100644 --- a/tests/http/testenv/env.py +++ b/tests/http/testenv/env.py @@ -63,6 +63,8 @@ class EnvConfig: self.config = DEF_CONFIG # check cur and its features self.curl = CURL + if 'CURL' in os.environ: + self.curl = os.environ['CURL'] self.curl_props = { 'version': None, 'os': None,