Fix abandon checks, syncop use count.

This commit is contained in:
Howard Chu 2005-10-03 22:59:25 +00:00
parent 2233041bd0
commit 59155d9029

View File

@ -697,6 +697,35 @@ again:
return rc; return rc;
} }
static void
syncprov_free_syncop( syncops *so )
{
syncres *sr, *srnext;
GroupAssertion *ga, *gnext;
ldap_pvt_thread_mutex_lock( &so->s_mutex );
if ( --so->s_inuse > 0 ) {
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
return;
}
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
if ( so->s_flags & PS_IS_DETACHED ) {
filter_free( so->s_op->ors_filter );
for ( ga = so->s_op->o_groups; ga; ga=gnext ) {
gnext = ga->ga_next;
ch_free( ga );
}
ch_free( so->s_op );
}
ch_free( so->s_base.bv_val );
for ( sr=so->s_res; sr; sr=srnext ) {
srnext = sr->s_next;
ch_free( sr );
}
ldap_pvt_thread_mutex_destroy( &so->s_mutex );
ch_free( so );
}
/* Send a persistent search response */ /* Send a persistent search response */
static int static int
syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode) syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode)
@ -762,8 +791,11 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mod
default: default:
assert(0); assert(0);
} }
/* In case someone else freed it already? */
if ( rs.sr_ctrls ) {
op->o_tmpfree( rs.sr_ctrls[0], op->o_tmpmemctx ); op->o_tmpfree( rs.sr_ctrls[0], op->o_tmpmemctx );
rs.sr_ctrls = NULL; rs.sr_ctrls = NULL;
}
return rs.sr_err; return rs.sr_err;
} }
@ -789,10 +821,9 @@ syncprov_qplay( Operation *op, slap_overinst *on, syncops *so )
so->s_restail = NULL; so->s_restail = NULL;
ldap_pvt_thread_mutex_unlock( &so->s_mutex ); ldap_pvt_thread_mutex_unlock( &so->s_mutex );
if ( !sr ) if ( !sr || so->s_op->o_abandon )
break; break;
if ( !so->s_op->o_abandon ) {
opc.sdn = sr->s_dn; opc.sdn = sr->s_dn;
opc.sndn = sr->s_ndn; opc.sndn = sr->s_ndn;
opc.suuid = sr->s_uuid; opc.suuid = sr->s_uuid;
@ -812,11 +843,11 @@ syncprov_qplay( Operation *op, slap_overinst *on, syncops *so )
if ( e ) { if ( e ) {
be_entry_release_rw( op, e, 0 ); be_entry_release_rw( op, e, 0 );
} }
if ( rc )
break;
}
ch_free( sr ); ch_free( sr );
if ( rc )
break;
} }
op->o_bd->bd_info = (BackendInfo *)on; op->o_bd->bd_info = (BackendInfo *)on;
return rc; return rc;
@ -854,6 +885,9 @@ syncprov_qtask( void *ctx, void *arg )
syncprov_qplay( op, on, so ); syncprov_qplay( op, on, so );
/* decrement use count... */
syncprov_free_syncop( so );
/* wait until we get explicitly scheduled again */ /* wait until we get explicitly scheduled again */
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask ); ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask );
@ -905,12 +939,14 @@ syncprov_qresp( opcookie *opc, syncops *so, int mode )
so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL, so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL,
syncprov_qtask, so, "syncprov_qtask", syncprov_qtask, so, "syncprov_qtask",
so->s_op->o_conn->c_peer_name.bv_val ); so->s_op->o_conn->c_peer_name.bv_val );
++so->s_inuse;
} else { } else {
if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) && if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) &&
!so->s_qtask->next_sched.tv_sec ) { !so->s_qtask->next_sched.tv_sec ) {
so->s_qtask->interval.tv_sec = 0; so->s_qtask->interval.tv_sec = 0;
ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 ); ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );
so->s_qtask->interval.tv_sec = RUNQ_INTERVAL; so->s_qtask->interval.tv_sec = RUNQ_INTERVAL;
++so->s_inuse;
} }
} }
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
@ -919,36 +955,6 @@ syncprov_qresp( opcookie *opc, syncops *so, int mode )
return LDAP_SUCCESS; return LDAP_SUCCESS;
} }
static void
syncprov_free_syncop( syncops *so )
{
syncres *sr, *srnext;
GroupAssertion *ga, *gnext;
ldap_pvt_thread_mutex_lock( &so->s_mutex );
so->s_inuse--;
if ( so->s_inuse > 0 ) {
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
return;
}
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
if ( so->s_flags & PS_IS_DETACHED ) {
filter_free( so->s_op->ors_filter );
for ( ga = so->s_op->o_groups; ga; ga=gnext ) {
gnext = ga->ga_next;
ch_free( ga );
}
ch_free( so->s_op );
}
ch_free( so->s_base.bv_val );
for ( sr=so->s_res; sr; sr=srnext ) {
srnext = sr->s_next;
ch_free( sr );
}
ldap_pvt_thread_mutex_destroy( &so->s_mutex );
ch_free( so );
}
static int static int
syncprov_drop_psearch( syncops *so, int lock ) syncprov_drop_psearch( syncops *so, int lock )
{ {
@ -1124,14 +1130,14 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
sm = op->o_tmpalloc( sizeof(syncmatches), op->o_tmpmemctx ); sm = op->o_tmpalloc( sizeof(syncmatches), op->o_tmpmemctx );
sm->sm_next = opc->smatches; sm->sm_next = opc->smatches;
sm->sm_op = ss; sm->sm_op = ss;
ss->s_inuse++; ldap_pvt_thread_mutex_lock( &ss->s_mutex );
++ss->s_inuse;
ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
opc->smatches = sm; opc->smatches = sm;
} else { } else {
/* if found send UPDATE else send ADD */ /* if found send UPDATE else send ADD */
ss->s_inuse++;
syncprov_qresp( opc, ss, syncprov_qresp( opc, ss,
found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD ); found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD );
ss->s_inuse--;
} }
} else if ( !saveit && found ) { } else if ( !saveit && found ) {
/* send DELETE */ /* send DELETE */
@ -1811,7 +1817,10 @@ syncprov_search_response( Operation *op, SlapReply *rs )
&cookie, 1, NULL, 0 ); &cookie, 1, NULL, 0 );
/* Flush any queued persist messages */ /* Flush any queued persist messages */
if ( ss->ss_so->s_res ) { if ( ss->ss_so->s_res ) {
slap_callback *sc = op->o_callback;
op->o_callback = NULL;
syncprov_qplay( op, on, ss->ss_so ); syncprov_qplay( op, on, ss->ss_so );
op->o_callback = sc;
} }
/* Detach this Op from frontend control */ /* Detach this Op from frontend control */