syncrepl changes

- can handle multiple syncinfo
- ldap_sync_search() added
This commit is contained in:
Jong Hyuk Choi 2003-06-11 17:03:57 +00:00
parent 3f8bb27a83
commit f07179ca61
5 changed files with 156 additions and 131 deletions

View File

@ -363,9 +363,10 @@ int backend_startup(Backend *be)
#ifdef LDAP_SYNCREPL
if ( backendDB[i].syncinfo != NULL ) {
syncinfo_t *si = ( syncinfo_t * ) backendDB[i].syncinfo;
si->be = &backendDB[i];
ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
ldap_pvt_runqueue_insert( &syncrepl_rq, si->interval,
do_syncrepl, (void *) &backendDB[i] );
do_syncrepl, (void *) backendDB[i].syncinfo );
ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
}
#endif

View File

@ -2698,6 +2698,10 @@ add_syncrepl(
si = be->syncinfo = (syncinfo_t *) ch_calloc( 1, sizeof( syncinfo_t ) );
/* set default values; FIXME : add others */
si->tlimit = NULL;
si->slimit = -1;
if ( si == NULL ) {
#ifdef NEW_LOGGING
LDAP_LOG( CONFIG, ERR, "out of memory in add_syncrepl\n", 0, 0,0 );

View File

@ -1139,8 +1139,18 @@ LDAP_SLAPD_V (struct runqueue_s) syncrepl_rq;
LDAP_SLAPD_F (void) init_syncrepl LDAP_P(());
LDAP_SLAPD_F (void*) do_syncrepl LDAP_P((void *, void *));
LDAP_SLAPD_F (char **) str2clist( char **, char *, const char * );
LDAP_SLAPD_F (int) ldap_sync_search LDAP_P((
syncinfo_t *, LDAP *, LDAPControl **, LDAPControl **, int *));
LDAP_SLAPD_F (Entry*) syncrepl_message_to_entry LDAP_P((
syncinfo_t *, LDAP *, Operation *, LDAPMessage *,
Modifications **, int*, struct berval *, struct berval * ));
LDAP_SLAPD_F (int) syncrepl_entry LDAP_P((
syncinfo_t *, LDAP *, Operation*, Entry*,
Modifications*,int, struct berval*, struct berval*, int ));
LDAP_SLAPD_F (void) syncrepl_updateCookie LDAP_P((
syncinfo_t *, LDAP *, Operation *, struct berval *,
struct berval * ));
LDAP_SLAPD_F (char **) str2clist LDAP_P(( char **, char *, const char * ));
#endif
LDAP_END_DECL

View File

@ -1326,7 +1326,7 @@ typedef struct syncinfo_s {
#define LASTMOD_REQ 0
#define LASTMOD_GEN 1
#define LASTMOD_NO 2
int lastmod;
int lastmod;
/* TLS flags */
#define TLS_OFF 0
#define TLS_ON 1

View File

@ -37,35 +37,24 @@
#include "ldap_rq.h"
static Entry*
syncrepl_message_to_entry ( LDAP *, Operation *, LDAPMessage *,
Modifications **, int*, struct berval *, struct berval * );
static int
syncrepl_entry( LDAP *, Operation*, Entry*, Modifications*,
int, struct berval*, struct berval*, int );
static int
syncrepl_del_nonpresent( LDAP *, Operation * );
syncrepl_del_nonpresent( syncinfo_t *, LDAP *, Operation * );
static void
syncrepl_add_glue( LDAP *, Operation*, Entry*, Modifications*, int,
struct berval*, struct berval* );
static void
syncrepl_updateCookie( LDAP *, Operation *, struct berval *, struct berval * );
syncrepl_add_glue( syncinfo_t *, LDAP *, Operation*, Entry*, Modifications*,
int, struct berval*, struct berval* );
static int
slap_mods_check_syncrepl( Operation *, Modifications **,
slap_mods_check_syncrepl( syncinfo_t *, Operation *, Modifications **,
const char **, char *, size_t, void *ctx );
static int
slap_mods_opattrs_syncrepl( Operation *, Modifications *, Modifications **,
const char **, char *, size_t );
slap_mods_opattrs_syncrepl( syncinfo_t *, Operation *, Modifications *,
Modifications **, const char **, char *, size_t );
static int
slap_mods2entry_syncrepl( Modifications *, Entry **, int,
const char **, char *, size_t );
slap_mods2entry_syncrepl( syncinfo_t *, Modifications *, Entry **, int,
const char **, char *, size_t );
/* callback functions */
static int cookie_callback( struct slap_op *, struct slap_rep * );
@ -114,14 +103,83 @@ init_syncrepl()
del_descs_lastmod[3] = NULL;
}
int
ldap_sync_search(
syncinfo_t *si,
LDAP *ld,
LDAPControl **sctrls,
LDAPControl **cctrls,
int *msgidp )
{
BerElement *ber;
int timelimit;
ber_int_t id;
int rc;
BerElement *sync_ber = NULL;
struct berval *sync_bvalp = NULL;
LDAPControl c[2];
LDAPControl **ctrls;
int err;
/* setup LDAP SYNC control */
sync_ber = ber_alloc_t( LBER_USE_DER );
ber_set_option( sync_ber, LBER_OPT_BER_MEMCTX, NULL );
if ( si->syncCookie ) {
ber_printf( sync_ber, "{eO}", abs(si->type), si->syncCookie );
} else {
ber_printf( sync_ber, "{e}", abs(si->type) );
}
if ( ber_flatten( sync_ber, &sync_bvalp ) == LBER_ERROR ) {
ber_free( sync_ber, 1 );
return LBER_ERROR;
}
ber_free( sync_ber, 1 );
ctrls = (LDAPControl**) sl_calloc( 3, sizeof(LDAPControl*), NULL );
c[0].ldctl_oid = LDAP_CONTROL_SYNC;
c[0].ldctl_value = (*sync_bvalp);
c[0].ldctl_iscritical = si->type < 0;
ctrls[0] = &c[0];
if ( si->authzId ) {
c[1].ldctl_oid = LDAP_CONTROL_PROXY_AUTHZ;
c[1].ldctl_value.bv_val = si->authzId;
c[1].ldctl_value.bv_len = strlen( si->authzId );
c[1].ldctl_iscritical = 1;
ctrls[1] = &c[1];
} else {
ctrls[1] = NULL;
}
ctrls[2] = NULL;
err = ldap_set_option( ld, LDAP_OPT_SERVER_CONTROLS, ctrls );
ber_bvfree( sync_bvalp );
ch_free( ctrls );
if ( err != LDAP_OPT_SUCCESS )
fprintf( stderr, "Could not set controls : %d\n", err );
rc = ldap_search_ext( ld, si->base, si->scope, si->filterstr,
si->attrs, si->attrsonly, sctrls, cctrls,
si->tlimit, si->slimit, msgidp );
return rc;
}
void *
do_syncrepl(
void *ctx,
void *arg )
{
struct re_s* rtask = arg;
Backend *be = rtask->arg;
syncinfo_t *si = ( syncinfo_t * ) be->syncinfo;
syncinfo_t *si = ( syncinfo_t * ) rtask->arg;
Backend *be = si->be;
SlapReply rs = {REP_RESULT};
@ -203,32 +261,17 @@ do_syncrepl(
/* Init connection to master */
if ( ldap_is_ldap_url( si->masteruri )) {
rc = ldap_initialize( &ld, si->masteruri );
if ( rc != LDAP_SUCCESS ) {
rc = ldap_initialize( &ld, si->masteruri );
if ( rc != LDAP_SUCCESS ) {
#ifdef NEW_LOGGING
LDAP_LOG( OPERATION, ERR, "do_syncrepl: "
"ldap_initialize failed (%s)\n",
si->masteruri, 0, 0 );
LDAP_LOG( OPERATION, ERR, "do_syncrepl: "
"ldap_initialize failed (%s)\n",
si->masteruri, 0, 0 );
#else
Debug( LDAP_DEBUG_ANY, "do_syncrepl: "
"ldap_initialize failed (%s)\n",
si->masteruri, 0, 0 );
Debug( LDAP_DEBUG_ANY, "do_syncrepl: "
"ldap_initialize failed (%s)\n",
si->masteruri, 0, 0 );
#endif
}
} else {
ld = ldap_init( si->mastername, si->masterport );
if ( ld == NULL ) {
#ifdef NEW_LOGGING
LDAP_LOG( OPERATION, ERR, "do_syncrepl: "
"ldap_init failed (%s:%d)\n",
si->mastername, si->masterport, 0 );
#else
Debug( LDAP_DEBUG_ANY, "do_syncrepl: "
"ldap_init failed (%s:%d)\n",
si->mastername, si->masterport, 0 );
#endif
}
}
op.o_protocol = LDAP_VERSION3;
@ -322,10 +365,21 @@ do_syncrepl(
}
}
/* set thread context in syncinfo */
si->ctx = ctx;
op.o_tmpmemctx = NULL; /* FIXME : to use per-thread mem context */
/* set memory context */
#if 0
#define SLAB_SIZE 1048576
memsiz = SLAB_SIZE;
memctx = sl_mem_create( memsiz, ctx );
op.o_tmpmemctx = memctx;
op.o_tmpmfuncs = &sl_mfuncs;
#else
op.o_tmpmemctx = NULL;
op.o_tmpmfuncs = &ch_mfuncs;
#endif
op.o_tag = LDAP_REQ_SEARCH;
op.o_dn = si->updatedn;
op.o_ndn = si->updatedn;
@ -384,49 +438,6 @@ do_syncrepl(
psub = be->be_nsuffix[0];
/* setup LDAP SYNC control */
sync_ber = ber_alloc_t( LBER_USE_DER );
ber_set_option( sync_ber, LBER_OPT_BER_MEMCTX, &op.o_tmpmemctx );
if ( si->syncCookie ) {
ber_printf( sync_ber, "{eO}", abs(si->type), si->syncCookie );
} else {
ber_printf( sync_ber, "{e}", abs(si->type) );
}
if ( ber_flatten( sync_ber, &sync_bvalp ) == LBER_ERROR ) {
ber_free( sync_ber, 1 );
return NULL;
}
ber_free( sync_ber, 1 );
sctrls = (LDAPControl**) sl_calloc( 3, sizeof(LDAPControl*), op.o_tmpmemctx );
c[0].ldctl_oid = LDAP_CONTROL_SYNC;
c[0].ldctl_value = (*sync_bvalp);
c[0].ldctl_iscritical = si->type < 0;
sctrls[0] = &c[0];
if ( si->authzId ) {
c[1].ldctl_oid = LDAP_CONTROL_PROXY_AUTHZ;
c[1].ldctl_value.bv_val = si->authzId;
c[1].ldctl_value.bv_len = strlen( si->authzId );
c[1].ldctl_iscritical = 1;
sctrls[1] = &c[1];
} else {
sctrls[1] = NULL;
}
sctrls[2] = NULL;
err = ldap_set_option( ld, LDAP_OPT_SERVER_CONTROLS, sctrls );
ber_bvfree( sync_bvalp );
ch_free( sctrls );
if ( err != LDAP_OPT_SUCCESS )
fprintf( stderr, "Could not set controls : %d\n", err );
/* Delete Attributes */
if ( si->lastmod == LASTMOD_REQ ) {
descs = del_descs_lastmod;
@ -470,16 +481,11 @@ do_syncrepl(
si->attrs[ n ] = NULL;
}
/* Send LDAP SYNC search */
rc = ldap_search_ext( ld, si->base, si->scope, si->filterstr,
si->attrs, si->attrsonly, NULL, NULL,
NULL, -1, &msgid );
if( rc != LDAP_SUCCESS ) {
fprintf( stderr, "syncrepl: ldap_search_ext: %s (%d)\n",
ldap_err2string( rc ), rc );
return NULL;
rc = ldap_sync_search( si, ld, NULL, NULL, &msgid );
if( rc != LDAP_SUCCESS ) {
fprintf( stderr, "syncrepl: ldap_search_ext: %s (%d)\n",
ldap_err2string( rc ), rc );
return NULL;
}
while (( rc = ldap_result( ld, LDAP_RES_ANY, LDAP_MSG_ONE, NULL, &res )) > 0 ) {
@ -490,12 +496,12 @@ do_syncrepl(
{
switch( ldap_msgtype( msg ) ) {
case LDAP_RES_SEARCH_ENTRY:
entry = syncrepl_message_to_entry( ld, &op, msg,
entry = syncrepl_message_to_entry( si, ld, &op, msg,
&modlist, &syncstate, &syncUUID, &syncCookie );
rc_efree = syncrepl_entry( ld, &op, entry, modlist,
rc_efree = syncrepl_entry( si, ld, &op, entry, modlist,
syncstate, &syncUUID, &syncCookie, !syncinfo_arrived );
if ( syncCookie.bv_len ) {
syncrepl_updateCookie( ld, &op, &psub, &syncCookie );
syncrepl_updateCookie( si, ld, &op, &psub, &syncCookie );
}
if ( rc_efree )
entry_free( entry );
@ -533,8 +539,7 @@ do_syncrepl(
if (si->type == LDAP_SYNC_REFRESH_AND_PERSIST) {
if ( cancel_response ) {
if ( syncCookie.bv_len ) {
ber_bvfree( si->syncCookie );
si->syncCookie = ber_dupbv( NULL, &syncCookie );
syncrepl_updateCookie( si, ld, &op, &psub, &syncCookie );
}
if ( ctrl_ber )
ber_free( ctrl_ber, 1 );
@ -547,9 +552,9 @@ do_syncrepl(
}
} else {
if ( syncCookie.bv_len ) {
syncrepl_updateCookie( ld, &op, &psub, &syncCookie );
syncrepl_updateCookie( si, ld, &op, &psub, &syncCookie);
}
syncrepl_del_nonpresent( ld, &op );
syncrepl_del_nonpresent( si, ld, &op );
if ( ctrl_ber )
ber_free( ctrl_ber, 1 );
goto done;
@ -565,7 +570,7 @@ do_syncrepl(
ber_scanf( res_ber, "{e" /*"}"*/, &syncstate );
if ( syncstate == LDAP_SYNC_REFRESH_DONE ) {
syncrepl_del_nonpresent( ld, &op );
syncrepl_del_nonpresent( si, ld, &op );
} else if ( syncstate != LDAP_SYNC_NEW_COOKIE ) {
#ifdef NEW_LOGGING
LDAP_LOG( OPERATION, ERR,
@ -580,8 +585,7 @@ do_syncrepl(
== LDAP_SYNC_TAG_COOKIE ) {
ber_scanf( res_ber, /*"{"*/ "o}", &syncCookie );
if ( syncCookie.bv_len ) {
ber_bvfree( si->syncCookie );
si->syncCookie = ber_dupbv( NULL, &syncCookie );
syncrepl_updateCookie( si, ld, &op, &psub, &syncCookie);
}
} else {
if ( syncstate == LDAP_SYNC_NEW_COOKIE ) {
@ -661,8 +665,9 @@ done:
return NULL;
}
static Entry*
Entry*
syncrepl_message_to_entry(
syncinfo_t *si,
LDAP *ld,
Operation *op,
LDAPMessage *msg,
@ -693,8 +698,6 @@ syncrepl_message_to_entry(
int rc;
char *a;
syncinfo_t *si = ( syncinfo_t * ) be->syncinfo;
ber_len_t len;
LDAPControl* rctrlp;
LDAPControl** rctrls = NULL;
@ -822,7 +825,8 @@ syncrepl_message_to_entry(
#endif
}
rc = slap_mods_check_syncrepl( op, modlist, &text, txtbuf, textlen, NULL );
rc = slap_mods_check_syncrepl( si, op, modlist,
&text, txtbuf, textlen, NULL );
if ( rc != LDAP_SUCCESS ) {
#ifdef NEW_LOGGING
@ -835,8 +839,8 @@ syncrepl_message_to_entry(
return NULL;
}
rc = slap_mods_opattrs_syncrepl( op, *modlist, modtail,
&text,txtbuf, textlen );
rc = slap_mods_opattrs_syncrepl( si, op, *modlist, modtail,
&text,txtbuf, textlen );
if( rc != LDAP_SUCCESS ) {
#ifdef NEW_LOGGING
@ -849,7 +853,7 @@ syncrepl_message_to_entry(
return NULL;
}
rc = slap_mods2entry_syncrepl( *modlist, &e, 1, &text, txtbuf, textlen );
rc = slap_mods2entry_syncrepl( si, *modlist, &e, 1, &text, txtbuf, textlen);
if( rc != LDAP_SUCCESS ) {
#ifdef NEW_LOGGING
LDAP_LOG( OPERATION, ERR,
@ -877,8 +881,9 @@ syncuuid_cmp( const void* v_uuid1, const void* v_uuid2 )
return ( strcmp( uuid1->bv_val, uuid2->bv_val ) );
}
static int
int
syncrepl_entry(
syncinfo_t* si,
LDAP *ld,
Operation *op,
Entry* e,
@ -890,7 +895,6 @@ syncrepl_entry(
)
{
Backend *be = op->o_bd;
syncinfo_t *si = ( syncinfo_t * ) be->syncinfo;
slap_callback cb;
struct berval csn_bv = {0, NULL};
struct berval *syncuuid_bv = NULL;
@ -953,6 +957,7 @@ syncrepl_entry(
ch_free( op->ors_filterstr.bv_val );
cb.sc_response = null_callback;
cb.sc_private = si;
rc = LDAP_SUCCESS;
@ -990,7 +995,7 @@ syncrepl_entry(
rc = be->be_modify( op, &rs );
} else if ( rc == LDAP_REFERRAL ||
rc == LDAP_NO_SUCH_OBJECT ) {
syncrepl_add_glue(ld, op, e,
syncrepl_add_glue( si, ld, op, e,
modlist, syncstate,
syncUUID, syncCookie);
} else {
@ -1038,12 +1043,12 @@ syncrepl_entry(
static int
syncrepl_del_nonpresent(
syncinfo_t *si,
LDAP *ld,
Operation *op
)
{
Backend* be = op->o_bd;
syncinfo_t *si = ( syncinfo_t * ) be->syncinfo;
slap_callback cb;
struct berval base_bv = {0, NULL};
Filter *filter;
@ -1105,6 +1110,7 @@ syncrepl_del_nonpresent(
static void
syncrepl_add_glue(
syncinfo_t *si,
LDAP *ld,
Operation* op,
Entry *e,
@ -1115,7 +1121,6 @@ syncrepl_add_glue(
)
{
Backend *be = op->o_bd;
syncinfo_t *si = op->o_callback->sc_private;
struct berval uuid_bv = {0, NULL};
slap_callback cb;
Attribute *a;
@ -1212,8 +1217,9 @@ syncrepl_add_glue(
return;
}
static void
void
syncrepl_updateCookie(
syncinfo_t *si,
LDAP *ld,
Operation *op,
struct berval *pdn,
@ -1221,7 +1227,6 @@ syncrepl_updateCookie(
)
{
Backend *be = op->o_bd;
syncinfo_t *si = ( syncinfo_t * ) be->syncinfo;
Modifications *ml;
Modifications *mlnext;
Modifications *mod;
@ -1319,7 +1324,8 @@ syncrepl_updateCookie(
*modtail = mod;
modtail = &mod->sml_next;
rc = slap_mods_check_syncrepl( op, &modlist, &text, txtbuf, textlen, NULL );
rc = slap_mods_check_syncrepl( si, op, &modlist,
&text, txtbuf, textlen, NULL );
if ( rc != LDAP_SUCCESS ) {
#ifdef NEW_LOGGING
@ -1332,7 +1338,8 @@ syncrepl_updateCookie(
}
op->o_tag = LDAP_REQ_ADD;
rc = slap_mods_opattrs_syncrepl( op, modlist, modtail, &text,txtbuf, textlen );
rc = slap_mods_opattrs_syncrepl( si, op, modlist, modtail,
&text,txtbuf, textlen );
if( rc != LDAP_SUCCESS ) {
#ifdef NEW_LOGGING
@ -1353,7 +1360,7 @@ syncrepl_updateCookie(
e->e_attrs = NULL;
rc = slap_mods2entry_syncrepl( modlist, &e, 1, &text, txtbuf, textlen );
rc = slap_mods2entry_syncrepl( si, modlist, &e, 1, &text, txtbuf, textlen );
if( rc != LDAP_SUCCESS ) {
#ifdef NEW_LOGGING
@ -1438,6 +1445,7 @@ done :
static
int slap_mods_check_syncrepl(
syncinfo_t* si,
Operation *op,
Modifications **mlp,
const char **text,
@ -1447,7 +1455,6 @@ int slap_mods_check_syncrepl(
{
int rc;
Backend *be = op->o_bd;
syncinfo_t *si = ( syncinfo_t * ) be->syncinfo;
AttributeDescription** descs;
int i;
Modifications *prevml = NULL;
@ -1629,6 +1636,7 @@ int slap_mods_check_syncrepl(
static
int slap_mods_opattrs_syncrepl(
syncinfo_t *si,
Operation *op,
Modifications *mods,
Modifications **modtail,
@ -1643,7 +1651,6 @@ int slap_mods_opattrs_syncrepl(
char csnbuf[ LDAP_LUTIL_CSNSTR_BUFSIZE ];
Modifications *mod;
Backend *be = op->o_bd;
syncinfo_t *si = ( syncinfo_t * ) be->syncinfo;
int mop = LDAP_MOD_REPLACE;
@ -1788,6 +1795,7 @@ int slap_mods_opattrs_syncrepl(
static
int slap_mods2entry_syncrepl(
syncinfo_t *si,
Modifications *mods,
Entry **e,
int repl_user,
@ -2005,6 +2013,8 @@ null_callback(
SlapReply* rs
)
{
syncinfo_t *si = op->o_callback->sc_private;
if ( rs->sr_err != LDAP_SUCCESS &&
rs->sr_err != LDAP_REFERRAL &&
rs->sr_err != LDAP_ALREADY_EXISTS &&