ITS#9598 Per OID restrictions

This commit is contained in:
Ondřej Kuzník 2020-08-10 17:14:07 +02:00
parent 0190f18b4c
commit ddc9430727
10 changed files with 525 additions and 42 deletions

View File

@ -347,10 +347,65 @@ Specify the number of seconds after a write operation is finished that
.B lloadd
will direct operations exclusively to the last selected backend. A write
operation is anything not handled internally (certain exops, abandon),
excepting search, compare and bind operations. Bind operations also reset this
except search, compare and bind operations. Bind operations also reset this
restriction. The default is 0, write operations do not restrict selection. When
negative, the restriction is not time limited and will persist until the next
bind.
.TP
.B restrict_exop <OID> <action>
Tell
.B lloadd
that extended operation with a given OID should be handled in a specific way.
OID
.B 1.1
is special, setting a default (only for operations not handled internally).
The meaning of the
.B <action>
argument is the same as in
.B restrict_control
below.
.TP
.B restrict_control <OID> <action>
Tell
.B lloadd
that a control with a given OID attached to any operation should be handled in
a specific way according to the
.B <action>
argument. At the moment, only operations passed intact are inspected in
this way, in particular, controls on bind and extended operations are
.B not
checked.
In order of descending priority (the control with highest priority action
wins), this is the action made:
.RS
.RS
.PD 0
.TP
.B reject
operations that carry this control will be rejected.
.TP
.B connection
once an upstream is selected, every future operation from this client will be
directed to the same connection. Useful when state is shared between client and
upstream that the load balancer doesn't track.
.TP
.B backend
like
.B write
except this does not time out.
.TP
.B write
this is treated like a write operation (see
.BR write_coherence )
above.
.TP
.B ignore
does not influence restrictions, useful when changing the global exop default.
This is the default handling for exops/controls not handled by the load balancer
internally.
.PD
.RE
.SH TLS OPTIONS
If
@ -804,6 +859,21 @@ Here is a short example of a configuration file:
argsfile LOCALSTATEDIR/run/lloadd.args
pidfile LOCALSTATEDIR/run/lloadd.pid
# cancel not supported yet
restrict_exop 1.3.6.1.1.8 reject
# turn not supported
restrict_exop 1.3.6.1.1.19 reject
# TXN Exop if desired, otherwise reject
restrict_exop 1.3.6.1.1.21.1 connection
# Paged results control
restrict_control 1.2.840.113556.1.4.319 connection
# VLV control
restrict_control 2.16.840.1.113730.3.4.9 connection
bindconf
bindmethod=simple
binddn=cn=test

View File

