ngtcp2: accept upload via callback

Closes #4256
This commit is contained in:
Daniel Stenberg 2019-08-22 14:08:18 +02:00
parent 32d64b2e87
commit 0a5d28fa2e
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
5 changed files with 153 additions and 19 deletions

View File

@ -126,6 +126,10 @@ CURLcode Curl_http_auth_act(struct connectdata *conn);
#endif /* CURL_DISABLE_HTTP */
#ifdef USE_NGHTTP3
struct h3out; /* see ngtcp2 */
#endif
/****************************************************************************
* HTTP unique setup
***************************************************************************/
@ -196,6 +200,10 @@ struct HTTP {
int64_t stream3_id; /* stream we are interested in */
bool firstbody; /* FALSE until body arrives */
bool h3req; /* FALSE until request is issued */
bool upload_done;
#endif
#ifdef USE_NGHTTP3
struct h3out *h3out; /* per-stream buffers for upload */
#endif
};

View File

@ -44,7 +44,10 @@ CURLcode Curl_quic_is_connected(struct connectdata *conn,
curl_socket_t sockfd,
bool *connected);
int Curl_quic_ver(char *p, size_t len);
CURLcode Curl_quic_done_sending(struct connectdata *conn);
#endif
#else /* ENABLE_QUIC */
#define Curl_quic_done_sending(x)
#endif /* !ENABLE_QUIC */
#endif /* HEADER_CURL_QUIC_H */

View File

