ITS#6716 use sorted CSNs, fix sessionlog

track a CSN per SID in the log->sl_mincsn
This commit is contained in:
Howard Chu 2011-06-21 21:42:44 -07:00
parent 249422aa28
commit 6da3e3473c
4 changed files with 148 additions and 91 deletions

View File

@ -228,6 +228,28 @@ slap_sort_csn_sids( BerVarray csns, int *sids, int numcsns, void *memctx )
return rc;
}
void
slap_insert_csn_sids(
struct sync_cookie *ck,
int pos,
int sid,
struct berval *csn
)
{
int i;
ck->numcsns++;
ck->ctxcsn = ch_realloc( ck->ctxcsn,
(ck->numcsns+1) * sizeof(struct berval));
BER_BVZERO( &ck->ctxcsn[ck->numcsns] );
ck->sids = ch_realloc( ck->sids, ck->numcsns * sizeof(int));
for ( i = ck->numcsns-1; i > pos; i-- ) {
ck->ctxcsn[i] = ck->ctxcsn[i-1];
ck->sids[i] = ck->sids[i-1];
}
ck->sids[i] = sid;
ber_dupbv( &ck->ctxcsn[i], csn );
}
int
slap_parse_sync_cookie(
struct sync_cookie *cookie,

View File

@ -113,7 +113,9 @@ typedef struct slog_entry {
} slog_entry;
typedef struct sessionlog {
struct berval sl_mincsn;
BerVarray sl_mincsn;
int *sl_sids;
int sl_numcsns;
int sl_num;
int sl_size;
slog_entry *sl_head;
@ -124,8 +126,8 @@ typedef struct sessionlog {
/* The main state for this overlay */
typedef struct syncprov_info_t {
syncops *si_ops;
BerVarray si_ctxcsn; /* ldapsync context */
struct berval si_contextdn;
BerVarray si_ctxcsn; /* ldapsync context */
int *si_sids;
int si_numcsns;
int si_chkops; /* checkpointing info */
@ -1550,13 +1552,29 @@ syncprov_add_slog( Operation *op )
} else {
sl->sl_head = se;
sl->sl_tail = se;
if ( !sl->sl_mincsn ) {
sl->sl_numcsns = 1;
sl->sl_mincsn = ch_malloc( 2*sizeof( struct berval ));
sl->sl_sids = ch_malloc( sizeof( int ));
sl->sl_sids[0] = se->se_sid;
ber_dupbv( sl->sl_mincsn, &se->se_csn );
BER_BVZERO( &sl->sl_mincsn[1] );
}
}
sl->sl_num++;
while ( sl->sl_num > sl->sl_size ) {
int i, j;
se = sl->sl_head;
sl->sl_head = se->se_next;
AC_MEMCPY( sl->sl_mincsn.bv_val, se->se_csn.bv_val, se->se_csn.bv_len );
sl->sl_mincsn.bv_len = se->se_csn.bv_len;
for ( i=0; i<sl->sl_numcsns; i++ )
if ( sl->sl_sids[i] >= se->se_sid )
break;
if ( i == sl->sl_numcsns || sl->sl_sids[i] != se->se_sid ) {
slap_insert_csn_sids( (struct sync_cookie *)sl,
i, se->se_sid, &se->se_csn );
} else {
ber_bvreplace( &sl->sl_mincsn[i], &se->se_csn );
}
ch_free( se );
sl->sl_num--;
}
@ -1792,6 +1810,8 @@ syncprov_op_response( Operation *op, SlapReply *rs )
#endif
sid = slap_parse_csn_sid( &maxcsn );
for ( i=0; i<si->si_numcsns; i++ ) {
if ( sid < si->si_sids[i] )
break;
if ( sid == si->si_sids[i] ) {
if ( ber_bvcmp( &maxcsn, &si->si_ctxcsn[i] ) > 0 ) {
ber_bvreplace( &si->si_ctxcsn[i], &maxcsn );
@ -1801,13 +1821,10 @@ syncprov_op_response( Operation *op, SlapReply *rs )
}
}
/* It's a new SID for us */
if ( i == si->si_numcsns ) {
value_add_one( &si->si_ctxcsn, &maxcsn );
if ( i == si->si_numcsns || sid != si->si_sids[i] ) {
slap_insert_csn_sids((struct sync_cookie *)&(si->si_ctxcsn),
i, sid, &maxcsn );
csn_changed = 1;
si->si_numcsns++;
si->si_sids = ch_realloc( si->si_sids, si->si_numcsns *
sizeof(int));
si->si_sids[i] = sid;
}
}
@ -1827,6 +1844,8 @@ syncprov_op_response( Operation *op, SlapReply *rs )
for ( i=0; i<mod->sml_numvals; i++ ) {
sid = slap_parse_csn_sid( &mod->sml_values[i] );
for ( j=0; j<si->si_numcsns; j++ ) {
if ( sid < si->si_sids[j] )
break;
if ( sid == si->si_sids[j] ) {
if ( ber_bvcmp( &mod->sml_values[i], &si->si_ctxcsn[j] ) > 0 ) {
ber_bvreplace( &si->si_ctxcsn[j], &mod->sml_values[i] );
@ -1836,12 +1855,9 @@ syncprov_op_response( Operation *op, SlapReply *rs )
}
}
if ( j == si->si_numcsns ) {
value_add_one( &si->si_ctxcsn, &mod->sml_values[i] );
si->si_numcsns++;
si->si_sids = ch_realloc( si->si_sids, si->si_numcsns *
sizeof(int));
si->si_sids[j] = sid;
if ( j == si->si_numcsns || sid != si->si_sids[j] ) {
slap_insert_csn_sids( (struct sync_cookie *)&si->si_ctxcsn,
j, sid, &mod->sml_values[i] );
csn_changed = 1;
}
}
@ -2416,6 +2432,7 @@ syncprov_op_search( Operation *op, SlapReply *rs )
BerVarray ctxcsn;
int i, *sids, numcsns;
struct berval mincsn, maxcsn;
int minsid, maxsid;
int dirty = 0;
if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE;
@ -2511,20 +2528,21 @@ syncprov_op_search( Operation *op, SlapReply *rs )
/* If there are SIDs we don't recognize in the cookie, drop them */
for (i=0; i<srs->sr_state.numcsns; ) {
for (j=0; j<numcsns; j++) {
if ( srs->sr_state.sids[i] == sids[j] ) {
for (j=i; j<numcsns; j++) {
if ( srs->sr_state.sids[i] <= sids[j] ) {
break;
}
}
/* not found */
if ( j == numcsns ) {
struct berval tmp = srs->sr_state.ctxcsn[i];
j = srs->sr_state.numcsns - 1;
srs->sr_state.ctxcsn[i] = srs->sr_state.ctxcsn[j];
tmp.bv_len = 0;
srs->sr_state.ctxcsn[j] = tmp;
srs->sr_state.numcsns = j;
srs->sr_state.sids[i] = srs->sr_state.sids[j];
if ( j == numcsns || srs->sr_state.sids[i] != sids[j] ) {
char *tmp = srs->sr_state.ctxcsn[i].bv_val;
srs->sr_state.numcsns--;
for ( j=i; j<srs->sr_state.numcsns; j++ ) {
srs->sr_state.ctxcsn[j] = srs->sr_state.ctxcsn[j+1];
srs->sr_state.sids[j] = srs->sr_state.sids[j+1];
}
srs->sr_state.ctxcsn[j].bv_val = tmp;
srs->sr_state.ctxcsn[j].bv_len = 0;
continue;
}
i++;
@ -2533,60 +2551,50 @@ syncprov_op_search( Operation *op, SlapReply *rs )
/* Find the smallest CSN which differs from contextCSN */
mincsn.bv_len = 0;
maxcsn.bv_len = 0;
for ( i=0; i<srs->sr_state.numcsns; i++ ) {
for ( j=0; j<numcsns; j++ ) {
if ( srs->sr_state.sids[i] != sids[j] )
continue;
if ( BER_BVISEMPTY( &maxcsn ) || ber_bvcmp( &maxcsn,
&srs->sr_state.ctxcsn[i] ) < 0 ) {
maxcsn = srs->sr_state.ctxcsn[i];
for ( i=0,j=0; i<srs->sr_state.numcsns; i++ ) {
int newer;
while ( srs->sr_state.sids[i] != sids[j] ) j++;
if ( BER_BVISEMPTY( &maxcsn ) || ber_bvcmp( &maxcsn,
&srs->sr_state.ctxcsn[i] ) < 0 ) {
maxcsn = srs->sr_state.ctxcsn[i];
maxsid = sids[j];
}
newer = ber_bvcmp( &srs->sr_state.ctxcsn[i], &ctxcsn[j] );
/* If our state is newer, tell consumer about changes */
if ( newer < 0) {
changed = SS_CHANGED;
if ( BER_BVISEMPTY( &mincsn ) || ber_bvcmp( &mincsn,
&srs->sr_state.ctxcsn[i] ) > 0 ) {
mincsn = srs->sr_state.ctxcsn[i];
minsid = sids[j];
}
if ( ber_bvcmp( &srs->sr_state.ctxcsn[i], &ctxcsn[j] ) < 0) {
if ( BER_BVISEMPTY( &mincsn ) || ber_bvcmp( &mincsn,
&srs->sr_state.ctxcsn[i] ) > 0 ) {
mincsn = srs->sr_state.ctxcsn[i];
}
} else if ( newer > 0 ) {
/* our state is older, complain to consumer */
rs->sr_err = LDAP_UNWILLING_TO_PERFORM;
rs->sr_text = "consumer state is newer than provider!";
bailout:
if ( sop ) {
syncops **sp = &si->si_ops;
ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
while ( *sp != sop )
sp = &(*sp)->s_next;
*sp = sop->s_next;
ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
ch_free( sop );
}
rs->sr_ctrls = NULL;
send_ldap_result( op, rs );
return rs->sr_err;
}
}
if ( BER_BVISEMPTY( &mincsn ))
if ( BER_BVISEMPTY( &mincsn )) {
mincsn = maxcsn;
minsid = maxsid;
}
/* If nothing has changed, shortcut it */
if ( srs->sr_state.numcsns == numcsns ) {
int i, j, newer;
for ( i=0; i<srs->sr_state.numcsns; i++ ) {
for ( j=0; j<numcsns; j++ ) {
if ( srs->sr_state.sids[i] != sids[j] )
continue;
newer = ber_bvcmp( &srs->sr_state.ctxcsn[i], &ctxcsn[j] );
/* If our state is newer, tell consumer about changes */
if ( newer < 0 )
changed = SS_CHANGED;
else if ( newer > 0 ) {
/* our state is older, complain to consumer */
rs->sr_err = LDAP_UNWILLING_TO_PERFORM;
rs->sr_text = "consumer state is newer than provider!";
bailout:
if ( sop ) {
syncops **sp = &si->si_ops;
ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
while ( *sp != sop )
sp = &(*sp)->s_next;
*sp = sop->s_next;
ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
ch_free( sop );
}
rs->sr_ctrls = NULL;
send_ldap_result( op, rs );
return rs->sr_err;
}
break;
}
if ( changed )
break;
}
if ( !changed && !dirty ) {
do_present = 0;
no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
@ -2611,11 +2619,31 @@ no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
/* Do we have a sessionlog for this search? */
sl=si->si_logs;
if ( sl ) {
int do_play = 0;
ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
/* Are there any log entries, and is the consumer state
* present in the session log?
*/
if ( sl->sl_num > 0 && ber_bvcmp( &mincsn, &sl->sl_mincsn ) >= 0 ) {
if ( sl->sl_num > 0 ) {
int i;
for ( i=0; i<sl->sl_numcsns; i++ ) {
/* SID not present == new enough */
if ( minsid < sl->sl_sids[i] ) {
do_play = 1;
break;
}
/* SID present and new enough */
if ( minsid == sl->sl_sids[i]
&& ber_bvcmp( &mincsn, &sl->sl_mincsn[i] ) >= 0 ) {
do_play = 1;
break;
}
}
/* SID not present == new enough */
if ( i == sl->sl_numcsns )
do_play = 1;
}
if ( do_play ) {
do_present = 0;
/* mutex is unlocked in playlog */
syncprov_playlog( op, rs, sl, srs, ctxcsn, numcsns, sids );
@ -2947,10 +2975,11 @@ sp_cf_gen(ConfigArgs *c)
}
sl = si->si_logs;
if ( !sl ) {
sl = ch_malloc( sizeof( sessionlog ) + LDAP_PVT_CSNSTR_BUFSIZE );
sl->sl_mincsn.bv_val = (char *)(sl+1);
sl->sl_mincsn.bv_len = 0;
sl = ch_malloc( sizeof( sessionlog ));
sl->sl_mincsn = NULL;
sl->sl_sids = NULL;
sl->sl_num = 0;
sl->sl_numcsns = 0;
sl->sl_head = sl->sl_tail = NULL;
ldap_pvt_thread_mutex_init( &sl->sl_mutex );
si->si_logs = sl;
@ -2980,6 +3009,7 @@ syncprov_db_otask(
return NULL;
}
/* Read any existing contextCSN from the underlying db.
* Then search for any entries newer than that. If no value exists,
* just generate it. Cache whatever result.
@ -3040,6 +3070,7 @@ syncprov_db_open(
ber_bvarray_dup_x( &si->si_ctxcsn, a->a_vals, NULL );
si->si_numcsns = a->a_numvals;
si->si_sids = slap_parse_csn_sids( si->si_ctxcsn, a->a_numvals, NULL );
slap_sort_csn_sids( si->si_ctxcsn, si->si_sids, si->si_numcsns, NULL );
}
overlay_entry_release_ov( op, e, 0, on );
if ( si->si_ctxcsn && !SLAP_DBCLEAN( be )) {
@ -3079,14 +3110,11 @@ syncprov_db_open(
if ( si->si_logs && si->si_numcsns ) {
sessionlog *sl = si->si_logs;
int i;
/* If there are multiple, find the newest */
for ( i=0; i < si->si_numcsns; i++ ) {
if ( ber_bvcmp( &sl->sl_mincsn, &si->si_ctxcsn[i] ) < 0 ) {
AC_MEMCPY( sl->sl_mincsn.bv_val, si->si_ctxcsn[i].bv_val,
si->si_ctxcsn[i].bv_len );
sl->sl_mincsn.bv_len = si->si_ctxcsn[i].bv_len;
}
}
ber_bvarray_dup_x( &sl->sl_mincsn, si->si_ctxcsn, NULL );
sl->sl_numcsns = si->si_numcsns;
sl->sl_sids = ch_malloc( si->si_numcsns * sizeof(int) );
for ( i=0; i < si->si_numcsns; i++ )
sl->sl_sids[i] = si->si_sids[i];
}
out:
@ -3188,14 +3216,19 @@ syncprov_db_destroy(
if ( si ) {
if ( si->si_logs ) {
slog_entry *se = si->si_logs->sl_head;
sessionlog *sl = si->si_logs;
slog_entry *se = sl->sl_head;
while ( se ) {
slog_entry *se_next = se->se_next;
ch_free( se );
se = se_next;
}
if ( sl->sl_mincsn )
ber_bvarray_free( sl->sl_mincsn );
if ( sl->sl_sids )
ch_free( sl->sl_sids );
ldap_pvt_thread_mutex_destroy(&si->si_logs->sl_mutex);
ch_free( si->si_logs );
}

View File

@ -1179,6 +1179,8 @@ LDAP_SLAPD_F (int *) slap_parse_csn_sids LDAP_P((
BerVarray, int, void *memctx ));
LDAP_SLAPD_F (int) slap_sort_csn_sids LDAP_P((
BerVarray, int *, int, void *memctx ));
LDAP_SLAPD_F (void) slap_insert_csn_sids LDAP_P((
struct sync_cookie *ck, int, int, struct berval * ));
LDAP_SLAPD_F (int) slap_parse_sync_cookie LDAP_P((
struct sync_cookie *, void *memctx ));
LDAP_SLAPD_F (int) slap_init_sync_cookie_ctxcsn LDAP_P((

View File

@ -1750,12 +1750,12 @@ struct syncinfo_s;
#define SLAP_SYNCUUID_SET_SIZE 256
struct sync_cookie {
struct berval *ctxcsn;
struct berval octet_str;
int rid;
int sid;
int numcsns;
BerVarray ctxcsn;
int *sids;
int numcsns;
int rid;
struct berval octet_str;
int sid;
LDAP_STAILQ_ENTRY(sync_cookie) sc_next;
};