mirror of
https://git.openldap.org/openldap/openldap.git
synced 2025-01-06 10:46:21 +08:00
Rework client_read_cb along the lines of upstream
This commit is contained in:
parent
028f28690f
commit
837a6068e0
@ -246,22 +246,14 @@ fail:
|
||||
return 1;
|
||||
}
|
||||
|
||||
void *
|
||||
client_reset( void *ctx, void *arg )
|
||||
void
|
||||
client_reset( Connection *c )
|
||||
{
|
||||
Operation *op = arg;
|
||||
Connection *c = op->o_client;
|
||||
TAvlnode *root;
|
||||
int freed, destroy = 1;
|
||||
int freed;
|
||||
|
||||
CONNECTION_LOCK(c);
|
||||
root = c->c_ops;
|
||||
c->c_ops = NULL;
|
||||
c->c_state = SLAP_C_CLOSING;
|
||||
if ( op->o_tag == LDAP_REQ_BIND ) {
|
||||
c->c_state = SLAP_C_BINDING;
|
||||
destroy = 0;
|
||||
}
|
||||
if ( !BER_BVISNULL( &c->c_auth ) ) {
|
||||
ch_free( c->c_auth.bv_val );
|
||||
BER_BVZERO( &c->c_auth );
|
||||
@ -272,36 +264,28 @@ client_reset( void *ctx, void *arg )
|
||||
}
|
||||
CONNECTION_UNLOCK_INCREF(c);
|
||||
|
||||
tavl_delete( &root, op, operation_client_cmp );
|
||||
freed = tavl_free( root, (AVL_FREE)operation_abandon );
|
||||
|
||||
Debug( LDAP_DEBUG_TRACE, "client_reset: "
|
||||
"dropped %d operations\n",
|
||||
freed );
|
||||
|
||||
if ( destroy ) {
|
||||
operation_destroy( op );
|
||||
CLIENT_LOCK_DESTROY(c);
|
||||
} else {
|
||||
CONNECTION_LOCK_DECREF(c);
|
||||
CLIENT_UNLOCK_OR_DESTROY(c);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
CONNECTION_LOCK_DECREF(c);
|
||||
}
|
||||
|
||||
void *
|
||||
client_bind( void *ctx, void *arg )
|
||||
int
|
||||
client_bind( Connection *client, Operation *op )
|
||||
{
|
||||
Operation *op = arg;
|
||||
Connection *upstream, *client = op->o_client;
|
||||
int rc = 0;
|
||||
Connection *upstream;
|
||||
int rc = LDAP_SUCCESS;
|
||||
|
||||
CONNECTION_LOCK(client);
|
||||
/* protect the Bind operation */
|
||||
tavl_delete( &client->c_ops, op, operation_client_cmp );
|
||||
client->c_state = SLAP_C_BINDING;
|
||||
|
||||
client_reset( client );
|
||||
CONNECTION_UNLOCK_INCREF(client);
|
||||
|
||||
client_reset( ctx, arg );
|
||||
|
||||
upstream = backend_select( op );
|
||||
if ( !upstream ) {
|
||||
Debug( LDAP_DEBUG_STATS, "client_bind: "
|
||||
@ -309,8 +293,7 @@ client_bind( void *ctx, void *arg )
|
||||
operation_send_reject(
|
||||
op, LDAP_UNAVAILABLE, "no connections available", 1 );
|
||||
CONNECTION_LOCK_DECREF(client);
|
||||
CLIENT_UNLOCK_OR_DESTROY(client);
|
||||
return NULL;
|
||||
return rc;
|
||||
}
|
||||
|
||||
op->o_upstream = upstream;
|
||||
@ -324,15 +307,15 @@ client_bind( void *ctx, void *arg )
|
||||
CONNECTION_LOCK_DECREF(upstream);
|
||||
UPSTREAM_UNLOCK_OR_DESTROY(upstream);
|
||||
|
||||
CONNECTION_LOCK_DECREF(client);
|
||||
if ( rc ) {
|
||||
CLIENT_LOCK_DESTROY(client);
|
||||
return NULL;
|
||||
CLIENT_DESTROY(client);
|
||||
return -1;
|
||||
}
|
||||
|
||||
CONNECTION_LOCK_DECREF(client);
|
||||
rc = tavl_insert( &client->c_ops, op, operation_client_cmp, avl_dup_error );
|
||||
assert( rc == LDAP_SUCCESS );
|
||||
CLIENT_UNLOCK_OR_DESTROY(client);
|
||||
|
||||
return NULL;
|
||||
return rc;
|
||||
}
|
||||
|
@ -24,15 +24,15 @@
|
||||
#include "lutil.h"
|
||||
#include "slap.h"
|
||||
|
||||
typedef int (*RequestHandler)( Connection *c, Operation *op );
|
||||
|
||||
static void
|
||||
client_read_cb( evutil_socket_t s, short what, void *arg )
|
||||
{
|
||||
Connection *c = arg;
|
||||
BerElement *ber;
|
||||
Operation *op = NULL;
|
||||
ber_tag_t tag;
|
||||
ber_len_t len;
|
||||
int rc = 0;
|
||||
|
||||
/* What if the shutdown is already in progress and we get to lock the
|
||||
* connection? */
|
||||
@ -47,8 +47,9 @@ client_read_cb( evutil_socket_t s, short what, void *arg )
|
||||
Debug( LDAP_DEBUG_ANY, "client_read_cb: "
|
||||
"ber_alloc failed\n" );
|
||||
CLIENT_DESTROY(c);
|
||||
goto fail;
|
||||
return;
|
||||
}
|
||||
c->c_currentber = ber;
|
||||
|
||||
tag = ber_get_next( c->c_sb, &len, ber );
|
||||
if ( tag != LDAP_TAG_MESSAGE ) {
|
||||
@ -61,72 +62,134 @@ client_read_cb( evutil_socket_t s, short what, void *arg )
|
||||
c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
|
||||
|
||||
c->c_currentber = NULL;
|
||||
ber_free( ber, 1 );
|
||||
CLIENT_DESTROY(c);
|
||||
goto fail;
|
||||
return;
|
||||
}
|
||||
c->c_currentber = ber;
|
||||
event_add( c->c_read_event, NULL );
|
||||
CONNECTION_UNLOCK(c);
|
||||
return;
|
||||
}
|
||||
|
||||
if ( !slap_conn_max_pdus_per_cycle ||
|
||||
ldap_pvt_thread_pool_submit(
|
||||
&connection_pool, handle_requests, c ) ) {
|
||||
/* If we're overloaded or configured as such, process one and resume in
|
||||
* the next cycle.
|
||||
*
|
||||
* handle_one_request re-locks the mutex in the
|
||||
* process, need to test it's still alive */
|
||||
if ( handle_one_request( c ) == LDAP_SUCCESS ) {
|
||||
CLIENT_UNLOCK_OR_DESTROY(c);
|
||||
}
|
||||
return;
|
||||
}
|
||||
event_del( c->c_read_event );
|
||||
|
||||
CONNECTION_UNLOCK(c);
|
||||
return;
|
||||
}
|
||||
|
||||
void *
|
||||
handle_requests( void *ctx, void *arg )
|
||||
{
|
||||
Connection *c = arg;
|
||||
int requests_handled = 0;
|
||||
|
||||
CONNECTION_LOCK(c);
|
||||
for ( ; requests_handled < slap_conn_max_pdus_per_cycle;
|
||||
requests_handled++ ) {
|
||||
BerElement *ber;
|
||||
ber_tag_t tag;
|
||||
ber_len_t len;
|
||||
|
||||
/* handle_one_response may unlock the connection in the process, we
|
||||
* need to expect that might be our responsibility to destroy it */
|
||||
if ( handle_one_request( c ) ) {
|
||||
/* Error, connection is unlocked and might already have been
|
||||
* destroyed */
|
||||
return NULL;
|
||||
}
|
||||
/* Otherwise, handle_one_request leaves the connection locked */
|
||||
|
||||
if ( (ber = ber_alloc()) == NULL ) {
|
||||
Debug( LDAP_DEBUG_ANY, "client_read_cb: "
|
||||
"ber_alloc failed\n" );
|
||||
CLIENT_DESTROY(c);
|
||||
return NULL;
|
||||
}
|
||||
c->c_currentber = ber;
|
||||
|
||||
tag = ber_get_next( c->c_sb, &len, ber );
|
||||
if ( tag != LDAP_TAG_MESSAGE ) {
|
||||
int err = sock_errno();
|
||||
|
||||
if ( err != EWOULDBLOCK && err != EAGAIN ) {
|
||||
char ebuf[128];
|
||||
Debug( LDAP_DEBUG_ANY, "handle_requests: "
|
||||
"ber_get_next on fd %d failed errno=%d (%s)\n",
|
||||
c->c_fd, err,
|
||||
sock_errstr( err, ebuf, sizeof(ebuf) ) );
|
||||
|
||||
c->c_currentber = NULL;
|
||||
ber_free( ber, 1 );
|
||||
CLIENT_DESTROY(c);
|
||||
return NULL;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
event_add( c->c_read_event, NULL );
|
||||
CLIENT_UNLOCK_OR_DESTROY(c);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int
|
||||
handle_one_request( Connection *c )
|
||||
{
|
||||
BerElement *ber;
|
||||
Operation *op = NULL;
|
||||
RequestHandler handler = NULL;
|
||||
|
||||
ber = c->c_currentber;
|
||||
c->c_currentber = NULL;
|
||||
|
||||
op = operation_init( c, ber );
|
||||
if ( !op ) {
|
||||
Debug( LDAP_DEBUG_ANY, "client_read_cb: "
|
||||
Debug( LDAP_DEBUG_ANY, "handle_one_request: "
|
||||
"operation_init failed\n" );
|
||||
CLIENT_DESTROY(c);
|
||||
goto fail;
|
||||
ber_free( ber, 1 );
|
||||
return -1;
|
||||
}
|
||||
|
||||
switch ( op->o_tag ) {
|
||||
case LDAP_REQ_UNBIND:
|
||||
/* We do not expect anything more from the client. Also, we are the
|
||||
* read event, so don't need to unlock */
|
||||
event_del( c->c_read_event );
|
||||
|
||||
rc = ldap_pvt_thread_pool_submit(
|
||||
&connection_pool, client_reset, op );
|
||||
if ( rc ) {
|
||||
CONNECTION_UNLOCK(c);
|
||||
client_reset( NULL, op );
|
||||
return;
|
||||
}
|
||||
break;
|
||||
c->c_state = SLAP_C_CLOSING;
|
||||
CLIENT_DESTROY(c);
|
||||
return -1;
|
||||
case LDAP_REQ_BIND:
|
||||
rc = ldap_pvt_thread_pool_submit(
|
||||
&connection_pool, client_bind, op );
|
||||
handler = client_bind;
|
||||
break;
|
||||
case LDAP_REQ_ABANDON:
|
||||
/* FIXME: We need to be able to abandon a Bind request, handling
|
||||
* ExOps (esp. Cancel) will be different */
|
||||
handler = request_abandon;
|
||||
break;
|
||||
default:
|
||||
if ( c->c_state == SLAP_C_BINDING ) {
|
||||
CONNECTION_UNLOCK(c);
|
||||
CONNECTION_UNLOCK_INCREF(c);
|
||||
operation_send_reject(
|
||||
op, LDAP_PROTOCOL_ERROR, "bind in progress", 0 );
|
||||
return;
|
||||
CONNECTION_LOCK_DECREF(c);
|
||||
return LDAP_SUCCESS;
|
||||
}
|
||||
rc = ldap_pvt_thread_pool_submit(
|
||||
&connection_pool, request_process, op );
|
||||
handler = request_process;
|
||||
break;
|
||||
}
|
||||
|
||||
/* FIXME: unlocks in this function need more thought when we refcount
|
||||
* operations */
|
||||
CONNECTION_UNLOCK(c);
|
||||
|
||||
if ( !rc ) {
|
||||
return;
|
||||
}
|
||||
|
||||
fail:
|
||||
if ( op ) {
|
||||
operation_send_reject(
|
||||
op, LDAP_OTHER, "server error or overloaded", 1 );
|
||||
operation_destroy( op );
|
||||
} else if ( ber ) {
|
||||
ber_free( ber, 1 );
|
||||
}
|
||||
|
||||
return;
|
||||
return handler( c, op );
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -204,62 +204,93 @@ fail:
|
||||
void
|
||||
operation_abandon( Operation *op )
|
||||
{
|
||||
Connection *c = op->o_upstream;
|
||||
BerElement *ber;
|
||||
Backend *b;
|
||||
int rc;
|
||||
|
||||
if ( op->o_upstream ) {
|
||||
Connection *c = op->o_upstream;
|
||||
BerElement *ber;
|
||||
Backend *b;
|
||||
if ( !c ) {
|
||||
c = op->o_client;
|
||||
|
||||
CONNECTION_LOCK(c);
|
||||
rc = ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) == NULL );
|
||||
if ( !rc ) {
|
||||
c->c_n_ops_executing--;
|
||||
}
|
||||
b = (Backend *)c->c_private;
|
||||
CONNECTION_UNLOCK_INCREF(c);
|
||||
|
||||
if ( rc ) {
|
||||
/* The operation has already been abandoned or finished */
|
||||
goto done;
|
||||
}
|
||||
|
||||
ldap_pvt_thread_mutex_lock( &b->b_mutex );
|
||||
b->b_n_ops_executing--;
|
||||
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
|
||||
|
||||
ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
|
||||
|
||||
ber = c->c_pendingber;
|
||||
if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
|
||||
Debug( LDAP_DEBUG_ANY, "operation_abandon: "
|
||||
"ber_alloc failed\n" );
|
||||
CONNECTION_LOCK_DECREF(c);
|
||||
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
|
||||
UPSTREAM_UNLOCK_OR_DESTROY(c);
|
||||
goto done;
|
||||
}
|
||||
c->c_pendingber = ber;
|
||||
|
||||
rc = ber_printf( ber, "t{titi}", LDAP_TAG_MESSAGE,
|
||||
LDAP_TAG_MSGID, c->c_next_msgid++,
|
||||
LDAP_REQ_ABANDON, op->o_upstream_msgid );
|
||||
|
||||
if ( rc == -1 ) {
|
||||
ber_free( ber, 1 );
|
||||
}
|
||||
|
||||
CONNECTION_LOCK_DECREF(c);
|
||||
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
|
||||
UPSTREAM_UNLOCK_OR_DESTROY(c);
|
||||
|
||||
if ( rc != -1 ) {
|
||||
upstream_write_cb( -1, 0, c );
|
||||
}
|
||||
CLIENT_UNLOCK_OR_DESTROY(c);
|
||||
operation_destroy( op );
|
||||
return;
|
||||
}
|
||||
|
||||
CONNECTION_LOCK(c);
|
||||
if ( tavl_delete( &c->c_ops, op, operation_upstream_cmp ) == NULL ) {
|
||||
/* The operation has already been abandoned or finished */
|
||||
goto done;
|
||||
}
|
||||
c->c_n_ops_executing--;
|
||||
b = (Backend *)c->c_private;
|
||||
CONNECTION_UNLOCK_INCREF(c);
|
||||
|
||||
ldap_pvt_thread_mutex_lock( &b->b_mutex );
|
||||
b->b_n_ops_executing--;
|
||||
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
|
||||
|
||||
ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
|
||||
|
||||
ber = c->c_pendingber;
|
||||
if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
|
||||
Debug( LDAP_DEBUG_ANY, "operation_abandon: "
|
||||
"ber_alloc failed\n" );
|
||||
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
|
||||
CONNECTION_LOCK_DECREF(c);
|
||||
goto done;
|
||||
}
|
||||
c->c_pendingber = ber;
|
||||
|
||||
rc = ber_printf( ber, "t{titi}", LDAP_TAG_MESSAGE,
|
||||
LDAP_TAG_MSGID, c->c_next_msgid++,
|
||||
LDAP_REQ_ABANDON, op->o_upstream_msgid );
|
||||
|
||||
if ( rc == -1 ) {
|
||||
ber_free( ber, 1 );
|
||||
c->c_pendingber = NULL;
|
||||
}
|
||||
|
||||
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
|
||||
|
||||
if ( rc != -1 ) {
|
||||
upstream_write_cb( -1, 0, c );
|
||||
}
|
||||
|
||||
CONNECTION_LOCK_DECREF(c);
|
||||
done:
|
||||
UPSTREAM_UNLOCK_OR_DESTROY(c);
|
||||
operation_destroy( op );
|
||||
}
|
||||
|
||||
int
|
||||
request_abandon( Connection *c, Operation *op )
|
||||
{
|
||||
Operation *request, needle = { .o_client = c };
|
||||
ber_tag_t tag;
|
||||
int rc = -1;
|
||||
|
||||
tag = ber_get_int( op->o_ber, &needle.o_client_msgid );
|
||||
if ( tag != LDAP_REQ_ABANDON ) {
|
||||
/* How would that happen if we already got the tag for the op? */
|
||||
assert(0);
|
||||
goto done;
|
||||
}
|
||||
|
||||
request = tavl_find( c->c_ops, &needle, operation_client_cmp );
|
||||
if ( !request ) {
|
||||
goto done;
|
||||
}
|
||||
|
||||
CONNECTION_UNLOCK_INCREF(c);
|
||||
operation_abandon( request );
|
||||
CONNECTION_LOCK_DECREF(c);
|
||||
|
||||
rc = LDAP_SUCCESS;
|
||||
done:
|
||||
operation_destroy( op );
|
||||
return rc;
|
||||
}
|
||||
|
||||
void
|
||||
@ -317,14 +348,15 @@ operation_lost_upstream( Operation *op )
|
||||
operation_destroy( op );
|
||||
}
|
||||
|
||||
void *
|
||||
request_process( void *ctx, void *arg )
|
||||
int
|
||||
request_process( Connection *client, Operation *op )
|
||||
{
|
||||
Operation *op = arg;
|
||||
BerElement *output;
|
||||
Connection *client = op->o_client, *upstream;
|
||||
Connection *upstream;
|
||||
ber_int_t msgid;
|
||||
int rc;
|
||||
int rc = LDAP_SUCCESS;
|
||||
|
||||
CONNECTION_UNLOCK_INCREF(client);
|
||||
|
||||
upstream = backend_select( op );
|
||||
if ( !upstream ) {
|
||||
@ -346,10 +378,16 @@ request_process( void *ctx, void *arg )
|
||||
rc = tavl_insert(
|
||||
&upstream->c_ops, op, operation_upstream_cmp, avl_dup_error );
|
||||
CONNECTION_UNLOCK_INCREF(upstream);
|
||||
|
||||
Debug( LDAP_DEBUG_TRACE, "request_process: "
|
||||
"client connid=%lu added %s msgid=%d to upstream connid=%lu as "
|
||||
"msgid=%d\n",
|
||||
op->o_client_connid, slap_msgtype2str( op->o_tag ),
|
||||
op->o_client_msgid, op->o_upstream_connid, op->o_upstream_msgid );
|
||||
assert( rc == LDAP_SUCCESS );
|
||||
|
||||
if ( lload_features & LLOAD_FEATURE_PROXYAUTHZ ) {
|
||||
CONNECTION_LOCK(client);
|
||||
CONNECTION_LOCK_DECREF(client);
|
||||
Debug( LDAP_DEBUG_TRACE, "request_process: "
|
||||
"proxying identity %s to upstream\n",
|
||||
client->c_auth.bv_val );
|
||||
@ -358,7 +396,7 @@ request_process( void *ctx, void *arg )
|
||||
op->o_tag, &op->o_request,
|
||||
LDAP_TAG_CONTROLS,
|
||||
LDAP_CONTROL_PROXY_AUTHZ, 1, &client->c_auth );
|
||||
CONNECTION_UNLOCK(client);
|
||||
CONNECTION_UNLOCK_INCREF(client);
|
||||
|
||||
if ( !BER_BVISNULL( &op->o_ctrls ) ) {
|
||||
BerElement *control_ber = ber_alloc();
|
||||
@ -387,7 +425,8 @@ request_process( void *ctx, void *arg )
|
||||
CONNECTION_LOCK_DECREF(upstream);
|
||||
UPSTREAM_UNLOCK_OR_DESTROY(upstream);
|
||||
|
||||
return NULL;
|
||||
CONNECTION_LOCK_DECREF(client);
|
||||
return rc;
|
||||
|
||||
fail:
|
||||
if ( upstream ) {
|
||||
@ -396,5 +435,6 @@ fail:
|
||||
UPSTREAM_UNLOCK_OR_DESTROY(upstream);
|
||||
}
|
||||
operation_send_reject( op, LDAP_OTHER, "internal error", 0 );
|
||||
return NULL;
|
||||
CONNECTION_LOCK_DECREF(client);
|
||||
return rc;
|
||||
}
|
||||
|
@ -63,12 +63,14 @@ LDAP_SLAPD_F (void) ch_free( void * );
|
||||
/*
|
||||
* bind.c
|
||||
*/
|
||||
LDAP_SLAPD_F (void *) client_reset( void *ctx, void *arg );
|
||||
LDAP_SLAPD_F (void *) client_bind( void *ctx, void *arg );
|
||||
LDAP_SLAPD_F (void) client_reset( Connection *c );
|
||||
LDAP_SLAPD_F (int) client_bind( Connection *c, Operation *op );
|
||||
|
||||
/*
|
||||
* client.c
|
||||
*/
|
||||
LDAP_SLAPD_F (void *) handle_requests( void *ctx, void *arg );
|
||||
LDAP_SLAPD_F (int) handle_one_request( Connection *c );
|
||||
LDAP_SLAPD_F (Connection *) client_init( ber_socket_t s, Listener *url, const char *peername, struct event_base *base, int use_tls );
|
||||
LDAP_SLAPD_F (void) client_write_cb( evutil_socket_t s, short what, void *arg );
|
||||
LDAP_SLAPD_F (void) client_destroy( Connection *c );
|
||||
@ -154,7 +156,8 @@ LDAP_SLAPD_F (void) operation_abandon( Operation *op );
|
||||
LDAP_SLAPD_F (void) operation_send_reject( Operation *op, int result, const char *msg, int send_anyway );
|
||||
LDAP_SLAPD_F (void) operation_lost_upstream( Operation *op );
|
||||
LDAP_SLAPD_F (void) operation_destroy( Operation *op );
|
||||
LDAP_SLAPD_F (void *) request_process( void *ctx, void *arg );
|
||||
LDAP_SLAPD_F (int) request_abandon( Connection *c, Operation *op );
|
||||
LDAP_SLAPD_F (int) request_process( Connection *c, Operation *op );
|
||||
|
||||
/*
|
||||
* sl_malloc.c
|
||||
|
Loading…
Reference in New Issue
Block a user