ITS#6368 use dup'd entries in response queue

This commit is contained in:
Howard Chu 2009-11-22 04:42:00 +00:00
parent a7f9086ade
commit c365ac359e

View File

@ -48,6 +48,7 @@ typedef struct modtarget {
/* A queued result of a persistent search */
typedef struct syncres {
struct syncres *s_next;
Entry *s_e;
struct berval s_dn;
struct berval s_ndn;
struct berval s_uuid;
@ -145,6 +146,7 @@ typedef struct opcookie {
slap_overinst *son;
syncmatches *smatches;
modtarget *smt;
Entry *se;
struct berval sdn; /* DN of entry, for deletes */
struct berval sndn;
struct berval suuid; /* UUID of entry */
@ -154,6 +156,11 @@ typedef struct opcookie {
short sreference; /* Is the entry a reference? */
} opcookie;
typedef struct mutexint {
ldap_pvt_thread_mutex_t mi_mutex;
int mi_int;
} mutexint;
typedef struct fbase_cookie {
struct berval *fdn; /* DN of a modified entry, for scope testing */
syncops *fss; /* persistent search we're testing against */
@ -739,6 +746,36 @@ again:
return rc;
}
/* Should find a place to cache these */
static mutexint *get_mutexint()
{
mutexint *mi = ch_malloc( sizeof( mutexint ));
ldap_pvt_thread_mutex_init( &mi->mi_mutex );
mi->mi_int = 1;
return mi;
}
static void inc_mutexint( mutexint *mi )
{
ldap_pvt_thread_mutex_lock( &mi->mi_mutex );
mi->mi_int++;
ldap_pvt_thread_mutex_unlock( &mi->mi_mutex );
}
/* return resulting counter */
static int dec_mutexint( mutexint *mi )
{
int i;
ldap_pvt_thread_mutex_lock( &mi->mi_mutex );
i = --mi->mi_int;
ldap_pvt_thread_mutex_unlock( &mi->mi_mutex );
if ( !i ) {
ldap_pvt_thread_mutex_destroy( &mi->mi_mutex );
ch_free( mi );
}
return i;
}
static void
syncprov_free_syncop( syncops *so )
{
@ -762,6 +799,12 @@ syncprov_free_syncop( syncops *so )
ch_free( so->s_base.bv_val );
for ( sr=so->s_res; sr; sr=srnext ) {
srnext = sr->s_next;
if ( sr->s_e ) {
if ( !dec_mutexint( sr->s_e->e_private )) {
sr->s_e->e_private = NULL;
entry_free( sr->s_e );
}
}
ch_free( sr );
}
ldap_pvt_thread_mutex_destroy( &so->s_mutex );
@ -770,8 +813,7 @@ syncprov_free_syncop( syncops *so )
/* Send a persistent search response */
static int
syncprov_sendresp( Operation *op, opcookie *opc, syncops *so,
Entry **e, int mode )
syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode )
{
slap_overinst *on = opc->son;
@ -811,29 +853,20 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so,
}
rs.sr_ctrls = ctrls;
op->o_bd->bd_info = (BackendInfo *)on->on_info;
switch( mode ) {
case LDAP_SYNC_ADD:
rs.sr_entry = *e;
if ( rs.sr_entry->e_private )
rs.sr_flags = REP_ENTRY_MUSTRELEASE;
rs.sr_entry = opc->se;
if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) {
rs.sr_ref = get_entry_referrals( op, rs.sr_entry );
rs.sr_err = send_search_reference( op, &rs );
ber_bvarray_free( rs.sr_ref );
if ( !rs.sr_entry )
*e = NULL;
break;
}
/* fallthru */
case LDAP_SYNC_MODIFY:
rs.sr_entry = *e;
if ( rs.sr_entry->e_private )
rs.sr_flags = REP_ENTRY_MUSTRELEASE;
rs.sr_entry = opc->se;
rs.sr_attrs = op->ors_attrs;
rs.sr_err = send_search_entry( op, &rs );
if ( !rs.sr_entry )
*e = NULL;
break;
case LDAP_SYNC_DELETE:
e_uuid.e_attrs = NULL;
@ -905,23 +938,16 @@ syncprov_qplay( Operation *op, syncops *so )
opc.suuid = sr->s_uuid;
opc.sctxcsn = sr->s_csn;
opc.sreference = sr->s_isreference;
e = NULL;
opc.se = sr->s_e;
if ( sr->s_mode != LDAP_SYNC_DELETE ) {
rc = overlay_entry_get_ov( op, &opc.sndn, NULL, NULL, 0, &e, on );
if ( rc ) {
Debug( LDAP_DEBUG_SYNC, "syncprov_qplay: failed to get %s, "
"error (%d), ignoring...\n", opc.sndn.bv_val, rc, 0 );
ch_free( sr );
rc = 0;
continue;
rc = syncprov_sendresp( op, &opc, so, sr->s_mode );
if ( opc.se ) {
if ( !dec_mutexint( opc.se->e_private )) {
opc.se->e_private = NULL;
entry_free ( opc.se );
}
}
rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode );
if ( e ) {
overlay_entry_release_ov( op, e, 0, on );
}
}
ch_free( sr );
@ -1014,6 +1040,10 @@ syncprov_qresp( opcookie *opc, syncops *so, int mode )
srsize += cookie.bv_len + 1;
sr = ch_malloc( srsize );
sr->s_next = NULL;
sr->s_e = opc->se;
/* bump refcount on this entry */
if ( opc->se )
inc_mutexint( opc->se->e_private );
sr->s_dn.bv_val = (char *)(sr + 1);
sr->s_dn.bv_len = opc->sdn.bv_len;
sr->s_mode = mode;
@ -1157,9 +1187,12 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
rc = overlay_entry_get_ov( op, fc.fdn, NULL, NULL, 0, &e, on );
/* If we're sending responses now, make a copy and unlock the DB */
if ( e && !saveit ) {
Entry *e2 = entry_dup( e );
if ( !opc->se ) {
opc->se = entry_dup( e );
opc->se->e_private = get_mutexint();
}
overlay_entry_release_ov( op, e, 0, on );
e = e2;
e = opc->se;
}
if ( rc ) {
op->o_bd = b0;
@ -1167,6 +1200,13 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
}
} else {
e = op->ora_e;
if ( !saveit ) {
if ( !opc->se ) {
opc->se = entry_dup( e );
opc->se->e_private = get_mutexint();
}
e = opc->se;
}
}
if ( saveit || op->o_tag == LDAP_REQ_ADD ) {
@ -1227,7 +1267,6 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
continue;
}
/* If we're sending results now, look for this op in old matches */
if ( !saveit ) {
syncmatches *old;
@ -1299,9 +1338,17 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
if ( !SLAP_ISOVERLAY( op->o_bd )) {
op->o_bd = &db;
}
overlay_entry_release_ov( op, e, 0, on );
if ( saveit )
overlay_entry_release_ov( op, e, 0, on );
op->o_bd = b0;
}
if ( opc->se && !saveit ) {
if ( !dec_mutexint( opc->se->e_private )) {
opc->se->e_private = NULL;
entry_free( opc->se );
opc->se = NULL;
}
}
if ( freefdn ) {
op->o_tmpfree( fc.fdn->bv_val, op->o_tmpmemctx );
}