transfer: Curl_sendrecv() and event related improvements

- Renames Curl_readwrite() to Curl_sendrecv() to reflect that it
  is mainly about talking to the server, not reads or writes to the
  client. Add a `nowp` parameter since the single caller already
  has this.
- Curl_sendrecv() now runs all possible operations whenever it is
  called and either it had been polling sockets or the 'select_bits'
  are set.
  POLL_IN/POLL_OUT are not always directly related to send/recv
  operations. Filters like HTTP/2, QUIC or TLS may monitor reverse
  directions. If a transfer does not want to send (KEEP_SEND), it
  will not do so, as before. Same for receives.
- Curl_update_timer() now checks the absolute timestamp of an expiry
  and the last/new timeout to determine if the application needs
  to stop/start/restart its timer. This fixes edge cases where
  updates did not happen as they should have.
- improved --test-event curl_easy_perform() simulation to handle
  situations where no sockets are registered but a timeout is
  in place.
- fixed bug in events_socket() that complained about removing
  a socket that was unknown, when indeed it had removed the socket
  just before, only it was the last in the list
- fixed conncache's internal handle to carry the multi instance
  (where the cache has one) so that operations on the closure handle
  trigger event callbacks correctly.
- fixed conncache to not POLL_REMOVE a socket twice when a conneciton
  was closed.

Closes #14561
This commit is contained in:
Stefan Eissing 2024-08-15 13:16:53 +02:00 committed by Daniel Stenberg
parent 432f2fd9ac
commit a58b50fca6
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
12 changed files with 320 additions and 285 deletions

View File

