Always queue psearch responses (ITS#3671 revisited)

This commit is contained in:
Howard Chu 2005-10-02 03:58:00 +00:00
parent 62f7c85be4
commit 47a055b59b

View File

@ -26,6 +26,7 @@
#include "lutil.h"
#include "slap.h"
#include "config.h"
#include "ldap_rq.h"
/* A modify request on a particular entry */
typedef struct modinst {
@ -63,6 +64,7 @@ typedef struct syncops {
int s_inuse; /* reference count */
struct syncres *s_res;
struct syncres *s_restail;
void *s_qtask; /* task for playing psearch responses */
ldap_pvt_thread_mutex_t s_mutex;
} syncops;
@ -680,12 +682,178 @@ again:
return rc;
}
/* Send a persistent search response */
static int
syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode)
{
slap_overinst *on = opc->son;
SlapReply rs = { REP_SEARCH };
LDAPControl *ctrls[2];
struct berval cookie;
Entry e_uuid = {0};
Attribute a_uuid = {0};
if ( so->s_op->o_abandon )
return SLAPD_ABANDON;
ctrls[1] = NULL;
slap_compose_sync_cookie( op, &cookie, &opc->sctxcsn, so->s_rid );
e_uuid.e_attrs = &a_uuid;
a_uuid.a_desc = slap_schema.si_ad_entryUUID;
a_uuid.a_nvals = &opc->suuid;
rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid,
mode, ctrls, 0, 1, &cookie );
rs.sr_ctrls = ctrls;
op->o_bd->bd_info = (BackendInfo *)on->on_info;
switch( mode ) {
case LDAP_SYNC_ADD:
rs.sr_entry = *e;
if ( rs.sr_entry->e_private )
rs.sr_flags = REP_ENTRY_MUSTRELEASE;
if ( opc->sreference ) {
rs.sr_ref = get_entry_referrals( op, rs.sr_entry );
send_search_reference( op, &rs );
ber_bvarray_free( rs.sr_ref );
if ( !rs.sr_entry )
*e = NULL;
break;
}
/* fallthru */
case LDAP_SYNC_MODIFY:
rs.sr_entry = *e;
if ( rs.sr_entry->e_private )
rs.sr_flags = REP_ENTRY_MUSTRELEASE;
rs.sr_attrs = op->ors_attrs;
send_search_entry( op, &rs );
if ( !rs.sr_entry )
*e = NULL;
break;
case LDAP_SYNC_DELETE:
e_uuid.e_attrs = NULL;
e_uuid.e_name = opc->sdn;
e_uuid.e_nname = opc->sndn;
rs.sr_entry = &e_uuid;
if ( opc->sreference ) {
struct berval bv = BER_BVNULL;
rs.sr_ref = &bv;
send_search_reference( op, &rs );
} else {
send_search_entry( op, &rs );
}
break;
default:
assert(0);
}
op->o_tmpfree( rs.sr_ctrls[0], op->o_tmpmemctx );
rs.sr_ctrls = NULL;
return rs.sr_err;
}
/* Play back queued responses */
static int
syncprov_qplay( Operation *op, slap_overinst *on, syncops *so )
{
syncres *sr;
Entry *e;
opcookie opc;
int rc;
opc.son = on;
op->o_bd->bd_info = (BackendInfo *)on->on_info;
for (;;) {
ldap_pvt_thread_mutex_lock( &so->s_mutex );
sr = so->s_res;
if ( sr )
so->s_res = sr->s_next;
if ( !so->s_res )
so->s_restail = NULL;
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
if ( !sr )
break;
if ( !so->s_op->o_abandon ) {
opc.sdn = sr->s_dn;
opc.sndn = sr->s_ndn;
opc.suuid = sr->s_uuid;
opc.sctxcsn = sr->s_csn;
opc.sreference = sr->s_isreference;
e = NULL;
if ( sr->s_mode != LDAP_SYNC_DELETE ) {
rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e );
if ( rc ) {
ch_free( sr );
continue;
}
}
rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode );
if ( e ) {
be_entry_release_rw( op, e, 0 );
}
if ( rc )
break;
}
ch_free( sr );
}
op->o_bd->bd_info = (BackendInfo *)on;
return rc;
}
/* runqueue task for playing back queued responses */
static void *
syncprov_qtask( void *ctx, void *arg )
{
struct re_s *rtask = arg;
syncops *so = rtask->arg;
slap_overinst *on = so->s_op->o_private;
char opbuf[OPERATION_BUFFER_SIZE];
Operation *op;
BackendDB be;
op = (Operation *)opbuf;
memset( op, 0, sizeof(opbuf));
op->o_hdr = (Opheader *)(op+1);
op->o_controls = (void **)(op->o_hdr+1);
*op->o_hdr = *so->s_op->o_hdr;
op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx);
op->o_tmpmfuncs = &slap_sl_mfuncs;
op->o_threadctx = ctx;
/* syncprov_qplay expects a fake db */
be = *so->s_op->o_bd;
be.be_flags |= SLAP_DBFLAG_OVERLAY;
op->o_bd = &be;
op->o_private = NULL;
op->o_callback = NULL;
syncprov_qplay( op, on, so );
/* wait until we get explicitly scheduled again */
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask );
ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 1 );
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
return NULL;
}
/* Queue a persistent search response */
static int
syncprov_qresp( opcookie *opc, syncops *so, int mode )
{
syncres *sr;
ldap_pvt_thread_mutex_lock( &so->s_mutex );
sr = ch_malloc(sizeof(syncres) + opc->suuid.bv_len + 1 +
opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1 + opc->sctxcsn.bv_len + 1 );
sr->s_next = NULL;
@ -709,178 +877,21 @@ syncprov_qresp( opcookie *opc, syncops *so, int mode )
so->s_restail->s_next = sr;
}
so->s_restail = sr;
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
return LDAP_SUCCESS;
}
/* Play back queued responses */
static int
syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode, int queue );
static int
syncprov_qplay( Operation *op, slap_overinst *on, syncops *so )
{
syncres *sr, *srnext;
Entry *e;
opcookie opc;
int rc;
opc.son = on;
op->o_bd->bd_info = (BackendInfo *)on->on_info;
for (sr = so->s_res; sr; sr=srnext) {
srnext = sr->s_next;
opc.sdn = sr->s_dn;
opc.sndn = sr->s_ndn;
opc.suuid = sr->s_uuid;
opc.sctxcsn = sr->s_csn;
opc.sreference = sr->s_isreference;
e = NULL;
if ( sr->s_mode != LDAP_SYNC_DELETE ) {
rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e );
if ( rc ) {
ch_free( sr );
so->s_res = srnext;
continue;
if ( so->s_flags & PS_IS_DETACHED ) {
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
if ( !so->s_qtask ) {
so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, 0,
syncprov_qtask, so, "syncprov_qtask",
so->s_op->o_conn->c_peer_name.bv_val );
} else {
if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask )) {
ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );
}
}
rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode, 0 );
if ( e ) {
be_entry_release_rw( op, e, 0 );
}
if ( rc )
break;
ch_free( sr );
so->s_res = srnext;
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
}
op->o_bd->bd_info = (BackendInfo *)on;
if ( !so->s_res )
so->s_restail = NULL;
return rc;
}
/* Send a persistent search response */
static int
syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode, int queue )
{
slap_overinst *on = opc->son;
SlapReply rs = { REP_SEARCH };
LDAPControl *ctrls[2];
struct berval cookie;
Entry e_uuid = {0};
Attribute a_uuid = {0};
Operation sop = *so->s_op;
Opheader ohdr;
if ( so->s_op->o_abandon )
return SLAPD_ABANDON;
ohdr = *sop.o_hdr;
sop.o_hdr = &ohdr;
sop.o_tmpmemctx = op->o_tmpmemctx;
sop.o_bd = op->o_bd;
sop.o_controls = op->o_controls;
sop.o_private = op->o_private;
sop.o_callback = NULL;
/* If queueing is allowed */
if ( queue ) {
ldap_pvt_thread_mutex_lock( &so->s_mutex );
/* If we're still in refresh mode, must queue */
if (so->s_flags & PS_IS_REFRESHING) {
return syncprov_qresp( opc, so, mode );
}
/* If connection is free but queue is non-empty,
* try to flush the queue.
*/
if ( so->s_res ) {
rs.sr_err = syncprov_qplay( &sop, on, so );
}
/* If the connection is busy, must queue */
if ( sop.o_conn->c_writewaiter || rs.sr_err == LDAP_BUSY ) {
return syncprov_qresp( opc, so, mode );
}
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
/* If syncprov_qplay returned any other error, bail out. */
if ( rs.sr_err ) {
return rs.sr_err;
}
} else {
/* Queueing not allowed and conn is busy, give up */
if ( sop.o_conn->c_writewaiter )
return LDAP_BUSY;
}
ctrls[1] = NULL;
slap_compose_sync_cookie( op, &cookie, &opc->sctxcsn, so->s_rid );
e_uuid.e_attrs = &a_uuid;
a_uuid.a_desc = slap_schema.si_ad_entryUUID;
a_uuid.a_nvals = &opc->suuid;
rs.sr_err = syncprov_state_ctrl( &sop, &rs, &e_uuid,
mode, ctrls, 0, 1, &cookie );
rs.sr_ctrls = ctrls;
op->o_bd->bd_info = (BackendInfo *)on->on_info;
switch( mode ) {
case LDAP_SYNC_ADD:
rs.sr_entry = *e;
if ( rs.sr_entry->e_private )
rs.sr_flags = REP_ENTRY_MUSTRELEASE;
if ( opc->sreference ) {
rs.sr_ref = get_entry_referrals( &sop, rs.sr_entry );
send_search_reference( &sop, &rs );
ber_bvarray_free( rs.sr_ref );
if ( !rs.sr_entry )
*e = NULL;
break;
}
/* fallthru */
case LDAP_SYNC_MODIFY:
rs.sr_entry = *e;
if ( rs.sr_entry->e_private )
rs.sr_flags = REP_ENTRY_MUSTRELEASE;
rs.sr_attrs = sop.ors_attrs;
send_search_entry( &sop, &rs );
if ( !rs.sr_entry )
*e = NULL;
break;
case LDAP_SYNC_DELETE:
e_uuid.e_attrs = NULL;
e_uuid.e_name = opc->sdn;
e_uuid.e_nname = opc->sndn;
rs.sr_entry = &e_uuid;
if ( opc->sreference ) {
struct berval bv = BER_BVNULL;
rs.sr_ref = &bv;
send_search_reference( &sop, &rs );
} else {
send_search_entry( &sop, &rs );
}
break;
default:
assert(0);
}
op->o_tmpfree( rs.sr_ctrls[0], op->o_tmpmemctx );
op->o_private = sop.o_private;
rs.sr_ctrls = NULL;
/* Check queue again here; if we were hanging in a send and eventually
* recovered, there may be more to send now. But don't check if the
* original psearch has been abandoned.
*/
if ( so->s_op->o_abandon )
return SLAPD_ABANDON;
if ( rs.sr_err == LDAP_SUCCESS && queue && so->s_res ) {
ldap_pvt_thread_mutex_lock( &so->s_mutex );
rs.sr_err = syncprov_qplay( &sop, on, so );
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
}
return rs.sr_err;
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
return LDAP_SUCCESS;
}
static void
@ -1019,12 +1030,10 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
e = op->ora_e;
}
if ( saveit ) {
if ( saveit || op->o_tag == LDAP_REQ_ADD ) {
ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx );
ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx );
opc->sreference = is_entry_referral( e );
}
if ( saveit || op->o_tag == LDAP_REQ_ADD ) {
a = attr_find( e->e_attrs, slap_schema.si_ad_entryUUID );
if ( a )
ber_dupbv_x( &opc->suuid, &a->a_nvals[0], op->o_tmpmemctx );
@ -1082,15 +1091,15 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
/* if found send UPDATE else send ADD */
ss->s_inuse++;
ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
syncprov_sendresp( op, opc, ss, &e,
found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD, 1 );
syncprov_qresp( opc, ss,
found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD );
ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
ss->s_inuse--;
}
} else if ( !saveit && found ) {
/* send DELETE */
ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
syncprov_sendresp( op, opc, ss, NULL, LDAP_SYNC_DELETE, 1 );
syncprov_qresp( opc, ss, LDAP_SYNC_DELETE );
ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
}
}
@ -1444,8 +1453,7 @@ syncprov_op_response( Operation *op, SlapReply *rs )
for ( sm = opc->smatches; sm; sm=sm->sm_next ) {
if ( sm->sm_op->s_op->o_abandon )
continue;
syncprov_sendresp( op, opc, sm->sm_op, NULL,
LDAP_SYNC_DELETE, 1 );
syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_DELETE );
}
break;
}
@ -1633,7 +1641,7 @@ syncprov_search_cleanup( Operation *op, SlapReply *rs )
}
static void
syncprov_detach_op( Operation *op, syncops *so )
syncprov_detach_op( Operation *op, syncops *so, slap_overinst *on )
{
Operation *op2;
int i, alen = 0;
@ -1659,8 +1667,9 @@ syncprov_detach_op( Operation *op, syncops *so )
*op2->o_hdr = *op->o_hdr;
op2->o_tag = op->o_tag;
op2->o_time = op->o_time;
op2->o_bd = op->o_bd;
op2->o_bd = on->on_info->oi_origdb;
op2->o_request = op->o_request;
op2->o_private = on;
if ( i ) {
op2->ors_attrs = (AttributeName *)(op2->o_hdr + 1);
@ -1761,25 +1770,23 @@ syncprov_search_response( Operation *op, SlapReply *rs )
0, 1, &cookie, ss->ss_present ? LDAP_SYNC_REFRESH_PRESENTS :
LDAP_SYNC_REFRESH_DELETES );
} else {
int locked = 0;
/* It's RefreshAndPersist, transition to Persist phase */
syncprov_sendinfo( op, rs, ( ss->ss_present && rs->sr_nentries ) ?
LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,
&cookie, 1, NULL, 0 );
/* Flush any queued persist messages */
if ( ss->ss_so->s_res ) {
ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );
locked = 1;
syncprov_qplay( op, on, ss->ss_so );
}
/* Detach this Op from frontend control */
ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );
/* Turn off the refreshing flag */
ss->ss_so->s_flags ^= PS_IS_REFRESHING;
if ( locked )
ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
/* Detach this Op from frontend control */
syncprov_detach_op( op, ss->ss_so );
syncprov_detach_op( op, ss->ss_so, on );
ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
return LDAP_SUCCESS;
}