From 7173e4726ea7e45dcc61cb90d23ddb7bf2522f96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Kuzn=C3=ADk?= Date: Fri, 21 Aug 2020 11:09:10 +0200 Subject: [PATCH] ITS#9598 Factor out upstream checking --- servers/lloadd/backend.c | 194 ++++++++++++++++++++++------------- servers/lloadd/bind.c | 7 +- servers/lloadd/client.c | 9 +- servers/lloadd/proto-lload.h | 4 +- 4 files changed, 134 insertions(+), 80 deletions(-) diff --git a/servers/lloadd/backend.c b/servers/lloadd/backend.c index 4df22ac14d..dd7a4cbc83 100644 --- a/servers/lloadd/backend.c +++ b/servers/lloadd/backend.c @@ -284,10 +284,116 @@ fail: epoch_leave( epoch ); } -LloadConnection * -backend_select( LloadOperation *op, int *res ) +int +try_upstream( + LloadBackend *b, + lload_c_head *head, + LloadOperation *op, + LloadConnection *c, + int *res, + char **message ) +{ + assert_locked( &b->b_mutex ); + + checked_lock( &c->c_io_mutex ); + CONNECTION_LOCK(c); + if ( c->c_state == LLOAD_C_READY && !c->c_pendingber && + ( b->b_max_conn_pending == 0 || + c->c_n_ops_executing < b->b_max_conn_pending ) ) { + Debug( LDAP_DEBUG_CONNS, "try_upstream: " + "selected connection connid=%lu for client " + "connid=%lu msgid=%d\n", + c->c_connid, op->o_client_connid, op->o_client_msgid ); + + /* c_state is DYING if we're about to be unlinked */ + assert( IS_ALIVE( c, c_live ) ); + + if ( head ) { + /* + * Round-robin step: + * Rotate the queue to put this connection at the end. + */ + LDAP_CIRCLEQ_MAKE_TAIL( head, c, c_next ); + } + + b->b_n_ops_executing++; + if ( op->o_tag == LDAP_REQ_BIND ) { + b->b_counters[LLOAD_STATS_OPS_BIND].lc_ops_received++; + } else { + b->b_counters[LLOAD_STATS_OPS_OTHER].lc_ops_received++; + } + c->c_n_ops_executing++; + c->c_counters.lc_ops_received++; + + *res = LDAP_SUCCESS; + CONNECTION_ASSERT_LOCKED(c); + assert_locked( &c->c_io_mutex ); + return 1; + } + CONNECTION_UNLOCK(c); + checked_unlock( &c->c_io_mutex ); + return 0; +} + +int +backend_select( + LloadBackend *b, + LloadOperation *op, + LloadConnection **cp, + int *res, + char **message ) +{ + lload_c_head *head; + LloadConnection *c; + + assert_locked( &b->b_mutex ); + if ( b->b_max_pending && b->b_n_ops_executing >= b->b_max_pending ) { + Debug( LDAP_DEBUG_CONNS, "backend_select: " + "backend %s too busy\n", + b->b_uri.bv_val ); + *res = LDAP_BUSY; + *message = "server busy"; + return 1; + } + + if ( op->o_tag == LDAP_REQ_BIND +#ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS + && !(lload_features & LLOAD_FEATURE_VC) +#endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */ + ) { + head = &b->b_bindconns; + } else { + head = &b->b_conns; + } + + if ( LDAP_CIRCLEQ_EMPTY( head ) ) { + return 0; + } + + *res = LDAP_BUSY; + *message = "server busy"; + + LDAP_CIRCLEQ_FOREACH( c, head, c_next ) { + if ( try_upstream( b, head, op, c, res, message ) ) { + *cp = c; + CONNECTION_ASSERT_LOCKED(c); + assert_locked( &c->c_io_mutex ); + return 1; + } + } + + return 1; +} + +int +upstream_select( + LloadOperation *op, + LloadConnection **cp, + int *res, + char **message ) { LloadBackend *b, *first, *next; + int rc = 0; checked_lock( &backend_mutex ); first = b = current_backend; @@ -302,84 +408,28 @@ backend_select( LloadOperation *op, int *res ) /* TODO: Two runs, one with trylock, then one actually locked if we don't * find anything? */ do { - lload_c_head *head; - LloadConnection *c; - checked_lock( &b->b_mutex ); next = LDAP_CIRCLEQ_LOOP_NEXT( &backend, b, b_next ); - if ( b->b_max_pending && b->b_n_ops_executing >= b->b_max_pending ) { - Debug( LDAP_DEBUG_CONNS, "backend_select: " - "backend %s too busy\n", - b->b_uri.bv_val ); - checked_unlock( &b->b_mutex ); - b = next; - *res = LDAP_BUSY; - continue; - } - - if ( op->o_tag == LDAP_REQ_BIND -#ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS - && !(lload_features & LLOAD_FEATURE_VC) -#endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */ - ) { - head = &b->b_bindconns; - } else { - head = &b->b_conns; - } - if ( !LDAP_CIRCLEQ_EMPTY( head ) ) { - *res = LDAP_BUSY; - } - - LDAP_CIRCLEQ_FOREACH ( c, head, c_next ) { - checked_lock( &c->c_io_mutex ); - CONNECTION_LOCK(c); - if ( c->c_state == LLOAD_C_READY && !c->c_pendingber && - ( b->b_max_conn_pending == 0 || - c->c_n_ops_executing < b->b_max_conn_pending ) ) { - Debug( LDAP_DEBUG_CONNS, "backend_select: " - "selected connection connid=%lu for client " - "connid=%lu msgid=%d\n", - c->c_connid, op->o_client_connid, op->o_client_msgid ); - - /* c_state is DYING if we're about to be unlinked */ - assert( IS_ALIVE( c, c_live ) ); - - /* - * Round-robin step: - * Rotate the queue to put this connection at the end, same for - * the backend. - */ - LDAP_CIRCLEQ_MAKE_TAIL( head, c, c_next ); - - checked_lock( &backend_mutex ); - current_backend = next; - checked_unlock( &backend_mutex ); - - b->b_n_ops_executing++; - if ( op->o_tag == LDAP_REQ_BIND ) { - b->b_counters[LLOAD_STATS_OPS_BIND].lc_ops_received++; - } else { - b->b_counters[LLOAD_STATS_OPS_OTHER].lc_ops_received++; - } - c->c_n_ops_executing++; - c->c_counters.lc_ops_received++; - - checked_unlock( &b->b_mutex ); - *res = LDAP_SUCCESS; - CONNECTION_ASSERT_LOCKED(c); - assert_locked( &c->c_io_mutex ); - return c; - } - CONNECTION_UNLOCK(c); - checked_unlock( &c->c_io_mutex ); - } + rc = backend_select( b, op, cp, res, message ); checked_unlock( &b->b_mutex ); + if ( rc && *cp ) { + /* + * Round-robin step: + * Rotate the queue to put this backend at the end. The race here + * is acceptable. + */ + checked_lock( &backend_mutex ); + current_backend = next; + checked_unlock( &backend_mutex ); + return rc; + } + b = next; } while ( b != first ); - return NULL; + return rc; } /* diff --git a/servers/lloadd/bind.c b/servers/lloadd/bind.c index e20f2e497d..30db61db49 100644 --- a/servers/lloadd/bind.c +++ b/servers/lloadd/bind.c @@ -195,7 +195,8 @@ request_bind( LloadConnection *client, LloadOperation *op ) ber_int_t version; ber_tag_t tag; unsigned long pin; - int res, rc = LDAP_SUCCESS; + int res = LDAP_UNAVAILABLE, rc = LDAP_SUCCESS; + char *message = "no connections available"; CONNECTION_LOCK(client); pin = client->c_pin_id; @@ -357,7 +358,7 @@ request_bind( LloadConnection *client, LloadOperation *op ) if ( upstream ) { /* No need to do anything */ } else if ( !pin ) { - upstream = backend_select( op, &res ); + upstream_select( op, &upstream, &res, &message ); } else { Debug( LDAP_DEBUG_STATS, "request_bind: " "connid=%lu, msgid=%d pinned upstream lost\n", @@ -372,7 +373,7 @@ request_bind( LloadConnection *client, LloadOperation *op ) Debug( LDAP_DEBUG_STATS, "request_bind: " "connid=%lu, msgid=%d no available connection found\n", op->o_client_connid, op->o_client_msgid ); - operation_send_reject( op, res, "no connections available", 1 ); + operation_send_reject( op, res, message, 1 ); assert( client->c_pin_id == 0 ); goto done; } diff --git a/servers/lloadd/client.c b/servers/lloadd/client.c index 7ea107d5ea..9567d2033d 100644 --- a/servers/lloadd/client.c +++ b/servers/lloadd/client.c @@ -89,17 +89,18 @@ int request_process( LloadConnection *client, LloadOperation *op ) { BerElement *output; - LloadConnection *upstream; + LloadConnection *upstream = NULL; ber_int_t msgid; - int res, rc = LDAP_SUCCESS; + int res = LDAP_UNAVAILABLE, rc = LDAP_SUCCESS; + char *message = "no connections available"; - upstream = backend_select( op, &res ); + upstream_select( op, &upstream, &res, &message ); if ( !upstream ) { Debug( LDAP_DEBUG_STATS, "request_process: " "connid=%lu, msgid=%d no available connection found\n", op->o_client_connid, op->o_client_msgid ); - operation_send_reject( op, res, "no connections available", 1 ); + operation_send_reject( op, res, message, 1 ); goto fail; } CONNECTION_ASSERT_LOCKED(upstream); diff --git a/servers/lloadd/proto-lload.h b/servers/lloadd/proto-lload.h index ffa15de365..b750f553a3 100644 --- a/servers/lloadd/proto-lload.h +++ b/servers/lloadd/proto-lload.h @@ -40,7 +40,9 @@ LDAP_BEGIN_DECL LDAP_SLAPD_F (void) backend_connect( evutil_socket_t s, short what, void *arg ); LDAP_SLAPD_F (void *) backend_connect_task( void *ctx, void *arg ); LDAP_SLAPD_F (void) backend_retry( LloadBackend *b ); -LDAP_SLAPD_F (LloadConnection *) backend_select( LloadOperation *op, int *res ); +LDAP_SLAPD_F (int) upstream_select( LloadOperation *op, LloadConnection **c, int *res, char **message ); +LDAP_SLAPD_F (int) backend_select( LloadBackend *b, LloadOperation *op, LloadConnection **c, int *res, char **message ); +LDAP_SLAPD_F (int) try_upstream( LloadBackend *b, lload_c_head *head, LloadOperation *op, LloadConnection *c, int *res, char **message ); LDAP_SLAPD_F (void) backend_reset( LloadBackend *b, int gentle ); LDAP_SLAPD_F (void) lload_backend_destroy( LloadBackend *b ); LDAP_SLAPD_F (void) lload_backends_destroy( void );