@ -404,6 +404,9 @@ CURLcode Curl_socket_open(struct Curl_easy *data,
static int socket_close(struct Curl_easy *data, struct connectdata *conn,
int use_callback, curl_socket_t sock)
{
if(CURL_SOCKET_BAD == sock)
return 0;
if(use_callback && conn && conn->fclosesocket) {
int rc;
Curl_multi_closed(data, sock);

View File

@ -131,6 +131,12 @@ int Curl_conncache_init(struct conncache *connc,
if(!connc->closure_handle)
return 1; /* bad */
connc->closure_handle->state.internal = true;
/* TODO: this is quirky. We need an internal handle for certain
* operations, but we do not add it to the multi (if there is one).
* But we give it the multi so that socket event operations can work.
* Probably better to have an internal handle owned by the multi that
* can be used for conncache operations. */
connc->closure_handle->multi = multi;
#ifdef DEBUGBUILD
if(getenv("CURL_DEBUG"))
connc->closure_handle->set.verbose = true;
@ -146,6 +152,10 @@ int Curl_conncache_init(struct conncache *connc,
void Curl_conncache_destroy(struct conncache *connc)
{
if(connc) {
if(connc->closure_handle) {
connc->closure_handle->multi = NULL;
Curl_close(&connc->closure_handle);
}
Curl_hash_destroy(&connc->hash);
connc->multi = NULL;
}
@ -611,7 +621,8 @@ static void connc_close_all(struct conncache *connc)
sigpipe_apply(data, &pipe_st);
Curl_hostcache_clean(data, data->dns.hostcache);
Curl_close(&data);
connc->closure_handle->multi = NULL;
Curl_close(&connc->closure_handle);
sigpipe_restore(&pipe_st);
}
@ -980,14 +991,6 @@ static void connc_disconnect(struct Curl_easy *data,
Curl_attach_connection(data, conn);
if(connc && connc->multi && connc->multi->socket_cb) {
struct easy_pollset ps;
/* With an empty pollset, all previously polled sockets will be removed
* via the multi_socket API callback. */
memset(&ps, 0, sizeof(ps));
(void)Curl_multi_pollset_ev(connc->multi, data, &ps, &conn->shutdown_poll);
}
connc_run_conn_shutdown_handler(data, conn);
if(do_shutdown) {
/* Make a last attempt to shutdown handlers and filters, if

View File

@ -391,25 +391,22 @@ struct events {
int running_handles; /* store the returned number */
};
#define DEBUG_EV_POLL 0
/* events_timer
*
* Callback that gets called with a new value when the timeout should be
* updated.
*/
static int events_timer(struct Curl_multi *multi, /* multi handle */
long timeout_ms, /* see above */
void *userp) /* private callback pointer */
{
struct events *ev = userp;
(void)multi;
if(timeout_ms == -1)
/* timeout removed */
timeout_ms = 0;
else if(timeout_ms == 0)
/* timeout is already reached! */
timeout_ms = 1; /* trigger asap */
#if DEBUG_EV_POLL
fprintf(stderr, "events_timer: set timeout %ldms\n", timeout_ms);
#endif
ev->ms = timeout_ms;
ev->msbump = TRUE;
return 0;
@ -463,6 +460,7 @@ static int events_socket(struct Curl_easy *easy, /* easy handle */
struct events *ev = userp;
struct socketmonitor *m;
struct socketmonitor *prev = NULL;
bool found = FALSE;
#if defined(CURL_DISABLE_VERBOSE_STRINGS)
(void) easy;
@ -472,7 +470,7 @@ static int events_socket(struct Curl_easy *easy, /* easy handle */
m = ev->list;
while(m) {
if(m->socket.fd == s) {
found = TRUE;
if(what == CURL_POLL_REMOVE) {
struct socketmonitor *nxt = m->next;
/* remove this node from the list of monitored sockets */
@ -481,7 +479,6 @@ static int events_socket(struct Curl_easy *easy, /* easy handle */
else
ev->list = nxt;
free(m);
m = nxt;
infof(easy, "socket cb: socket %" CURL_FORMAT_SOCKET_T
" REMOVED", s);
}
@ -499,12 +496,13 @@ static int events_socket(struct Curl_easy *easy, /* easy handle */
prev = m;
m = m->next; /* move to next node */
}
if(!m) {
if(!found) {
if(what == CURL_POLL_REMOVE) {
/* this happens a bit too often, libcurl fix perhaps? */
/* fprintf(stderr,
"%s: socket %d asked to be REMOVED but not present!\n",
__func__, s); */
/* should not happen if our logic is correct, but is no drama. */
DEBUGF(infof(easy, "socket cb: asked to REMOVE socket %"
CURL_FORMAT_SOCKET_T "but not present!", s));
DEBUGASSERT(0);
}
else {
m = malloc(sizeof(struct socketmonitor));
@ -565,14 +563,15 @@ static CURLcode wait_or_timeout(struct Curl_multi *multi, struct events *ev)
int pollrc;
int i;
struct curltime before;
struct curltime after;
/* populate the fds[] array */
for(m = ev->list, f = &fds[0]; m; m = m->next) {
f->fd = m->socket.fd;
f->events = m->socket.events;
f->revents = 0;
/* fprintf(stderr, "poll() %d check socket %d\n", numfds, f->fd); */
#if DEBUG_EV_POLL
fprintf(stderr, "poll() %d check socket %d\n", numfds, f->fd);
#endif
f++;
numfds++;
}
@ -580,12 +579,27 @@ static CURLcode wait_or_timeout(struct Curl_multi *multi, struct events *ev)
/* get the time stamp to use to figure out how long poll takes */
before = Curl_now();
/* wait for activity or timeout */
pollrc = Curl_poll(fds, (unsigned int)numfds, ev->ms);
if(pollrc < 0)
return CURLE_UNRECOVERABLE_POLL;
after = Curl_now();
if(numfds) {
/* wait for activity or timeout */
#if DEBUG_EV_POLL
fprintf(stderr, "poll(numfds=%d, timeout=%ldms)\n", numfds, ev->ms);
#endif
pollrc = Curl_poll(fds, (unsigned int)numfds, ev->ms);
#if DEBUG_EV_POLL
fprintf(stderr, "poll(numfds=%d, timeout=%ldms) -> %d\n",
numfds, ev->ms, pollrc);
#endif
if(pollrc < 0)
return CURLE_UNRECOVERABLE_POLL;
}
else {
#if DEBUG_EV_POLL
fprintf(stderr, "poll, but no fds, wait timeout=%ldms\n", ev->ms);
#endif
pollrc = 0;
if(ev->ms > 0)
Curl_wait_ms(ev->ms);
}
ev->msbump = FALSE; /* reset here */
@ -618,12 +632,17 @@ static CURLcode wait_or_timeout(struct Curl_multi *multi, struct events *ev)
}
}
if(!ev->msbump) {
if(!ev->msbump && ev->ms >= 0) {
/* If nothing updated the timeout, we decrease it by the spent time.
* If it was updated, it has the new timeout time stored already.
*/
timediff_t timediff = Curl_timediff(after, before);
timediff_t timediff = Curl_timediff(Curl_now(), before);
if(timediff > 0) {
#if DEBUG_EV_POLL
fprintf(stderr, "poll timeout %ldms not updated, decrease by "
"time spent %ldms\n", ev->ms, (long)timediff);
#endif
if(timediff > ev->ms)
ev->ms = 0;
else
@ -656,7 +675,7 @@ static CURLcode easy_events(struct Curl_multi *multi)
{
/* this struct is made static to allow it to be used after this function
returns and curl_multi_remove_handle() is called */
static struct events evs = {2, FALSE, 0, NULL, 0};
static struct events evs = {-1, FALSE, 0, NULL, 0};
/* if running event-based, do some further multi inits */
events_setup(multi, &evs);
@ -1121,7 +1140,7 @@ CURLcode curl_easy_pause(struct Curl_easy *data, int action)
(data->mstate == MSTATE_PERFORMING ||
data->mstate == MSTATE_RATELIMITING));
/* Unpausing writes is detected on the next run in
* transfer.c:Curl_readwrite(). This is because this may result
* transfer.c:Curl_sendrecv(). This is because this may result
* in a transfer error if the application's callbacks fail */
/* Set the new keepon state, so it takes effect no matter what error

View File

@ -4441,7 +4441,8 @@ static CURLcode cr_exp100_read(struct Curl_easy *data,
}
/* We are now waiting for a reply from the server or
* a timeout on our side IFF the request has been fully sent. */
DEBUGF(infof(data, "cr_exp100_read, start AWAITING_CONTINUE"));
DEBUGF(infof(data, "cr_exp100_read, start AWAITING_CONTINUE, "
"timeout %ldms", data->set.expect_100_timeout));
ctx->state = EXP100_AWAITING_CONTINUE;
ctx->start = Curl_now();
Curl_expire(data, data->set.expect_100_timeout, EXPIRE_100_TIMEOUT);

View File

@ -94,9 +94,12 @@ static CURLMcode add_next_timeout(struct curltime now,
struct Curl_multi *multi,
struct Curl_easy *d);
static CURLMcode multi_timeout(struct Curl_multi *multi,
struct curltime *expire_time,
long *timeout_ms);
static void process_pending_handles(struct Curl_multi *multi);
static void multi_xfer_bufs_free(struct Curl_multi *multi);
static void Curl_expire_ex(struct Curl_easy *data, const struct curltime *nowp,
timediff_t milli, expire_id id);
#ifdef DEBUGBUILD
static const char * const multi_statename[]={
@ -418,6 +421,7 @@ struct Curl_multi *Curl_multi_handle(size_t hashsize, /* socket hash */
multi->multiplexing = TRUE;
multi->max_concurrent_streams = 100;
multi->last_timeout_ms = -1;
#ifdef USE_WINSOCK
multi->wsa_event = WSACreateEvent();
@ -527,18 +531,6 @@ CURLMcode curl_multi_add_handle(struct Curl_multi *multi,
happen. */
Curl_expire(data, 0, EXPIRE_RUN_NOW);
/* A somewhat crude work-around for a little glitch in Curl_update_timer()
that happens if the lastcall time is set to the same time when the handle
is removed as when the next handle is added, as then the check in
Curl_update_timer() that prevents calling the application multiple times
with the same timer info will not trigger and then the new handle's
timeout will not be notified to the app.
The work-around is thus simply to clear the 'lastcall' variable to force
Curl_update_timer() to always trigger a callback to the app when a new
easy handle is added */
memset(&multi->timer_lastcall, 0, sizeof(multi->timer_lastcall));
rc = Curl_update_timer(multi);
if(rc) {
data->multi = NULL; /* not anymore */
@ -799,6 +791,7 @@ CURLMcode curl_multi_remove_handle(struct Curl_multi *multi,
bool premature;
struct Curl_llist_node *e;
CURLMcode rc;
bool removed_timer = FALSE;
/* First, make some basic checks that the CURLM handle is a good handle */
if(!GOOD_MULTI_HANDLE(multi))
@ -849,7 +842,7 @@ CURLMcode curl_multi_remove_handle(struct Curl_multi *multi,
/* The timer must be shut down before data->multi is set to NULL, else the
timenode will remain in the splay tree after curl_easy_cleanup is
called. Do it after multi_done() in case that sets another time! */
Curl_expire_clear(data);
removed_timer = Curl_expire_clear(data);
/* the handle is in a list, remove it from whichever it is */
Curl_node_remove(&data->multi_queue);
@ -930,9 +923,11 @@ CURLMcode curl_multi_remove_handle(struct Curl_multi *multi,
process_pending_handles(multi);
rc = Curl_update_timer(multi);
if(rc)
return rc;
if(removed_timer) {
rc = Curl_update_timer(multi);
if(rc)
return rc;
}
return CURLM_OK;
}
@ -1167,7 +1162,6 @@ CURLMcode curl_multi_fdset(struct Curl_multi *multi,
Some easy handles may not have connected to the remote host yet,
and then we must make sure that is done. */
int this_max_fd = -1;
struct easy_pollset ps;
struct Curl_llist_node *e;
(void)exc_fd_set; /* not used */
@ -1177,23 +1171,22 @@ CURLMcode curl_multi_fdset(struct Curl_multi *multi,
if(multi->in_callback)
return CURLM_RECURSIVE_API_CALL;
memset(&ps, 0, sizeof(ps));
for(e = Curl_llist_head(&multi->process); e; e = Curl_node_next(e)) {
struct Curl_easy *data = Curl_node_elem(e);
unsigned int i;
multi_getsock(data, &ps);
multi_getsock(data, &data->last_poll);
for(i = 0; i < ps.num; i++) {
if(!FDSET_SOCK(ps.sockets[i]))
for(i = 0; i < data->last_poll.num; i++) {
if(!FDSET_SOCK(data->last_poll.sockets[i]))
/* pretend it does not exist */
continue;
if(ps.actions[i] & CURL_POLL_IN)
FD_SET(ps.sockets[i], read_fd_set);
if(ps.actions[i] & CURL_POLL_OUT)
FD_SET(ps.sockets[i], write_fd_set);
if((int)ps.sockets[i] > this_max_fd)
this_max_fd = (int)ps.sockets[i];
if(data->last_poll.actions[i] & CURL_POLL_IN)
FD_SET(data->last_poll.sockets[i], read_fd_set);
if(data->last_poll.actions[i] & CURL_POLL_OUT)
FD_SET(data->last_poll.sockets[i], write_fd_set);
if((int)data->last_poll.sockets[i] > this_max_fd)
this_max_fd = (int)data->last_poll.sockets[i];
}
}
@ -1208,7 +1201,6 @@ CURLMcode curl_multi_waitfds(struct Curl_multi *multi,
unsigned int *fd_count)
{
struct curl_waitfds cwfds;
struct easy_pollset ps;
CURLMcode result = CURLM_OK;
struct Curl_llist_node *e;
@ -1222,11 +1214,10 @@ CURLMcode curl_multi_waitfds(struct Curl_multi *multi,
return CURLM_RECURSIVE_API_CALL;
Curl_waitfds_init(&cwfds, ufds, size);
memset(&ps, 0, sizeof(ps));
for(e = Curl_llist_head(&multi->process); e; e = Curl_node_next(e)) {
struct Curl_easy *data = Curl_node_elem(e);
multi_getsock(data, &ps);
if(Curl_waitfds_add_ps(&cwfds, &ps)) {
multi_getsock(data, &data->last_poll);
if(Curl_waitfds_add_ps(&cwfds, &data->last_poll)) {
result = CURLM_OUT_OF_MEMORY;
goto out;
}
@ -1269,8 +1260,8 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
bool extrawait, /* when no socket, wait */
bool use_wakeup)
{
struct easy_pollset ps;
size_t i;
struct curltime expire_time;
long timeout_internal;
int retcode = 0;
struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK];
@ -1297,14 +1288,13 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
return CURLM_BAD_FUNCTION_ARGUMENT;
Curl_pollfds_init(&cpfds, a_few_on_stack, NUM_POLLS_ON_STACK);
memset(&ps, 0, sizeof(ps));
/* Add the curl handles to our pollfds first */
for(e = Curl_llist_head(&multi->process); e; e = Curl_node_next(e)) {
struct Curl_easy *data = Curl_node_elem(e);
multi_getsock(data, &ps);
if(Curl_pollfds_add_ps(&cpfds, &ps)) {
multi_getsock(data, &data->last_poll);
if(Curl_pollfds_add_ps(&cpfds, &data->last_poll)) {
result = CURLM_OUT_OF_MEMORY;
goto out;
}
@ -1367,7 +1357,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
* poll. Collecting the sockets may install new timers by protocols
* and connection filters.
* Use the shorter one of the internal and the caller requested timeout. */
(void)multi_timeout(multi, &timeout_internal);
(void)multi_timeout(multi, &expire_time, &timeout_internal);
if((timeout_internal >= 0) && (timeout_internal < (long)timeout_ms))
timeout_ms = (int)timeout_internal;
@ -1443,16 +1433,15 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
for(e = Curl_llist_head(&multi->process); e && !result;
e = Curl_node_next(e)) {
struct Curl_easy *data = Curl_node_elem(e);
multi_getsock(data, &ps);
for(i = 0; i < ps.num; i++) {
for(i = 0; i < data->last_poll.num; i++) {
wsa_events.lNetworkEvents = 0;
if(WSAEnumNetworkEvents(ps.sockets[i], NULL,
if(WSAEnumNetworkEvents(data->last_poll.sockets[i], NULL,
&wsa_events) == 0) {
if(ret && !pollrc && wsa_events.lNetworkEvents)
retcode++;
}
WSAEventSelect(ps.sockets[i], multi->wsa_event, 0);
WSAEventSelect(data->last_poll.sockets[i], multi->wsa_event, 0);
}
}
}
@ -2397,7 +2386,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
}
/* read/write data if it is ready to do so */
result = Curl_readwrite(data);
result = Curl_sendrecv(data, nowp);
if(data->req.done || (result == CURLE_RECV_ERROR)) {
/* If CURLE_RECV_ERROR happens early enough, we assume it was a race
@ -3064,11 +3053,15 @@ void Curl_multi_closed(struct Curl_easy *data, curl_socket_t s)
if(data) {
/* if there is still an easy handle associated with this connection */
struct Curl_multi *multi = data->multi;
DEBUGF(infof(data, "Curl_multi_closed, fd=%" CURL_FORMAT_SOCKET_T
" multi is %p", s, (void *)multi));
if(multi) {
/* this is set if this connection is part of a handle that is added to
a multi handle, and only then this is necessary */
struct Curl_sh_entry *entry = sh_getentry(&multi->sockhash, s);
DEBUGF(infof(data, "Curl_multi_closed, fd=%" CURL_FORMAT_SOCKET_T
" entry is %p", s, (void *)entry));
if(entry) {
int rc = 0;
if(multi->socket_cb) {
@ -3145,6 +3138,59 @@ static CURLMcode add_next_timeout(struct curltime now,
return CURLM_OK;
}
struct multi_run_ctx {
struct Curl_multi *multi;
struct curltime now;
size_t run_xfers;
SIGPIPE_MEMBER(pipe_st);
bool run_conn_cache;
};
static CURLMcode multi_run_expired(struct multi_run_ctx *mrc)
{
struct Curl_multi *multi = mrc->multi;
struct Curl_easy *data = NULL;
struct Curl_tree *t = NULL;
CURLMcode result = CURLM_OK;
/*
* The loop following here will go on as long as there are expire-times left
* to process (compared to mrc->now) in the splay and 'data' will be
* re-assigned for every expired handle we deal with.
*/
while(1) {
/* Check if there is one (more) expired timer to deal with! This function
extracts a matching node if there is one */
multi->timetree = Curl_splaygetbest(mrc->now, multi->timetree, &t);
if(!t)
goto out;
data = Curl_splayget(t); /* assign this for next loop */
if(!data)
continue;
(void)add_next_timeout(mrc->now, multi, data);
if(data == multi->conn_cache.closure_handle) {
mrc->run_conn_cache = TRUE;
continue;
}
mrc->run_xfers++;
sigpipe_apply(data, &mrc->pipe_st);
result = multi_runsingle(multi, &mrc->now, data);
if(CURLM_OK >= result) {
/* get the socket(s) and check if the state has been changed since
last */
result = singlesocket(multi, data);
if(result)
goto out;
}
}
out:
return result;
}
static CURLMcode multi_socket(struct Curl_multi *multi,
bool checkall,
curl_socket_t s,
@ -3153,10 +3199,13 @@ static CURLMcode multi_socket(struct Curl_multi *multi,
{
CURLMcode result = CURLM_OK;
struct Curl_easy *data = NULL;
struct Curl_tree *t = NULL;
struct curltime now = Curl_now();
bool run_conn_cache = FALSE;
SIGPIPE_VARIABLE(pipe_st);
struct multi_run_ctx mrc;
(void)ev_bitmask;
memset(&mrc, 0, sizeof(mrc));
mrc.multi = multi;
mrc.now = Curl_now();
sigpipe_init(&mrc.pipe_st);
if(checkall) {
struct Curl_llist_node *e;
@ -3171,10 +3220,10 @@ static CURLMcode multi_socket(struct Curl_multi *multi,
result = singlesocket(multi, Curl_node_elem(e));
}
}
/* or should we fall-through and do the timer-based stuff? */
return result;
mrc.run_conn_cache = TRUE;
goto out;
}
if(s != CURL_SOCKET_TIMEOUT) {
struct Curl_sh_entry *entry = sh_getentry(&multi->sockhash, s);
@ -3201,76 +3250,42 @@ static CURLMcode multi_socket(struct Curl_multi *multi,
DEBUGASSERT(data->magic == CURLEASY_MAGIC_NUMBER);
if(data == multi->conn_cache.closure_handle)
run_conn_cache = TRUE;
mrc.run_conn_cache = TRUE;
else {
if(data->conn && !(data->conn->handler->flags & PROTOPT_DIRLOCK))
/* set socket event bitmask if they are not locked */
data->state.select_bits |= (unsigned char)ev_bitmask;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
/* Expire with out current now, so we will get it below when
* asking the splaytree for expired transfers. */
Curl_expire_ex(data, &mrc.now, 0, EXPIRE_RUN_NOW);
}
}
/* Now we fall-through and do the timer-based stuff, since we do not want
to force the user to have to deal with timeouts as long as at least
one connection in fact has traffic. */
now = Curl_now(); /* get a newer time since the multi_runsingle() loop
may have taken some time */
}
}
else {
/* Asked to run due to time-out. Clear the 'lastcall' variable to force
Curl_update_timer() to trigger a callback to the app again even if the
same timeout is still the one to run after this call. That handles the
case when the application asks libcurl to run the timeout
prematurely. */
memset(&multi->timer_lastcall, 0, sizeof(multi->timer_lastcall));
result = multi_run_expired(&mrc);
if(result)
goto out;
if(mrc.run_xfers) {
/* Running transfers takes time. With a new timestamp, we might catch
* other expires which are due now. Instead of telling the application
* to set a 0 timeout and call us again, we run them here.
* Do that only once or it might be unfair to transfers on other
* sockets. */
mrc.now = Curl_now();
result = multi_run_expired(&mrc);
}
/*
* The loop following here will go on as long as there are expire-times left
* to process in the splay and 'data' will be re-assigned for every expired
* handle we deal with.
*/
sigpipe_init(&pipe_st);
do {
if(data == multi->conn_cache.closure_handle)
run_conn_cache = TRUE;
/* the first loop lap 'data' can be NULL */
else if(data) {
sigpipe_apply(data, &pipe_st);
result = multi_runsingle(multi, &now, data);
if(CURLM_OK >= result) {
/* get the socket(s) and check if the state has been changed since
last */
result = singlesocket(multi, data);
if(result)
break;
}
}
/* Check if there is one (more) expired timer to deal with! This function
extracts a matching node if there is one */
multi->timetree = Curl_splaygetbest(now, multi->timetree, &t);
if(t) {
data = Curl_splayget(t); /* assign this for next loop */
(void)add_next_timeout(now, multi, data);
}
} while(t);
if(run_conn_cache) {
sigpipe_apply(multi->conn_cache.closure_handle, &pipe_st);
out:
if(mrc.run_conn_cache) {
sigpipe_apply(multi->conn_cache.closure_handle, &mrc.pipe_st);
Curl_conncache_multi_perform(multi);
}
sigpipe_restore(&pipe_st);
sigpipe_restore(&mrc.pipe_st);
if(running_handles)
*running_handles = (int)multi->num_alive;
if(CURLM_OK >= result)
result = Curl_update_timer(multi);
return result;
}
@ -3359,39 +3374,28 @@ CURLMcode curl_multi_setopt(struct Curl_multi *multi,
CURLMcode curl_multi_socket(struct Curl_multi *multi, curl_socket_t s,
int *running_handles)
{
CURLMcode result;
if(multi->in_callback)
return CURLM_RECURSIVE_API_CALL;
result = multi_socket(multi, FALSE, s, 0, running_handles);
if(CURLM_OK >= result)
result = Curl_update_timer(multi);
return result;
return multi_socket(multi, FALSE, s, 0, running_handles);
}
CURLMcode curl_multi_socket_action(struct Curl_multi *multi, curl_socket_t s,
int ev_bitmask, int *running_handles)
{
CURLMcode result;
if(multi->in_callback)
return CURLM_RECURSIVE_API_CALL;
result = multi_socket(multi, FALSE, s, ev_bitmask, running_handles);
if(CURLM_OK >= result)
result = Curl_update_timer(multi);
return result;
return multi_socket(multi, FALSE, s, ev_bitmask, running_handles);
}
CURLMcode curl_multi_socket_all(struct Curl_multi *multi, int *running_handles)
{
CURLMcode result;
if(multi->in_callback)
return CURLM_RECURSIVE_API_CALL;
result = multi_socket(multi, TRUE, CURL_SOCKET_BAD, 0, running_handles);
if(CURLM_OK >= result)
result = Curl_update_timer(multi);
return result;
return multi_socket(multi, TRUE, CURL_SOCKET_BAD, 0, running_handles);
}
static CURLMcode multi_timeout(struct Curl_multi *multi,
struct curltime *expire_time,
long *timeout_ms)
{
static const struct curltime tv_zero = {0, 0};
@ -3407,6 +3411,9 @@ static CURLMcode multi_timeout(struct Curl_multi *multi,
/* splay the lowest to the bottom */
multi->timetree = Curl_splay(tv_zero, multi->timetree);
/* this will not return NULL from a non-emtpy tree, but some compilers
* are not convinced of that. Analyzers are hard. */
*expire_time = multi->timetree? multi->timetree->key : tv_zero;
/* 'multi->timetree' will be non-NULL here but the compilers sometimes
yell at us if we assume so */
@ -3418,12 +3425,15 @@ static CURLMcode multi_timeout(struct Curl_multi *multi,
overly long timeouts */
*timeout_ms = (long)diff;
}
else
else {
/* 0 means immediately */
*timeout_ms = 0;
}
}
else
else {
*expire_time = tv_zero;
*timeout_ms = -1;
}
return CURLM_OK;
}
@ -3431,6 +3441,8 @@ static CURLMcode multi_timeout(struct Curl_multi *multi,
CURLMcode curl_multi_timeout(struct Curl_multi *multi,
long *timeout_ms)
{
struct curltime expire_time;
/* First, make some basic checks that the CURLM handle is a good handle */
if(!GOOD_MULTI_HANDLE(multi))
return CURLM_BAD_HANDLE;
@ -3438,56 +3450,79 @@ CURLMcode curl_multi_timeout(struct Curl_multi *multi,
if(multi->in_callback)
return CURLM_RECURSIVE_API_CALL;
return multi_timeout(multi, timeout_ms);
return multi_timeout(multi, &expire_time, timeout_ms);
}
#define DEBUG_UPDATE_TIMER 0
/*
* Tell the application it should update its timers, if it subscribes to the
* update timer callback.
*/
CURLMcode Curl_update_timer(struct Curl_multi *multi)
{
struct curltime expire_ts;
long timeout_ms;
int rc;
bool set_value = FALSE;
if(!multi->timer_cb || multi->dead)
return CURLM_OK;
if(multi_timeout(multi, &timeout_ms)) {
if(multi_timeout(multi, &expire_ts, &timeout_ms)) {
return CURLM_OK;
}
if(timeout_ms < 0) {
static const struct curltime none = {0, 0};
if(Curl_timediff_us(none, multi->timer_lastcall)) {
multi->timer_lastcall = none;
/* there is no timeout now but there was one previously, tell the app to
disable it */
set_in_callback(multi, TRUE);
rc = multi->timer_cb(multi, -1, multi->timer_userp);
set_in_callback(multi, FALSE);
if(rc == -1) {
multi->dead = TRUE;
return CURLM_ABORTED_BY_CALLBACK;
}
return CURLM_OK;
if(timeout_ms < 0 && multi->last_timeout_ms < 0) {
#if DEBUG_UPDATE_TIMER
fprintf(stderr, "Curl_update_timer(), still no timeout, no change\n");
#endif
}
else if(timeout_ms < 0) {
/* there is no timeout now but there was one previously */
#if DEBUG_UPDATE_TIMER
fprintf(stderr, "Curl_update_timer(), remove timeout, "
" last_timeout=%ldms\n", multi->last_timeout_ms);
#endif
timeout_ms = -1; /* normalize */
set_value = TRUE;
}
else if(multi->last_timeout_ms < 0) {
#if DEBUG_UPDATE_TIMER
fprintf(stderr, "Curl_update_timer(), had no timeout, set now\n");
#endif
set_value = TRUE;
}
else if(Curl_timediff_us(multi->last_expire_ts, expire_ts)) {
/* We had a timeout before and have one now, the absolute timestamp
* differs. The relative timeout_ms may be the same, but the starting
* point differs. Let the application restart its timer. */
#if DEBUG_UPDATE_TIMER
fprintf(stderr, "Curl_update_timer(), expire timestamp changed\n");
#endif
set_value = TRUE;
}
else {
/* We have same expire time as previously. Our relative 'timeout_ms'
* may be different now, but the application has the timer running
* and we do not to tell it to start this again. */
#if DEBUG_UPDATE_TIMER
fprintf(stderr, "Curl_update_timer(), same expire timestamp, no change\n");
#endif
}
if(set_value) {
#if DEBUG_UPDATE_TIMER
fprintf(stderr, "Curl_update_timer(), set timeout %ldms\n", timeout_ms);
#endif
multi->last_expire_ts = expire_ts;
multi->last_timeout_ms = timeout_ms;
set_in_callback(multi, TRUE);
rc = multi->timer_cb(multi, timeout_ms, multi->timer_userp);
set_in_callback(multi, FALSE);
if(rc == -1) {
multi->dead = TRUE;
return CURLM_ABORTED_BY_CALLBACK;
}
return CURLM_OK;
}
/* When multi_timeout() is done, multi->timetree points to the node with the
* timeout we got the (relative) time-out time for. We can thus easily check
* if this is the same (fixed) time as we got in a previous call and then
* avoid calling the callback again. */
if(Curl_timediff_us(multi->timetree->key, multi->timer_lastcall) == 0)
return CURLM_OK;
multi->timer_lastcall = multi->timetree->key;
set_in_callback(multi, TRUE);
rc = multi->timer_cb(multi, timeout_ms, multi->timer_userp);
set_in_callback(multi, FALSE);
if(rc == -1) {
multi->dead = TRUE;
return CURLM_ABORTED_BY_CALLBACK;
}
return CURLM_OK;
}
@ -3566,10 +3601,12 @@ multi_addtimeout(struct Curl_easy *data,
*
* Expire replaces a former timeout using the same id if already set.
*/
void Curl_expire(struct Curl_easy *data, timediff_t milli, expire_id id)
static void Curl_expire_ex(struct Curl_easy *data,
const struct curltime *nowp,
timediff_t milli, expire_id id)
{
struct Curl_multi *multi = data->multi;
struct curltime *nowp = &data->state.expiretime;
struct curltime *curr_expire = &data->state.expiretime;
struct curltime set;
/* this is only interesting while there is still an associated multi struct
@ -3579,7 +3616,7 @@ void Curl_expire(struct Curl_easy *data, timediff_t milli, expire_id id)
DEBUGASSERT(id < EXPIRE_LAST);
set = Curl_now();
set = *nowp;
set.tv_sec += (time_t)(milli/1000); /* might be a 64 to 32 bits conversion */
set.tv_usec += (int)(milli%1000)*1000;
@ -3595,11 +3632,11 @@ void Curl_expire(struct Curl_easy *data, timediff_t milli, expire_id id)
in case we need to recompute the minimum timer later. */
multi_addtimeout(data, &set, id);
if(nowp->tv_sec || nowp->tv_usec) {
if(curr_expire->tv_sec || curr_expire->tv_usec) {
/* This means that the struct is added as a node in the splay tree.
Compare if the new time is earlier, and only remove-old/add-new if it
is. */
timediff_t diff = Curl_timediff(set, *nowp);
timediff_t diff = Curl_timediff(set, *curr_expire);
int rc;
if(diff > 0) {
@ -3618,12 +3655,18 @@ void Curl_expire(struct Curl_easy *data, timediff_t milli, expire_id id)
/* Indicate that we are in the splay tree and insert the new timer expiry
value since it is our local minimum. */
*nowp = set;
*curr_expire = set;
Curl_splayset(&data->state.timenode, data);
multi->timetree = Curl_splayinsert(*nowp, multi->timetree,
multi->timetree = Curl_splayinsert(*curr_expire, multi->timetree,
&data->state.timenode);
}
void Curl_expire(struct Curl_easy *data, timediff_t milli, expire_id id)
{
struct curltime now = Curl_now();
Curl_expire_ex(data, &now, milli, id);
}
/*
* Curl_expire_done()
*
@ -3641,7 +3684,7 @@ void Curl_expire_done(struct Curl_easy *data, expire_id id)
*
* Clear ALL timeout values for this handle.
*/
void Curl_expire_clear(struct Curl_easy *data)
bool Curl_expire_clear(struct Curl_easy *data)
{
struct Curl_multi *multi = data->multi;
struct curltime *nowp = &data->state.expiretime;
@ -3649,7 +3692,7 @@ void Curl_expire_clear(struct Curl_easy *data)
/* this is only interesting while there is still an associated multi struct
remaining! */
if(!multi)
return;
return FALSE;
if(nowp->tv_sec || nowp->tv_usec) {
/* Since this is an cleared time, we must remove the previous entry from
@ -3670,12 +3713,11 @@ void Curl_expire_clear(struct Curl_easy *data)
#endif
nowp->tv_sec = 0;
nowp->tv_usec = 0;
return TRUE;
}
return FALSE;
}
CURLMcode curl_multi_assign(struct Curl_multi *multi, curl_socket_t s,
void *hashp)
{

View File

@ -151,8 +151,9 @@ struct Curl_multi {
/* timer callback and user data pointer for the *socket() API */
curl_multi_timer_callback timer_cb;
void *timer_userp;
struct curltime timer_lastcall; /* the fixed time for the timeout for the
previous callback */
long last_timeout_ms; /* the last timeout value set via timer_cb */
struct curltime last_expire_ts; /* timestamp of last expiry */
#ifdef USE_WINSOCK
WSAEVENT wsa_event; /* Winsock event used for waits */
#else

View File

@ -30,7 +30,7 @@
CURLcode Curl_updatesocket(struct Curl_easy *data);
void Curl_expire(struct Curl_easy *data, timediff_t milli, expire_id);
void Curl_expire_clear(struct Curl_easy *data);
bool Curl_expire_clear(struct Curl_easy *data);
void Curl_expire_done(struct Curl_easy *data, expire_id id);
CURLMcode Curl_update_timer(struct Curl_multi *multi) WARN_UNUSED_RESULT;
void Curl_attach_connection(struct Curl_easy *data,

View File

@ -35,6 +35,7 @@ struct sigpipe_ignore {
};
#define SIGPIPE_VARIABLE(x) struct sigpipe_ignore x
#define SIGPIPE_MEMBER(x) struct sigpipe_ignore x
static void sigpipe_init(struct sigpipe_ignore *ig)
{
@ -92,6 +93,7 @@ static void sigpipe_apply(struct Curl_easy *data,
#define sigpipe_init(x) Curl_nop_stmt
#define sigpipe_restore(x) Curl_nop_stmt
#define SIGPIPE_VARIABLE(x)
#define SIGPIPE_MEMBER(x) bool x
#endif
#endif /* HEADER_CURL_SIGPIPE_H */

View File

@ -196,18 +196,6 @@ CURLcode Curl_xfer_send_shutdown(struct Curl_easy *data, bool *done)
return Curl_conn_shutdown(data, sockindex, done);
}
static bool xfer_send_shutdown_started(struct Curl_easy *data)
{
int sockindex;
if(!data || !data->conn)
return CURLE_FAILED_INIT;
if(data->conn->writesockfd == CURL_SOCKET_BAD)
return CURLE_FAILED_INIT;
sockindex = (data->conn->writesockfd == data->conn->sock[SECONDARYSOCKET]);
return Curl_shutdown_started(data, sockindex);
}
/**
* Receive raw response data for the transfer.
* @param data the transfer
@ -261,7 +249,7 @@ static ssize_t Curl_xfer_recv_resp(struct Curl_easy *data,
return -1;
}
}
DEBUGF(infof(data, "readwrite_data: we are done"));
DEBUGF(infof(data, "sendrecv_dl: we are done"));
}
DEBUGASSERT(nread >= 0);
return nread;
@ -272,9 +260,9 @@ static ssize_t Curl_xfer_recv_resp(struct Curl_easy *data,
* the stream was rewound (in which case we have data in a
* buffer)
*/
static CURLcode readwrite_data(struct Curl_easy *data,
struct SingleRequest *k,
int *didwhat)
static CURLcode sendrecv_dl(struct Curl_easy *data,
struct SingleRequest *k,
int *didwhat)
{
struct connectdata *conn = data->conn;
CURLcode result = CURLE_OK;
@ -381,14 +369,14 @@ static CURLcode readwrite_data(struct Curl_easy *data,
out:
Curl_multi_xfer_buf_release(data, xfer_buf);
if(result)
DEBUGF(infof(data, "readwrite_data() -> %d", result));
DEBUGF(infof(data, "sendrecv_dl() -> %d", result));
return result;
}
/*
* Send data to upload to the server, when the socket is writable.
*/
static CURLcode readwrite_upload(struct Curl_easy *data, int *didwhat)
static CURLcode sendrecv_ul(struct Curl_easy *data, int *didwhat)
{
/* We should not get here when the sending is already done. It
* probably means that someone set `data-req.keepon |= KEEP_SEND`
@ -421,66 +409,44 @@ static int select_bits_paused(struct Curl_easy *data, int select_bits)
}
/*
* Curl_readwrite() is the low-level function to be called when data is to
* Curl_sendrecv() is the low-level function to be called when data is to
* be read and written to/from the connection.
*/
CURLcode Curl_readwrite(struct Curl_easy *data)
CURLcode Curl_sendrecv(struct Curl_easy *data, struct curltime *nowp)
{
struct connectdata *conn = data->conn;
struct SingleRequest *k = &data->req;
CURLcode result;
struct curltime now;
CURLcode result = CURLE_OK;
int didwhat = 0;
int select_bits;
int select_bits = 0;
DEBUGASSERT(nowp);
if(data->state.select_bits) {
if(select_bits_paused(data, data->state.select_bits)) {
/* leave the bits unchanged, so they'll tell us what to do when
* this transfer gets unpaused. */
DEBUGF(infof(data, "readwrite, select_bits, early return on PAUSED"));
/* DEBUGF(infof(data, "sendrecv, select_bits, early return on PAUSED"));
*/
result = CURLE_OK;
goto out;
}
select_bits = data->state.select_bits;
data->state.select_bits = 0;
/* DEBUGF(infof(data, "sendrecv, select_bits %x, RUN", select_bits)); */
select_bits = (CURL_CSELECT_OUT|CURL_CSELECT_IN);
}
else if(((k->keepon & KEEP_RECVBITS) == KEEP_RECV) &&
xfer_recv_shutdown_started(data)) {
DEBUGF(infof(data, "readwrite, recv for finishing shutdown"));
select_bits = CURL_CSELECT_IN;
else if(data->last_poll.num) {
/* The transfer wanted something polled. Let's run all available
* send/receives. Worst case we EAGAIN on some. */
/* DEBUGF(infof(data, "sendrecv, had poll sockets, RUN")); */
select_bits = (CURL_CSELECT_OUT|CURL_CSELECT_IN);
}
else if(((k->keepon & KEEP_SENDBITS) == KEEP_SEND) &&
xfer_send_shutdown_started(data)) {
DEBUGF(infof(data, "readwrite, send for finishing shutdown"));
else if(data->req.keepon & KEEP_SEND_TIMED) {
/* DEBUGF(infof(data, "sendrecv, KEEP_SEND_TIMED, RUN ul")); */
select_bits = CURL_CSELECT_OUT;
}
else {
curl_socket_t fd_read;
curl_socket_t fd_write;
/* only use the proper socket if the *_HOLD bit is not set simultaneously
as then we are in rate limiting state in that transfer direction */
if((k->keepon & KEEP_RECVBITS) == KEEP_RECV)
fd_read = conn->sockfd;
else
fd_read = CURL_SOCKET_BAD;
if(Curl_req_want_send(data))
fd_write = conn->writesockfd;
else
fd_write = CURL_SOCKET_BAD;
select_bits = Curl_socket_check(fd_read, CURL_SOCKET_BAD, fd_write, 0);
}
if(select_bits == CURL_CSELECT_ERR) {
failf(data, "select/poll returned error");
result = CURLE_SEND_ERROR;
goto out;
}
#ifdef USE_HYPER
if(conn->datastream) {
result = conn->datastream(data, conn, &didwhat, select_bits);
if(data->conn->datastream) {
result = data->conn->datastream(data, data->conn, &didwhat, select_bits);
if(result || data->req.done)
goto out;
}
@ -490,17 +456,15 @@ CURLcode Curl_readwrite(struct Curl_easy *data)
the stream was rewound (in which case we have data in a
buffer) */
if((k->keepon & KEEP_RECV) && (select_bits & CURL_CSELECT_IN)) {
result = readwrite_data(data, k, &didwhat);
result = sendrecv_dl(data, k, &didwhat);
if(result || data->req.done)
goto out;
}
/* If we still have writing to do, we check if we have a writable socket. */
if((Curl_req_want_send(data) && (select_bits & CURL_CSELECT_OUT)) ||
(k->keepon & KEEP_SEND_TIMED)) {
/* write */
result = readwrite_upload(data, &didwhat);
if((Curl_req_want_send(data) || (data->req.keepon & KEEP_SEND_TIMED)) &&
(select_bits & CURL_CSELECT_OUT)) {
result = sendrecv_ul(data, &didwhat);
if(result)
goto out;
}
@ -508,8 +472,8 @@ CURLcode Curl_readwrite(struct Curl_easy *data)
}
#endif
now = Curl_now();
if(!didwhat) {
if(select_bits && !didwhat) {
/* Transfer wanted to send/recv, but nothing was possible. */
result = Curl_conn_ev_data_idle(data);
if(result)
goto out;
@ -518,23 +482,23 @@ CURLcode Curl_readwrite(struct Curl_easy *data)
if(Curl_pgrsUpdate(data))
result = CURLE_ABORTED_BY_CALLBACK;
else
result = Curl_speedcheck(data, now);
result = Curl_speedcheck(data, *nowp);
if(result)
goto out;
if(k->keepon) {
if(0 > Curl_timeleft(data, &now, FALSE)) {
if(0 > Curl_timeleft(data, nowp, FALSE)) {
if(k->size != -1) {
failf(data, "Operation timed out after %" CURL_FORMAT_TIMEDIFF_T
" milliseconds with %" CURL_FORMAT_CURL_OFF_T " out of %"
CURL_FORMAT_CURL_OFF_T " bytes received",
Curl_timediff(now, data->progress.t_startsingle),
Curl_timediff(*nowp, data->progress.t_startsingle),
k->bytecount, k->size);
}
else {
failf(data, "Operation timed out after %" CURL_FORMAT_TIMEDIFF_T
" milliseconds with %" CURL_FORMAT_CURL_OFF_T " bytes received",
Curl_timediff(now, data->progress.t_startsingle),
Curl_timediff(*nowp, data->progress.t_startsingle),
k->bytecount);
}
result = CURLE_OPERATION_TIMEDOUT;
@ -573,7 +537,7 @@ CURLcode Curl_readwrite(struct Curl_easy *data)
out:
if(result)
DEBUGF(infof(data, "Curl_readwrite() -> %d", result));
DEBUGF(infof(data, "Curl_sendrecv() -> %d", result));
return result;
}

View File

@ -44,7 +44,7 @@ typedef enum {
CURLcode Curl_follow(struct Curl_easy *data, char *newurl,
followtype type);
CURLcode Curl_readwrite(struct Curl_easy *data);
CURLcode Curl_sendrecv(struct Curl_easy *data, struct curltime *nowp);
int Curl_single_getsock(struct Curl_easy *data,
struct connectdata *conn, curl_socket_t *socks);
CURLcode Curl_retry_request(struct Curl_easy *data, char **url);

View File

@ -234,8 +234,6 @@ CURLcode Curl_close(struct Curl_easy **datap)
data = *datap;
*datap = NULL;
Curl_expire_clear(data); /* shut off timers */
/* Detach connection if any is left. This should not be normal, but can be
the case for example with CONNECT_ONLY + recv/send (test 556) */
Curl_detach_connection(data);
@ -253,6 +251,8 @@ CURLcode Curl_close(struct Curl_easy **datap)
}
}
Curl_expire_clear(data); /* shut off any timers left */
data->magic = 0; /* force a clear AFTER the possibly enforced removal from
the multi handle, since that function uses the magic
field! */

View File

@ -14,7 +14,7 @@
fun:zstd_unencode_write
fun:Curl_unencode_write
fun:readwrite_data
fun:Curl_readwrite
fun:Curl_sendrecv
fun:multi_runsingle
fun:curl_multi_perform
fun:easy_transfer
@ -31,7 +31,7 @@
Memcheck:Cond
fun:ZSTD_decompressStream
fun:zstd_unencode_write
fun:Curl_readwrite
fun:Curl_sendrecv
fun:multi_runsingle
fun:curl_multi_perform
fun:curl_easy_perform