Added sessionlog support. consumer needs work...

This commit is contained in:
Howard Chu 2004-12-08 00:47:24 +00:00
parent 99f2d0b49b
commit 8bad70d957
2 changed files with 301 additions and 31 deletions

View File

@ -56,8 +56,8 @@ typedef struct syncops {
struct berval s_base; /* ndn of search base */
ID s_eid; /* entryID of search base */
Operation *s_op; /* search op */
long s_sid;
long s_rid;
int s_sid;
int s_rid;
struct berval s_filterstr;
int s_flags; /* search status */
int s_inuse; /* reference count */
@ -91,6 +91,25 @@ typedef struct syncmatches {
syncops *sm_op;
} syncmatches;
/* Session log data */
typedef struct slog_entry {
struct slog_entry *se_next;
struct berval se_uuid;
struct berval se_csn;
ber_tag_t se_tag;
} slog_entry;
typedef struct sessionlog {
struct sessionlog *sl_next;
int sl_sid;
struct berval sl_mincsn;
int sl_num;
int sl_size;
slog_entry *sl_head;
slog_entry *sl_tail;
ldap_pvt_thread_mutex_t sl_mutex;
} sessionlog;
/* The main state for this overlay */
typedef struct syncprov_info_t {
syncops *si_ops;
@ -100,6 +119,7 @@ typedef struct syncprov_info_t {
int si_numops; /* number of ops since last checkpoint */
time_t si_chklast; /* time of last checkpoint */
Avlnode *si_mods; /* entries being modified */
sessionlog *si_logs;
ldap_pvt_thread_mutex_t si_csn_mutex;
ldap_pvt_thread_mutex_t si_ops_mutex;
ldap_pvt_thread_mutex_t si_mods_mutex;
@ -539,11 +559,12 @@ findpres_cb( Operation *op, SlapReply *rs )
{
slap_callback *sc = op->o_callback;
fpres_cookie *pc = sc->sc_private;
Attribute *a;
int ret = SLAP_CB_CONTINUE;
if ( rs->sr_type == REP_SEARCH ) {
Attribute *a = attr_find( rs->sr_entry->e_attrs,
slap_schema.si_ad_entryUUID );
switch ( rs->sr_type ) {
case REP_SEARCH:
a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryUUID );
if ( a ) {
pc->uuids[pc->num].bv_val = pc->last;
AC_MEMCPY( pc->uuids[pc->num].bv_val, a->a_nvals[0].bv_val,
@ -553,15 +574,10 @@ findpres_cb( Operation *op, SlapReply *rs )
pc->uuids[pc->num].bv_val = NULL;
}
ret = LDAP_SUCCESS;
if ( pc->num == SLAP_SYNCUUID_SET_SIZE ) {
ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL,
0, pc->uuids, 0 );
pc->uuids[pc->num].bv_val = pc->last;
pc->num = 0;
pc->last = pc->uuids[0].bv_val;
}
} else if ( rs->sr_type == REP_RESULT ) {
if ( pc->num != SLAP_SYNCUUID_SET_SIZE )
break;
/* FALLTHRU */
case REP_RESULT:
ret = rs->sr_err;
if ( pc->num ) {
ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL,
@ -570,6 +586,9 @@ findpres_cb( Operation *op, SlapReply *rs )
pc->num = 0;
pc->last = pc->uuids[0].bv_val;
}
break;
default:
break;
}
return ret;
}
@ -1056,6 +1075,183 @@ syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
opm.o_bd->be_modify( &opm, rs );
}
static void
syncprov_add_slog( Operation *op, struct berval *csn )
{
opcookie *opc = op->o_callback->sc_private;
slap_overinst *on = opc->son;
syncprov_info_t *si = on->on_bi.bi_private;
sessionlog *sl;
slog_entry *se;
for ( sl = si->si_logs; sl; sl=sl->sl_next ) {
/* Allocate a record. UUIDs are not NUL-terminated. */
se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len +
csn->bv_len + 1 );
se->se_next = NULL;
se->se_tag = op->o_tag;
se->se_uuid.bv_val = (char *)(se+1);
se->se_csn.bv_val = se->se_uuid.bv_val + opc->suuid.bv_len + 1;
AC_MEMCPY( se->se_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len );
se->se_uuid.bv_len = opc->suuid.bv_len;
AC_MEMCPY( se->se_csn.bv_val, csn->bv_val, csn->bv_len );
se->se_csn.bv_val[csn->bv_len] = '\0';
se->se_csn.bv_len = csn->bv_len;
ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
if ( sl->sl_head ) {
sl->sl_tail->se_next = se;
} else {
sl->sl_head = se;
}
sl->sl_tail = se;
sl->sl_num++;
while ( sl->sl_num > sl->sl_size ) {
se = sl->sl_head;
sl->sl_head = se->se_next;
strcpy( sl->sl_mincsn.bv_val, se->se_csn.bv_val );
sl->sl_mincsn.bv_len = se->se_csn.bv_len;
ch_free( se );
sl->sl_num--;
if ( !sl->sl_head ) {
sl->sl_tail = NULL;
}
}
ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
}
}
/* Just set a flag if we found the matching entry */
static int
playlog_cb( Operation *op, SlapReply *rs )
{
if ( rs->sr_type == REP_SEARCH ) {
op->o_callback->sc_private = (void *)1;
}
return rs->sr_err;
}
/* enter with sl->sl_mutex locked, release before returning */
static void
syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
struct berval *oldcsn, struct berval *ctxcsn )
{
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
syncprov_info_t *si = on->on_bi.bi_private;
slog_entry *se;
int i, j, ndel, num, nmods, mmods;
BerVarray uuids;
num = sl->sl_num;
i = 0;
nmods = 0;
uuids = op->o_tmpalloc( (num+1) * sizeof( struct berval ) +
num * UUID_LEN, op->o_tmpmemctx );
uuids[0].bv_val = (char *)(uuids + num + 1);
/* Make a copy of the relevant UUIDs. Put the Deletes up front
* and everything else at the end. Do this first so we can
* unlock the list mutex.
*/
for ( se=sl->sl_head; se; se=se->se_next ) {
if ( ber_bvcmp( &se->se_csn, oldcsn ) < 0 ) continue;
if ( ber_bvcmp( &se->se_csn, ctxcsn ) > 0 ) break;
if ( se->se_tag == LDAP_REQ_DELETE ) {
j = i;
i++;
} else {
nmods++;
j = num - nmods;
}
uuids[j].bv_val = uuids[0].bv_val + (j * UUID_LEN);
AC_MEMCPY(uuids[j].bv_val, se->se_uuid.bv_val, UUID_LEN);
uuids[j].bv_len = UUID_LEN;
}
ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
ndel = i;
/* Mods must be validated to see if they belong in this delete set.
*/
mmods = nmods;
/* Strip any duplicates */
for ( i=0; i<nmods; i++ ) {
for ( j=0; j<ndel; j++ ) {
if ( bvmatch( &uuids[j], &uuids[num - 1 - i] )) {
uuids[num - 1 - i].bv_len = 0;
mmods --;
break;
}
}
if ( uuids[num - 1 - i].bv_len == 0 ) continue;
for ( j=0; j<i; j++ ) {
if ( bvmatch( &uuids[num - 1 - j], &uuids[num - 1 - i] )) {
uuids[num - 1 - i].bv_len = 0;
mmods --;
break;
}
}
}
if ( mmods ) {
Operation fop;
SlapReply frs = { REP_RESULT };
int rc;
Filter mf, af;
AttributeAssertion eq;
slap_callback cb = {0};
fop = *op;
fop.o_sync_mode = 0;
fop.o_callback = &cb;
fop.ors_limit = NULL;
fop.ors_slimit = 1;
fop.ors_tlimit = SLAP_NO_LIMIT;
fop.ors_attrs = slap_anlist_all_attributes;
fop.ors_attrsonly = 0;
fop.o_managedsait = SLAP_CONTROL_CRITICAL;
af.f_choice = LDAP_FILTER_AND;
af.f_next = NULL;
af.f_and = &mf;
mf.f_choice = LDAP_FILTER_EQUALITY;
mf.f_ava = &eq;
mf.f_av_desc = slap_schema.si_ad_entryUUID;
mf.f_next = fop.ors_filter;
fop.ors_filter = &af;
cb.sc_response = playlog_cb;
for ( i=0; i<nmods; i++ ) {
if ( uuids[num - 1 - 1].bv_len == 0 ) continue;
mf.f_av_value = uuids[num -1 -i];
filter2bv_x( &fop, fop.ors_filter, &fop.ors_filterstr );
fop.o_bd->bd_info = on->on_info->oi_orig;
cb.sc_private = NULL;
rc = fop.o_bd->be_search( &fop, &frs );
fop.o_bd->bd_info = (BackendInfo *)on;
op->o_tmpfree( fop.ors_filterstr.bv_val, op->o_tmpmemctx );
/* If entry was not found, add to delete list */
if ( !cb.sc_private ) {
uuids[ndel++] = uuids[num - 1 - i];
}
}
}
if ( ndel ) {
uuids[ndel].bv_val = NULL;
syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL, 0, uuids, 1 );
}
}
static int
syncprov_op_response( Operation *op, SlapReply *rs )
{
@ -1066,13 +1262,13 @@ syncprov_op_response( Operation *op, SlapReply *rs )
if ( rs->sr_err == LDAP_SUCCESS )
{
struct berval maxcsn;
struct berval maxcsn = BER_BVNULL, curcsn = BER_BVNULL;
char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
/* Update our context CSN */
cbuf[0] = '\0';
ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
slap_get_commit_csn( op, &maxcsn );
slap_get_commit_csn( op, &maxcsn, &curcsn );
if ( !BER_BVISNULL( &maxcsn ) ) {
strcpy( cbuf, maxcsn.bv_val );
if ( ber_bvcmp( &maxcsn, &si->si_ctxcsn ) > 0 ) {
@ -1127,6 +1323,11 @@ syncprov_op_response( Operation *op, SlapReply *rs )
}
}
/* Add any log records */
if ( si->si_logs && op->o_tag != LDAP_REQ_ADD ) {
syncprov_add_slog( op, &curcsn );
}
}
return SLAP_CB_CONTINUE;
}
@ -1255,11 +1456,12 @@ syncprov_op_mod( Operation *op, SlapReply *rs )
avl_insert( &si->si_mods, mt, sp_avl_cmp, avl_dup_error );
ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
}
if ( op->o_tag != LDAP_REQ_ADD )
syncprov_matchops( op, opc, 1 );
}
if (( si->si_ops || si->si_logs ) && op->o_tag != LDAP_REQ_ADD )
syncprov_matchops( op, opc, 1 );
return SLAP_CB_CONTINUE;
}
@ -1275,6 +1477,7 @@ syncprov_op_extended( Operation *op, SlapReply *rs )
typedef struct searchstate {
slap_overinst *ss_on;
syncops *ss_so;
int ss_present;
} searchstate;
static int
@ -1390,11 +1593,12 @@ syncprov_search_response( Operation *op, SlapReply *rs )
op->o_tmpmemctx );
rs->sr_ctrls[1] = NULL;
rs->sr_err = syncprov_done_ctrl( op, rs, rs->sr_ctrls,
0, 1, &cookie, LDAP_SYNC_REFRESH_PRESENTS );
0, 1, &cookie, ss->ss_present ? LDAP_SYNC_REFRESH_PRESENTS :
LDAP_SYNC_REFRESH_DELETES );
} else {
int locked = 0;
/* It's RefreshAndPersist, transition to Persist phase */
syncprov_sendinfo( op, rs, rs->sr_nentries ?
syncprov_sendinfo( op, rs, ss->ss_present ?
LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,
&cookie, 1, NULL, 0 );
/* Flush any queued persist messages */
@ -1457,11 +1661,13 @@ syncprov_op_search( Operation *op, SlapReply *rs )
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
slap_callback *cb;
int gotstate = 0, nochange = 0;
int gotstate = 0, nochange = 0, do_present = 1;
Filter *fand, *fava;
syncops *sop = NULL;
searchstate *ss;
sync_control *srs;
struct berval ctxcsn;
char csnbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE;
@ -1508,16 +1714,24 @@ syncprov_op_search( Operation *op, SlapReply *rs )
ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
}
/* If we have a cookie, handle the PRESENT lookups
*/
/* snapshot the ctxcsn */
ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
strcpy( csnbuf, si->si_ctxcsnbuf );
ctxcsn.bv_len = si->si_ctxcsn.bv_len;
ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
ctxcsn.bv_val = csnbuf;
/* If we have a cookie, handle the PRESENT lookups */
if ( srs->sr_state.ctxcsn ) {
sessionlog *sl;
/* Is the CSN in a valid format? */
if ( srs->sr_state.ctxcsn->bv_len >= LDAP_LUTIL_CSNSTR_BUFSIZE ) {
send_ldap_error( op, rs, LDAP_OTHER, "invalid sync cookie" );
return rs->sr_err;
}
/* If just Refreshing and nothing has changed, shortcut it */
if ( bvmatch( srs->sr_state.ctxcsn, &si->si_ctxcsn )) {
if ( bvmatch( srs->sr_state.ctxcsn, &ctxcsn )) {
nochange = 1;
if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
LDAPControl *ctrls[2];
@ -1534,6 +1748,19 @@ syncprov_op_search( Operation *op, SlapReply *rs )
}
goto shortcut;
}
/* Do we have a sessionlog for this search? */
for ( sl=si->si_logs; sl; sl=sl->sl_next )
if ( sl->sl_sid == srs->sr_state.sid ) break;
if ( sl ) {
ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
if ( ber_bvcmp( srs->sr_state.ctxcsn, &sl->sl_mincsn ) >= 0 ) {
do_present = 0;
/* mutex is unlocked in playlog */
syncprov_playlog( op, rs, sl, srs->sr_state.ctxcsn, &ctxcsn );
} else {
ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
}
}
/* Is the CSN still present in the database? */
if ( syncprov_findcsn( op, FIND_CSN ) != LDAP_SUCCESS ) {
/* No, so a reload is required */
@ -1545,8 +1772,9 @@ syncprov_op_search( Operation *op, SlapReply *rs )
#endif
} else {
gotstate = 1;
/* If context has changed, check for Present UUIDs */
if ( syncprov_findcsn( op, FIND_PRESENT ) != LDAP_SUCCESS ) {
/* If changed and doing Present lookup, send Present UUIDs */
if ( do_present && syncprov_findcsn( op, FIND_PRESENT ) !=
LDAP_SUCCESS ) {
send_ldap_result( op, rs );
return rs->sr_err;
}
@ -1567,9 +1795,7 @@ syncprov_op_search( Operation *op, SlapReply *rs )
fava->f_choice = LDAP_FILTER_LE;
fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx );
fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN;
ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
ber_dupbv_x( &fava->f_ava->aa_value, &si->si_ctxcsn, op->o_tmpmemctx );
ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
ber_dupbv_x( &fava->f_ava->aa_value, &ctxcsn, op->o_tmpmemctx );
fand->f_and = fava;
if ( gotstate ) {
fava->f_next = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
@ -1589,6 +1815,7 @@ shortcut:
ss = (searchstate *)(cb+1);
ss->ss_on = on;
ss->ss_so = sop;
ss->ss_present = do_present;
cb->sc_response = syncprov_search_response;
cb->sc_cleanup = syncprov_search_cleanup;
cb->sc_private = ss;
@ -1679,6 +1906,47 @@ syncprov_db_config(
si->si_chktime = atoi( argv[2] ) * 60;
return 0;
} else if ( strcasecmp( argv[0], "syncprov-sessionlog" ) == 0 ) {
sessionlog *sl;
int sid, size;
if ( argc != 3 ) {
fprintf( stderr, "%s: line %d: wrong number of arguments in "
"\"syncprov-sessionlog <sid> <size>\"\n", fname, lineno );
return -1;
}
sid = atoi( argv[1] );
if ( sid < 0 || sid > 999 ) {
fprintf( stderr,
"%s: line %d: session log id %d is out of range [0..999]\n",
fname, lineno, sid );
return -1;
}
size = atoi( argv[2] );
if ( size < 0 ) {
fprintf( stderr,
"%s: line %d: session log size %d is negative\n",
fname, lineno, size );
return -1;
}
for ( sl = si->si_logs; sl; sl=sl->sl_next ) {
if ( sl->sl_sid == sid ) {
sl->sl_size = size;
break;
}
}
if ( !sl ) {
sl = ch_malloc( sizeof( sessionlog ) + LDAP_LUTIL_CSNSTR_BUFSIZE );
sl->sl_mincsn.bv_val = (char *)(sl+1);
sl->sl_mincsn.bv_len = 0;
sl->sl_sid = sid;
sl->sl_size = size;
sl->sl_num = 0;
sl->sl_head = sl->sl_tail = NULL;
sl->sl_next = si->si_logs;
ldap_pvt_thread_mutex_init( &sl->sl_mutex );
si->si_logs = sl;
}
return 0;
}
return SLAP_CONF_UNKNOWN;

View File

@ -2065,11 +2065,12 @@ typedef struct slap_paged_state {
#define LDAP_PSEARCH_BY_SCOPEOUT 0x05
#define LDAP_PSEARCH_BY_PREDELETE 0x06
struct psid_entry {
struct psid_entry { /* DELETE ME */
struct slap_op *ps_op;
LDAP_LIST_ENTRY(psid_entry) ps_link;
};
#if 0 /* DELETE ME */
struct slog_entry {
struct berval sl_uuid;
struct berval sl_name;
@ -2084,6 +2085,7 @@ struct slap_session_entry {
struct berval se_spec;
LDAP_LIST_ENTRY( slap_session_entry ) se_link;
};
#endif
struct slap_csn_entry {
struct berval ce_csn;