@ -197,6 +197,7 @@ request_bind( LloadConnection *client, LloadOperation *op )
unsigned long pin;
int res = LDAP_UNAVAILABLE, rc = LDAP_SUCCESS;
char *message = "no connections available";
enum op_restriction client_restricted;
CONNECTION_LOCK(client);
pin = client->c_pin_id;
@ -336,26 +337,22 @@ request_bind( LloadConnection *client, LloadOperation *op )
assert( rc == LDAP_SUCCESS );
client->c_n_ops_executing++;
if ( client->c_backend ) {
assert( client->c_restricted_inflight == 0 );
client->c_backend = NULL;
client->c_restricted_at = 0;
}
client_restricted = client->c_restricted;
CONNECTION_UNLOCK(client);
if ( pin ) {
checked_lock( &op->o_link_mutex );
upstream = op->o_upstream;
checked_unlock( &op->o_link_mutex );
}
if ( upstream ) {
checked_lock( &upstream->c_io_mutex );
CONNECTION_LOCK(upstream);
if ( !IS_ALIVE( upstream, c_live ) ) {
CONNECTION_UNLOCK(upstream);
checked_unlock( &upstream->c_io_mutex );
upstream = NULL;
}
if ( upstream ) {
checked_lock( &upstream->c_io_mutex );
CONNECTION_LOCK(upstream);
if ( !IS_ALIVE( upstream, c_live ) ) {
CONNECTION_UNLOCK(upstream);
checked_unlock( &upstream->c_io_mutex );
upstream = NULL;
}
}
@ -363,7 +360,7 @@ request_bind( LloadConnection *client, LloadOperation *op )
* have to reject the op and clear pin */
if ( upstream ) {
/* No need to do anything */
} else if ( !pin ) {
} else if ( !pin && client_restricted != LLOAD_OP_RESTRICTED_ISOLATE ) {
upstream_select( op, &upstream, &res, &message );
} else {
Debug( LDAP_DEBUG_STATS, "request_bind: "

View File

@ -94,19 +94,97 @@ request_process( LloadConnection *client, LloadOperation *op )
ber_int_t msgid;
int res = LDAP_UNAVAILABLE, rc = LDAP_SUCCESS;
char *message = "no connections available";
enum op_restriction client_restricted;
if ( lload_write_coherence ) {
CONNECTION_LOCK(client);
if ( client->c_restricted_inflight || client->c_restricted_at < 0 ||
client->c_restricted_at + lload_write_coherence >= op->o_start ) {
b = client->c_backend;
} else {
client->c_backend = NULL;
if ( lload_control_actions && !BER_BVISNULL( &op->o_ctrls ) ) {
BerElementBuffer copy_berbuf;
BerElement *copy = (BerElement *)&copy_berbuf;
struct berval control;
ber_init2( copy, &op->o_ctrls, 0 );
while ( ber_skip_element( copy, &control ) == LBER_SEQUENCE ) {
struct restriction_entry *entry, needle = {};
BerElementBuffer control_berbuf;
BerElement *control_ber = (BerElement *)&control_berbuf;
ber_init2( control_ber, &control, 0 );
if ( ber_skip_element( control_ber, &needle.oid ) == LBER_ERROR ) {
res = LDAP_PROTOCOL_ERROR;
message = "invalid control";
operation_send_reject( op, res, message, 1 );
goto fail;
}
entry = ldap_tavl_find(
lload_control_actions, &needle, lload_restriction_cmp );
if ( entry && op->o_restricted < entry->action ) {
op->o_restricted = entry->action;
}
}
CONNECTION_UNLOCK(client);
}
if ( op->o_restricted < LLOAD_OP_RESTRICTED_WRITE &&
lload_write_coherence &&
op->o_tag != LDAP_REQ_SEARCH &&
op->o_tag != LDAP_REQ_COMPARE ) {
op->o_restricted = LLOAD_OP_RESTRICTED_WRITE;
}
if ( b ) {
if ( op->o_restricted == LLOAD_OP_RESTRICTED_REJECT ) {
res = LDAP_UNWILLING_TO_PERFORM;
message = "extended operation or control disallowed";
operation_send_reject( op, res, message, 1 );
goto fail;
}
CONNECTION_LOCK(client);
client_restricted = client->c_restricted;
if ( client_restricted ) {
if ( client_restricted == LLOAD_OP_RESTRICTED_WRITE &&
client->c_restricted_inflight == 0 &&
client->c_restricted_at >= 0 &&
client->c_restricted_at + lload_write_coherence <
op->o_start ) {
Debug( LDAP_DEBUG_TRACE, "request_process: "
"connid=%lu write coherence to backend '%s' expired\n",
client->c_connid, client->c_backend->b_name.bv_val );
client->c_backend = NULL;
client_restricted = client->c_restricted = LLOAD_OP_NOT_RESTRICTED;
}
switch ( client_restricted ) {
case LLOAD_OP_NOT_RESTRICTED:
break;
case LLOAD_OP_RESTRICTED_WRITE:
case LLOAD_OP_RESTRICTED_BACKEND:
b = client->c_backend;
assert( b );
break;
case LLOAD_OP_RESTRICTED_UPSTREAM:
case LLOAD_OP_RESTRICTED_ISOLATE:
upstream = client->c_linked_upstream;
assert( upstream );
break;
default:
assert(0);
break;
}
}
if ( op->o_restricted < client_restricted ) {
op->o_restricted = client_restricted;
}
CONNECTION_UNLOCK(client);
if ( upstream ) {
b = upstream->c_backend;
checked_lock( &b->b_mutex );
if ( !try_upstream( b, NULL, op, upstream, &res, &message ) ) {
upstream = NULL;
}
checked_unlock( &b->b_mutex );
} else if ( b ) {
backend_select( b, op, &upstream, &res, &message );
} else {
upstream_select( op, &upstream, &res, &message );
@ -169,9 +247,18 @@ request_process( LloadConnection *client, LloadOperation *op )
}
upstream->c_pendingber = output;
if ( client_restricted < LLOAD_OP_RESTRICTED_UPSTREAM &&
op->o_restricted >= LLOAD_OP_RESTRICTED_UPSTREAM ) {
rc = ldap_tavl_insert(
&upstream->c_linked, client, lload_upstream_entry_cmp,
ldap_avl_dup_error );
assert( rc == LDAP_SUCCESS );
}
op->o_upstream_msgid = msgid = upstream->c_next_msgid++;
rc = ldap_tavl_insert(
&upstream->c_ops, op, operation_upstream_cmp, ldap_avl_dup_error );
CONNECTION_UNLOCK(upstream);
Debug( LDAP_DEBUG_TRACE, "request_process: "
@ -183,18 +270,27 @@ request_process( LloadConnection *client, LloadOperation *op )
lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_forwarded++;
if ( lload_write_coherence && !b &&
op->o_tag != LDAP_REQ_SEARCH &&
op->o_tag != LDAP_REQ_COMPARE ) {
/*
* TODO: There can't be more than one thread receiving a new request,
* so we could drop the lock. We'd still need some atomics for the
* counters.
*/
if ( op->o_restricted > client_restricted ||
client_restricted == LLOAD_OP_RESTRICTED_WRITE ) {
CONNECTION_LOCK(client);
client->c_backend = upstream->c_backend;
client->c_restricted_inflight++;
op->o_restricted = 1;
if ( op->o_restricted > client_restricted ) {
client->c_restricted = op->o_restricted;
}
if ( op->o_restricted == LLOAD_OP_RESTRICTED_WRITE ) {
client->c_restricted_inflight++;
}
if ( op->o_restricted >= LLOAD_OP_RESTRICTED_UPSTREAM ) {
if ( client_restricted < LLOAD_OP_RESTRICTED_UPSTREAM ) {
client->c_linked_upstream = upstream;
}
assert( client->c_linked_upstream == upstream );
client->c_backend = NULL;
} else if ( op->o_restricted >= LLOAD_OP_RESTRICTED_WRITE ) {
if ( client_restricted < LLOAD_OP_RESTRICTED_WRITE ) {
client->c_backend = upstream->c_backend;
}
assert( client->c_backend == upstream->c_backend );
}
CONNECTION_UNLOCK(client);
}
@ -540,6 +636,8 @@ client_reset( LloadConnection *c )
{
TAvlnode *root;
long freed = 0, executing;
LloadConnection *linked_upstream = NULL;
enum op_restriction restricted = c->c_restricted;
CONNECTION_ASSERT_LOCKED(c);
root = c->c_ops;
@ -555,6 +653,20 @@ client_reset( LloadConnection *c )
ch_free( c->c_sasl_bind_mech.bv_val );
BER_BVZERO( &c->c_sasl_bind_mech );
}
if ( restricted && restricted < LLOAD_OP_RESTRICTED_ISOLATE ) {
if ( c->c_backend ) {
assert( c->c_restricted <= LLOAD_OP_RESTRICTED_BACKEND );
assert( c->c_restricted_inflight == 0 );
c->c_backend = NULL;
c->c_restricted_at = 0;
} else {
assert( c->c_restricted == LLOAD_OP_RESTRICTED_UPSTREAM );
assert( c->c_linked_upstream != NULL );
linked_upstream = c->c_linked_upstream;
c->c_linked_upstream = NULL;
}
}
CONNECTION_UNLOCK(c);
if ( root ) {
@ -565,6 +677,12 @@ client_reset( LloadConnection *c )
}
assert( freed == executing );
if ( linked_upstream && restricted == LLOAD_OP_RESTRICTED_UPSTREAM ) {
LloadConnection *removed = ldap_tavl_delete(
&linked_upstream->c_linked, c, lload_upstream_entry_cmp );
assert( removed == c );
}
CONNECTION_LOCK(c);
CONNECTION_ASSERT_LOCKED(c);
}
@ -586,6 +704,11 @@ client_unlink( LloadConnection *c )
state = c->c_state;
c->c_state = LLOAD_C_DYING;
if ( c->c_restricted == LLOAD_OP_RESTRICTED_ISOLATE ) {
/* Allow upstream connection to be severed in client_reset() */
c->c_restricted = LLOAD_OP_RESTRICTED_UPSTREAM;
}
read_event = c->c_read_event;
write_event = c->c_write_event;
CONNECTION_UNLOCK(c);

View File

@ -115,6 +115,7 @@ static ConfigDriver config_fname;
static ConfigDriver config_generic;
static ConfigDriver config_backend;
static ConfigDriver config_bindconf;
static ConfigDriver config_restrict_oid;
#ifdef LDAP_TCP_BUFFER
static ConfigDriver config_tcp_buffer;
#endif /* LDAP_TCP_BUFFER */
@ -179,6 +180,8 @@ enum {
CFG_MAX_PENDING_CONNS,
CFG_STARTTLS,
CFG_CLIENT_PENDING,
CFG_RESTRICT_EXOP,
CFG_RESTRICT_CONTROL,
CFG_LAST
};
@ -635,12 +638,33 @@ static ConfigTable config_back_cf_table[] = {
{ "write_coherence", "seconds", 2, 2, 0,
ARG_INT,
&lload_write_coherence,
"( OLcfgBkAt:13.38 "
"( OLcfgBkAt:13.36 "
"NAME 'olcBkLloadWriteCoherence' "
"DESC 'Keep operations to the same backend after a write' "
"EQUALITY integerMatch "
"SYNTAX OMsInteger "
"SINGLE-VALUE )",
NULL,
{ .v_int = 0 }
},
{ "restrict_exop", "OID> <action", 3, 3, 0,
ARG_MAGIC|CFG_RESTRICT_EXOP,
&config_restrict_oid,
"( OLcfgBkAt:13.37 "
"NAME 'olcBkLloadRestrictExop' "
"DESC 'Restrict upstream selection after forwarding an extended operation' "
"EQUALITY caseIgnoreMatch "
"SYNTAX OMsDirectoryString )",
NULL, NULL
},
{ "restrict_control", "OID> <action", 3, 3, 0,
ARG_MAGIC|CFG_RESTRICT_CONTROL,
&config_restrict_oid,
"( OLcfgBkAt:13.38 "
"NAME 'olcBkLloadRestrictControl' "
"DESC 'Restrict upstream selection after forwarding a control' "
"EQUALITY caseIgnoreMatch "
"SYNTAX OMsDirectoryString )",
NULL, NULL
},
@ -763,6 +787,9 @@ static ConfigOCs lloadocs[] = {
"$ olcBkLloadTLSCRLFile "
"$ olcBkLloadTLSShareSlapdCTX "
"$ olcBkLloadClientMaxPending "
"$ olcBkLloadWriteCoherence "
"$ olcBkLloadRestrictExop "
"$ olcBkLloadRestrictControl "
") )",
Cft_Backend, config_back_cf_table,
NULL,
@ -1282,6 +1309,160 @@ config_bindconf( ConfigArgs *c )
return 0;
}
#ifndef BALANCER_MODULE
char *
oidm_find( char *oid )
{
if ( OID_LEADCHAR( *oid ) ) {
return oid;
}
Debug( LDAP_DEBUG_ANY, "oidm_find: "
"full OID parsing only available when compiled as a module\n" );
return NULL;
}
#endif /* !BALANCER_MODULE */
static struct {
const char *name;
enum op_restriction action;
} restrictopts[] = {
{ "ignore", LLOAD_OP_NOT_RESTRICTED },
{ "write", LLOAD_OP_RESTRICTED_WRITE },
{ "backend", LLOAD_OP_RESTRICTED_BACKEND },
{ "connection", LLOAD_OP_RESTRICTED_UPSTREAM },
{ "isolate", LLOAD_OP_RESTRICTED_ISOLATE },
{ "reject", LLOAD_OP_RESTRICTED_REJECT },
{ NULL }
};
static void
restriction_free( struct restriction_entry *restriction )
{
ch_free( restriction->oid.bv_val );
ch_free( restriction );
}
static int
config_restrict_oid( ConfigArgs *c )
{
TAvlnode *node = NULL, **root = ( c->type == CFG_RESTRICT_EXOP ) ?
&lload_exop_actions :
&lload_control_actions;
struct restriction_entry *entry = NULL;
char *parsed_oid;
int i, rc = -1;
if ( c->op == SLAP_CONFIG_EMIT ) {
struct berval bv = { .bv_val = c->cr_msg };
if ( c->type == CFG_RESTRICT_EXOP && lload_default_exop_action ) {
bv.bv_len = snprintf( bv.bv_val, sizeof(c->cr_msg), "1.1 %s",
restrictopts[lload_default_exop_action].name );
value_add_one( &c->rvalue_vals, &bv );
}
for ( node = ldap_tavl_end( *root, TAVL_DIR_LEFT );
node;
node = ldap_tavl_next( node, TAVL_DIR_RIGHT ) ) {
entry = node->avl_data;
bv.bv_len = snprintf( bv.bv_val, sizeof(c->cr_msg), "%s %s",
entry->oid.bv_val, restrictopts[entry->action].name );
value_add_one( &c->rvalue_vals, &bv );
}
return LDAP_SUCCESS;
} else if ( c->op == LDAP_MOD_DELETE ) {
if ( !c->line ) {
ldap_tavl_free( *root, (AVL_FREE)restriction_free );
*root = NULL;
if ( c->type == CFG_RESTRICT_EXOP ) {
lload_default_exop_action = LLOAD_OP_NOT_RESTRICTED;
}
rc = LDAP_SUCCESS;
} else {
struct restriction_entry needle;
parsed_oid = strchr( c->line, ' ' );
if ( !parsed_oid ) {
return rc;
}
memcpy( c->cr_msg, c->line, parsed_oid - c->line );
c->cr_msg[parsed_oid - c->line] = '\0';
needle.oid.bv_val = oidm_find( c->cr_msg );
needle.oid.bv_len = strlen( needle.oid.bv_val );
if ( !needle.oid.bv_val ) {
return rc;
} else if ( c->type == CFG_RESTRICT_EXOP &&
!strcmp( needle.oid.bv_val, "1.1" ) ) {
lload_default_exop_action = LLOAD_OP_NOT_RESTRICTED;
} else {
/* back-config should have checked we have this value */
entry = ldap_tavl_delete( root, &needle,
lload_restriction_cmp );
assert( entry != NULL );
}
rc = LDAP_SUCCESS;
}
return rc;
}
parsed_oid = oidm_find( c->argv[1] );
if ( !parsed_oid ) {
snprintf( c->cr_msg, sizeof(c->cr_msg), "Could not parse oid %s",
c->argv[1] );
goto done;
}
for ( i = 0; restrictopts[i].name; i++ ) {
if ( !strcasecmp( c->argv[2], restrictopts[i].name ) ) {
break;
}
}
if ( !restrictopts[i].name ) {
snprintf( c->cr_msg, sizeof(c->cr_msg), "Could not parse action %s",
c->argv[2] );
goto done;
}
if ( !strcmp( parsed_oid, "1.1" ) ) {
if ( lload_default_exop_action ) {
snprintf( c->cr_msg, sizeof(c->cr_msg), "Default already set" );
goto done;
} else {
lload_default_exop_action = i;
}
}
entry = ch_malloc( sizeof(struct restriction_entry) );
/* Copy only if a reference to argv[1] was returned */
ber_str2bv( parsed_oid, 0, parsed_oid == c->argv[1], &entry->oid );
entry->action = i;
if ( ldap_tavl_insert( root, entry, lload_restriction_cmp,
ldap_avl_dup_error ) ) {
snprintf( c->cr_msg, sizeof(c->cr_msg),
"%s with OID %s already restricted",
c->type == CFG_RESTRICT_EXOP ? "Extended operation" : "Control",
c->argv[1] );
goto done;
}
rc = LDAP_SUCCESS;
done:
if ( rc ) {
Debug( LDAP_DEBUG_ANY, "%s: %s\n", c->log, c->cr_msg );
if ( parsed_oid ) ch_free( parsed_oid );
if ( entry ) ch_free( entry );
}
return rc;
}
static int
config_fname( ConfigArgs *c )
{

View File

@ -1429,6 +1429,41 @@ client_tls_cb( ldap_pvt_thread_start_t *start, void *startarg, void *arg )
}
#endif /* HAVE_TLS */
static int
detach_linked_backend_cb( LloadConnection *client, LloadBackend *b )
{
int rc = LDAP_SUCCESS;
if ( client->c_backend != b ) {
return rc;
}
Debug( LDAP_DEBUG_CONNS, "detach_linked_backend_cb: "
"detaching backend '%s' from connid=%lu%s\n",
b->b_name.bv_val, client->c_connid,
client->c_restricted == LLOAD_OP_RESTRICTED_BACKEND ?
" and closing the connection" :
"" );
/* We were approached from the connection list */
assert( IS_ALIVE( client, c_refcnt ) );
assert( client->c_restricted == LLOAD_OP_RESTRICTED_WRITE ||
client->c_restricted == LLOAD_OP_RESTRICTED_BACKEND );
if ( client->c_restricted == LLOAD_OP_RESTRICTED_BACKEND ) {
int gentle = 1;
CONNECTION_LOCK(client);
rc = lload_connection_close( client, &gentle );
CONNECTION_UNLOCK(client);
}
client->c_restricted = LLOAD_OP_NOT_RESTRICTED;
client->c_restricted_at = 0;
client->c_restricted_inflight = 0;
return rc;
}
void
lload_handle_backend_invalidation( LloadChange *change )
{
@ -1458,6 +1493,13 @@ lload_handle_backend_invalidation( LloadChange *change )
&connection_pool, handle_pdus, backend_conn_cb, b );
ldap_pvt_thread_pool_walk(
&connection_pool, upstream_bind, backend_conn_cb, b );
checked_lock( &clients_mutex );
connections_walk(
&clients_mutex, &clients,
(CONNCB)detach_linked_backend_cb, b );
checked_unlock( &clients_mutex );
lload_backend_destroy( b );
return;
}

View File

@ -125,6 +125,7 @@ int
request_extended( LloadConnection *c, LloadOperation *op )
{
ExopHandler *handler, needle = {};
struct restriction_entry *restriction, rneedle = {};
BerElement *copy;
struct berval bv;
ber_tag_t tag;
@ -158,6 +159,15 @@ request_extended( LloadConnection *c, LloadOperation *op )
}
ber_free( copy, 0 );
rneedle.oid = bv;
restriction = ldap_tavl_find( lload_exop_actions, &rneedle,
lload_restriction_cmp );
if ( restriction ) {
op->o_restricted = restriction->action;
} else {
op->o_restricted = lload_default_exop_action;
}
return request_process( c, op );
}

View File

@ -298,9 +298,18 @@ enum sc_io_state {
/* Tracking whether an operation might cause a client to restrict which
* upstreams are eligible */
enum op_restriction {
LLOAD_OP_NOT_RESTRICTED, /* operation didn't trigger any restriction */
LLOAD_OP_RESTRICTED_BACKEND, /* operation restricts a client to a certain backend */
LLOAD_OP_RESTRICTED_UPSTREAM, /* operation restricts a client to a certain upstream */
LLOAD_OP_NOT_RESTRICTED, /* no restrictions in place */
LLOAD_OP_RESTRICTED_WRITE, /* client is restricted to a certain backend with
* a timeout attached */
LLOAD_OP_RESTRICTED_BACKEND, /* client is restricted to a certain backend,
* without a timeout */
LLOAD_OP_RESTRICTED_UPSTREAM, /* client is restricted to a certain upstream */
LLOAD_OP_RESTRICTED_ISOLATE, /* TODO: client is restricted to a certain
* upstream and removes the upstream from the
* pool */
LLOAD_OP_RESTRICTED_REJECT, /* operation should not be forwarded to any
* backend, either it is processed internally
* or rejected */
};
/*
@ -410,9 +419,13 @@ struct LloadConnection {
long c_n_ops_completed; /* num of ops completed */
lload_counters_t c_counters; /* per connection operation counters */
LloadBackend *c_backend;
enum op_restriction c_restricted;
uintptr_t c_restricted_inflight;
time_t c_restricted_at;
LloadBackend *c_backend;
LloadConnection *c_linked_upstream;
TAvlnode *c_linked;
/*
* Protected by the CIRCLEQ mutex:
@ -470,6 +483,11 @@ struct LloadOperation {
BerValue o_request, o_ctrls;
};
struct restriction_entry {
struct berval oid;
enum op_restriction action;
};
/*
* listener; need to access it from monitor backend
*/

View File

@ -21,6 +21,10 @@
ldap_pvt_thread_mutex_t lload_pin_mutex;
unsigned long lload_next_pin = 1;
TAvlnode *lload_control_actions = NULL;
TAvlnode *lload_exop_actions = NULL;
enum op_restriction lload_default_exop_action = LLOAD_OP_NOT_RESTRICTED;
ber_tag_t
slap_req2res( ber_tag_t tag )
{
@ -84,6 +88,13 @@ lload_msgtype2str( ber_tag_t tag )
return "unknown message";
}
int
lload_restriction_cmp( const void *left, const void *right )
{
const struct restriction_entry *l = left, *r = right;
return ber_bvcmp( &l->oid, &r->oid );
}
int
operation_client_cmp( const void *left, const void *right )
{
@ -276,7 +287,7 @@ operation_unlink_client( LloadOperation *op, LloadConnection *client )
assert( op == removed );
client->c_n_ops_executing--;
if ( op->o_restricted ) {
if ( op->o_restricted == LLOAD_OP_RESTRICTED_WRITE ) {
if ( !--client->c_restricted_inflight && client->c_restricted_at >= 0 ) {
if ( lload_write_coherence < 0 ) {
client->c_restricted_at = -1;

View File

@ -172,6 +172,10 @@ LDAP_SLAPD_F (int) lload_monitor_backend_init( BackendInfo *bi, LloadBackend *b
*/
LDAP_SLAPD_V (ldap_pvt_thread_mutex_t) lload_pin_mutex;
LDAP_SLAPD_V (unsigned long) lload_next_pin;
LDAP_SLAPD_V (TAvlnode *) lload_control_actions;
LDAP_SLAPD_V (TAvlnode *) lload_exop_actions;
LDAP_SLAPD_V (enum op_restriction) lload_default_exop_action;
LDAP_SLAPD_F (int) lload_restriction_cmp( const void *left, const void *right );
LDAP_SLAPD_F (const char *) lload_msgtype2str( ber_tag_t tag );
LDAP_SLAPD_F (int) operation_upstream_cmp( const void *l, const void *r );
LDAP_SLAPD_F (int) operation_client_cmp( const void *l, const void *r );
@ -192,6 +196,7 @@ LDAP_SLAPD_F (void) operation_update_global_rejected( LloadOperation *op );
/*
* upstream.c
*/
LDAP_SLAPD_F (int) lload_upstream_entry_cmp( const void *l, const void *r );
LDAP_SLAPD_F (int) forward_final_response( LloadConnection *client, LloadOperation *op, BerElement *ber );
LDAP_SLAPD_F (int) forward_response( LloadConnection *client, LloadOperation *op, BerElement *ber );
LDAP_SLAPD_F (void *) upstream_bind( void *ctx, void *arg );

View File

@ -40,6 +40,27 @@ static const sasl_callback_t client_callbacks[] = {
static void upstream_unlink( LloadConnection *upstream );
int
lload_upstream_entry_cmp( const void *l, const void *r )
{
return SLAP_PTRCMP( l, r );
}
static void
linked_upstream_lost( LloadConnection *client )
{
int gentle = 1;
CONNECTION_LOCK(client);
assert( client->c_restricted >= LLOAD_OP_RESTRICTED_UPSTREAM );
assert( client->c_linked_upstream );
client->c_restricted = LLOAD_OP_NOT_RESTRICTED;
client->c_linked_upstream = NULL;
CONNECTION_UNLOCK(client);
lload_connection_close( client, &gentle );
}
int
forward_response( LloadConnection *client, LloadOperation *op, BerElement *ber )
{
@ -996,7 +1017,7 @@ upstream_unlink( LloadConnection *c )
{
LloadBackend *b = c->c_backend;
struct event *read_event, *write_event;
TAvlnode *root;
TAvlnode *root, *linked_root;
long freed, executing;
Debug( LDAP_DEBUG_CONNS, "upstream_unlink: "
@ -1017,11 +1038,16 @@ upstream_unlink( LloadConnection *c )
executing = c->c_n_ops_executing;
c->c_n_ops_executing = 0;
linked_root = c->c_linked;
c->c_linked = NULL;
CONNECTION_UNLOCK(c);
freed = ldap_tavl_free( root, (AVL_FREE)operation_lost_upstream );
assert( freed == executing );
ldap_tavl_free( linked_root, (AVL_FREE)linked_upstream_lost );
/*
* Avoid a deadlock:
* event_del will block if the event is currently executing its callback,