diff --git a/servers/lloadd/client.c b/servers/lloadd/client.c index faee2637f5..c1d94fcbfa 100644 --- a/servers/lloadd/client.c +++ b/servers/lloadd/client.c @@ -30,12 +30,65 @@ static void client_read_cb( evutil_socket_t s, short what, void *arg ) { Connection *c = arg; + BerElement *ber; + Operation *op; + ber_tag_t tag; + ber_len_t len; ldap_pvt_thread_mutex_lock( &c->c_mutex ); Debug( LDAP_DEBUG_CONNS, "client_read_cb: " "connection %lu ready to read\n", c->c_connid ); + + ber = c->c_currentber; + if ( ber == NULL && (ber = ber_alloc()) == NULL ) { + Debug( LDAP_DEBUG_ANY, "ber_alloc failed\n" ); + goto fail; + } + + tag = ber_get_next( c->c_sb, &len, ber ); + if ( tag != LDAP_TAG_MESSAGE ) { + int err = sock_errno(); + + if ( err != EWOULDBLOCK && err != EAGAIN ) { + char ebuf[128]; + Debug( LDAP_DEBUG_ANY, "ber_get_next on fd %d failed errno=%d (%s)\n", c->c_fd, + err, sock_errstr( err, ebuf, sizeof(ebuf) ) ); + + c->c_currentber = NULL; + goto fail; + } + c->c_currentber = ber; + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + return; + } + + c->c_currentber = NULL; + + op = operation_init( c, ber ); + if ( !op ) { + Debug( LDAP_DEBUG_ANY, "operation_init failed\n" ); + goto fail; + } + + if ( ldap_pvt_thread_pool_submit( + &connection_pool, operation_process, op ) ) { + /* what could have happened? */ + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + operation_destroy( op ); + ldap_pvt_thread_mutex_lock( &c->c_mutex ); + goto fail; + } + + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + return; +fail: client_destroy( c ); + + if ( ber ) { + ber_free( ber, 1 ); + } + return; } void diff --git a/servers/lloadd/upstream.c b/servers/lloadd/upstream.c index 453b9ab0db..4d87ee5fc5 100644 --- a/servers/lloadd/upstream.c +++ b/servers/lloadd/upstream.c @@ -30,8 +30,111 @@ void upstream_read_cb( evutil_socket_t s, short what, void *arg ) { Connection *c = arg; + BerElement *ber; + ber_tag_t tag; + Operation *op, needle = { .o_upstream = c }; + ber_len_t len; + int finished = 0; ldap_pvt_thread_mutex_lock( &c->c_mutex ); + Debug( LDAP_DEBUG_CONNS, "upstream_read_cb: " + "connection %lu ready to read\n", + c->c_connid ); + + ber = c->c_currentber; + if ( ber == NULL && (ber = ber_alloc()) == NULL ) { + Debug( LDAP_DEBUG_ANY, "ber_alloc failed\n" ); + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + return; + } + + tag = ber_get_next( c->c_sb, &len, ber ); + if ( tag != LDAP_TAG_MESSAGE ) { + int err = sock_errno(); + + if ( err != EWOULDBLOCK && err != EAGAIN ) { + char ebuf[128]; + Debug( LDAP_DEBUG_ANY, "ber_get_next on fd %d failed errno=%d (%s)\n", c->c_fd, + err, sock_errstr( err, ebuf, sizeof(ebuf) ) ); + + c->c_currentber = NULL; + goto fail; + } + c->c_currentber = ber; + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + return; + } + + c->c_currentber = NULL; + + tag = ber_get_int( ber, &needle.o_upstream_msgid ); + if ( tag != LDAP_TAG_MSGID || needle.o_upstream_msgid == 0 ) { + goto fail; + } + + op = tavl_find( c->c_ops, &needle, operation_upstream_cmp ); + if ( !op ) { + ber_free( ber, 1 ); + } else { + Connection *client = op->o_client; + BerElement *output; + BerValue response, controls; + ber_tag_t type; + + type = ber_skip_element( ber, &response ); + switch ( type ) { + case LDAP_RES_SEARCH_ENTRY: + case LDAP_RES_SEARCH_REFERENCE: + case LDAP_RES_INTERMEDIATE: + break; + default: + finished = 1; + tavl_delete( &c->c_ops, op, operation_upstream_cmp ); + break; + } + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + + tag = ber_peek_tag( ber, &len ); + if ( tag == LDAP_TAG_CONTROLS ) { + tag = ber_skip_element( ber, &controls ); + } + + output = ber_alloc(); + if ( !output ) { + goto fail; + } + + ber_start_seq( output, LDAP_TAG_MESSAGE ); + ber_put_int( output, op->o_client_msgid, LDAP_TAG_MSGID ); + ber_put_berval( output, &response, type ); + if ( tag == LDAP_TAG_CONTROLS ) { + ber_put_berval( output, &controls, LDAP_TAG_CONTROLS ); + } + ber_put_seq( output ); + + if ( finished ) { + ldap_pvt_thread_mutex_lock( &client->c_mutex ); + tavl_delete( &client->c_ops, op, operation_client_cmp ); + ldap_pvt_thread_mutex_unlock( &client->c_mutex ); + operation_destroy( op ); + } + + ldap_pvt_thread_mutex_lock( &client->c_io_mutex ); + client->c_pendingber = output; + ldap_pvt_thread_mutex_unlock( &client->c_io_mutex ); + + client_write_cb( -1, 0, client ); + return; + } + + ldap_pvt_thread_mutex_unlock( &c->c_mutex ); + + return; +fail: + Debug( LDAP_DEBUG_ANY, "upstream_read_cb: " + "error on processing a response on upstream connection %ld\n", + c->c_connid ); + ber_free( ber, 1 ); upstream_destroy( c ); }