@ -942,7 +942,9 @@ CURLcode Curl_done_sending(struct connectdata *conn,
{
k->keepon &= ~KEEP_SEND; /* we're done writing */
/* These functions should be moved into the handler struct! */
Curl_http2_done_sending(conn);
Curl_quic_done_sending(conn);
if(conn->bits.rewindaftersend) {
CURLcode result = Curl_readrewind(conn);

View File

@ -50,6 +50,20 @@
#define H3BUGF(x) do { } WHILE_FALSE
#endif
/*
* This holds outgoing HTTP/3 stream data that is used by nghttp3 until acked.
* It is used as a circular buffer. Add new bytes at the end until it reaches
* the far end, then start over at index 0 again.
*/
#define H3_SEND_SIZE (20*1024)
struct h3out {
uint8_t buf[H3_SEND_SIZE];
size_t used; /* number of bytes used in the buffer */
size_t windex; /* index in the buffer where to start writing the next
data block */
};
#define QUIC_MAX_STREAMS (256*1024)
#define QUIC_MAX_DATA (1*1024*1024)
#define QUIC_IDLE_TIMEOUT 60000 /* milliseconds */
@ -63,6 +77,9 @@ static CURLcode ng_process_ingress(struct connectdata *conn,
struct quicsocket *qs);
static CURLcode ng_flush_egress(struct connectdata *conn, int sockfd,
struct quicsocket *qs);
static int cb_h3_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
size_t datalen, void *user_data,
void *stream_user_data);
static ngtcp2_tstamp timestamp(void)
{
@ -1194,7 +1211,7 @@ static unsigned int ng_conncheck(struct connectdata *conn,
return CONNRESULT_NONE;
}
static const struct Curl_handler Curl_handler_h3_quiche = {
static const struct Curl_handler Curl_handler_http3 = {
"HTTPS", /* scheme */
ZERO_NULL, /* setup_connection */
Curl_http, /* do_it */
@ -1370,7 +1387,7 @@ static int cb_h3_send_stop_sending(nghttp3_conn *conn, int64_t stream_id,
}
static nghttp3_conn_callbacks ngh3_callbacks = {
NULL, /* acked_stream_data */
cb_h3_acked_stream_data, /* acked_stream_data */
cb_h3_stream_close,
cb_h3_recv_data,
cb_h3_deferred_consume,
@ -1386,6 +1403,7 @@ static nghttp3_conn_callbacks ngh3_callbacks = {
NULL, /* http_cancel_push */
cb_h3_send_stop_sending,
NULL, /* push_stream */
NULL, /* end_stream */
};
static int init_ngh3_conn(struct quicsocket *qs)
@ -1451,6 +1469,7 @@ static int init_ngh3_conn(struct quicsocket *qs)
static Curl_recv ngh3_stream_recv;
static Curl_send ngh3_stream_send;
/* incoming data frames on the h3 stream */
static ssize_t ngh3_stream_recv(struct connectdata *conn,
int sockindex,
char *buf,
@ -1497,18 +1516,38 @@ static ssize_t ngh3_stream_recv(struct connectdata *conn,
return -1;
}
/* this amount of data has now been acked on this stream */
static int cb_h3_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
size_t datalen, void *user_data,
void *stream_user_data)
{
struct Curl_easy *data = stream_user_data;
struct HTTP *stream = data->req.protop;
(void)conn;
(void)stream_id;
(void)user_data;
if(!data->set.postfields) {
stream->h3out->used -= datalen;
fprintf(stderr, "cb_h3_acked_stream_data, %zd bytes, %zd left unacked\n",
datalen, stream->h3out->used);
DEBUGASSERT(stream->h3out->used < H3_SEND_SIZE);
}
return 0;
}
static int cb_h3_readfunction(nghttp3_conn *conn, int64_t stream_id,
const uint8_t **pdata,
size_t *pdatalen, uint32_t *pflags,
void *user_data, void *stream_user_data)
{
struct Curl_easy *data = stream_user_data;
size_t nread;
struct HTTP *stream = data->req.protop;
(void)conn;
(void)stream_id;
(void)user_data;
fprintf(stderr, "called cb_h3_readfunction\n");
if(data->set.postfields) {
*pdata = data->set.postfields;
*pdatalen = data->state.infilesize;
@ -1516,6 +1555,48 @@ static int cb_h3_readfunction(nghttp3_conn *conn, int64_t stream_id,
return 0;
}
nread = CURLMIN(stream->upload_len, H3_SEND_SIZE - stream->h3out->used);
if(nread > 0) {
/* nghttp3 wants us to hold on to the data until it tells us it is okay to
delete it. Append the data at the end of the h3out buffer. Since we can
only return consecutive data, copy the amount that fits and the next
part comes in next invoke. */
struct h3out *out = stream->h3out;
if(nread + out->windex > H3_SEND_SIZE)
nread = H3_SEND_SIZE - out->windex;
memcpy(&out->buf[out->windex], stream->upload_mem, nread);
out->windex += nread;
out->used += nread;
/* that's the chunk we return to nghttp3 */
*pdata = &out->buf[out->windex];
*pdatalen = nread;
if(out->windex == H3_SEND_SIZE)
out->windex = 0; /* wrap */
stream->upload_mem += nread;
stream->upload_len -= nread;
if(data->state.infilesize != -1) {
stream->upload_left -= nread;
if(!stream->upload_left)
*pflags = NGHTTP3_DATA_FLAG_EOF;
}
fprintf(stderr, "cb_h3_readfunction %zd bytes%s (at %zd unacked)\n",
nread, *pflags == NGHTTP3_DATA_FLAG_EOF?" EOF":"",
out->used);
}
if(stream->upload_done && !stream->upload_len &&
(stream->upload_left <= 0)) {
fprintf(stderr, "!!!!!!!!! cb_h3_readfunction sets EOF\n");
*pdata = NULL;
*pdatalen = 0;
*pflags = NGHTTP3_DATA_FLAG_EOF;
}
else if(!nread) {
*pdatalen = 0;
return NGHTTP3_ERR_WOULDBLOCK;
}
return 0;
}
@ -1538,6 +1619,7 @@ static CURLcode http_request(struct connectdata *conn, const void *mem,
nghttp3_nv *nva = NULL;
int64_t stream3_id;
int rc;
struct h3out *h3out = NULL;
rc = ngtcp2_conn_open_bidi_stream(qs->qconn, &stream3_id, NULL);
if(rc) {
@ -1722,6 +1804,13 @@ static CURLcode http_request(struct connectdata *conn, const void *mem,
data_reader.read_data = cb_h3_readfunction;
h3out = calloc(sizeof(struct h3out), 1);
if(!h3out) {
result = CURLE_OUT_OF_MEMORY;
goto fail;
}
stream->h3out = h3out;
rc = nghttp3_conn_submit_request(qs->h3conn, stream->stream3_id,
nva, nheader, &data_reader,
conn->data);
@ -1746,15 +1835,6 @@ static CURLcode http_request(struct connectdata *conn, const void *mem,
Curl_safefree(nva);
if(!stream->upload_left) {
/* done with this stream, FIN it */
rc = nghttp3_conn_end_stream(qs->h3conn, stream->stream3_id);
if(rc) {
result = CURLE_SEND_ERROR;
goto fail;
}
}
infof(data, "Using HTTP/3 Stream ID: %x (easy handle %p)\n",
stream3_id, (void *)data);
@ -1784,8 +1864,17 @@ static ssize_t ngh3_stream_send(struct connectdata *conn,
sent = len;
}
else {
(void)qs;
/* TODO */
fprintf(stderr, "ngh3_stream_send() wants to send %zd bytes\n", len);
if(!stream->upload_len) {
stream->upload_mem = mem;
stream->upload_len = len;
(void)nghttp3_conn_resume_stream(qs->h3conn, stream->stream3_id);
sent = len;
}
else {
*curlcode = CURLE_AGAIN;
return -1;
}
}
if(ng_flush_egress(conn, sockfd, qs)) {
@ -1801,7 +1890,7 @@ static void ng_has_connected(struct connectdata *conn, int tempindex)
{
conn->recv[FIRSTSOCKET] = ngh3_stream_recv;
conn->send[FIRSTSOCKET] = ngh3_stream_send;
conn->handler = &Curl_handler_h3_quiche;
conn->handler = &Curl_handler_http3;
conn->bits.multiplex = TRUE; /* at least potentially multiplexed */
conn->httpversion = 30;
conn->bundle->multiuse = BUNDLE_MULTIPLEX;
@ -2022,4 +2111,22 @@ static CURLcode ng_flush_egress(struct connectdata *conn, int sockfd,
return CURLE_OK;
}
/*
* Called from transfer.c:done_sending when we stop HTTP/3 uploading.
*/
CURLcode Curl_quic_done_sending(struct connectdata *conn)
{
if(conn->handler == &Curl_handler_http3) {
/* only for HTTP/3 transfers */
struct HTTP *stream = conn->data->req.protop;
struct quicsocket *qs = conn->quic;
fprintf(stderr, "!!! Curl_quic_done_sending stream %zu\n",
stream->stream3_id);
stream->upload_done = TRUE;
(void)nghttp3_conn_resume_stream(qs->h3conn, stream->stream3_id);
}
return CURLE_OK;
}
#endif

View File

@ -116,7 +116,7 @@ static CURLcode quiche_do(struct connectdata *conn, bool *done)
return Curl_http(conn, done);
}
static const struct Curl_handler Curl_handler_h3_quiche = {
static const struct Curl_handler Curl_handler_http3 = {
"HTTPS", /* scheme */
ZERO_NULL, /* setup_connection */
quiche_do, /* do_it */
@ -232,7 +232,7 @@ static CURLcode quiche_has_connected(struct connectdata *conn,
conn->recv[sockindex] = h3_stream_recv;
conn->send[sockindex] = h3_stream_send;
conn->handler = &Curl_handler_h3_quiche;
conn->handler = &Curl_handler_http3;
conn->bits.multiplex = TRUE; /* at least potentially multiplexed */
conn->httpversion = 30;
conn->bundle->multiuse = BUNDLE_MULTIPLEX;
@ -750,5 +750,19 @@ fail:
return result;
}
/*
* Called from transfer.c:done_sending when we stop HTTP/3 uploading.
*/
CURLcode Curl_quic_done_sending(struct connectdata *conn)
{
if(conn->handler == &Curl_handler_http3) {
/* only for HTTP/3 transfers */
struct HTTP *stream = conn->data->req.protop;
fprintf(stderr, "!!! Curl_quic_done_sending\n");
stream->upload_done = TRUE;
}
return CURLE_OK;
}
#endif