More search optimizations

This commit is contained in:
Howard Chu 2011-09-06 02:21:19 -07:00
parent 1e32fcf099
commit 6c8e4f2671
5 changed files with 139 additions and 76 deletions

View File

@ -163,6 +163,19 @@ typedef struct IndexRec {
AttrList *attrs;
} IndexRec;
#define MAXRDNS SLAP_LDAPDN_MAXLEN/4
typedef struct IdScopes {
MDB_txn *mt;
MDB_cursor *mc;
ID id;
ID *scopes;
int numrdns;
int nscope;
struct berval rdns[MAXRDNS];
struct berval nrdns[MAXRDNS];
} IdScopes;
#include "proto-mdb.h"
#endif /* _BACK_MDB_H_ */

View File

@ -719,40 +719,50 @@ mdb_idscope(
int
mdb_idscopes(
Operation *op,
MDB_txn *txn,
MDB_cursor **cursp,
ID base,
ID *scopes )
IdScopes *isc )
{
struct mdb_info *mdb = (struct mdb_info *) op->o_bd->be_private;
MDB_dbi dbi = mdb->mi_dn2id;
MDB_val key, data;
MDB_cursor *cursor;
ID id;
char *ptr;
int rc;
unsigned int x;
unsigned int nrlen, rlen;
diskNode *d;
key.mv_size = sizeof(ID);
if ( !*cursp ) {
rc = mdb_cursor_open( txn, dbi, cursp );
if ( !isc->mc ) {
rc = mdb_cursor_open( isc->mt, dbi, &isc->mc );
if ( rc ) return rc;
}
cursor = *cursp;
id = base;
id = isc->id;
while (id) {
key.mv_data = &id;
rc = mdb_cursor_get( cursor, &key, &data, MDB_SET );
rc = mdb_cursor_get( isc->mc, &key, &data, MDB_SET );
if ( rc )
break;
/* save RDN info */
d = data.mv_data;
nrlen = (d->nrdnlen[0] << 8) | d->nrdnlen[1];
rlen = data.mv_size - sizeof(diskNode) - nrlen;
isc->nrdns[isc->numrdns].bv_len = nrlen;
isc->nrdns[isc->numrdns].bv_val = d->nrdn;
isc->rdns[isc->numrdns].bv_len = rlen;
isc->rdns[isc->numrdns].bv_val = d->nrdn+nrlen+1;
isc->numrdns++;
ptr = data.mv_data;
ptr += data.mv_size - sizeof(ID);
memcpy( &id, ptr, sizeof(ID) );
x = mdb_idl_search( scopes, id );
if ( scopes[x] == id )
x = mdb_idl_search( isc->scopes, id );
if ( isc->scopes[x] == id ) {
isc->nscope = x;
return MDB_SUCCESS;
}
if ( op->ors_scope == LDAP_SCOPE_ONELEVEL )
break;
}

View File

@ -476,6 +476,7 @@ static int mdb_entry_partsize(struct mdb_info *mdb, MDB_txn *txn, Entry *e,
len += entry_lenlen(0); /* 0 nvals */
}
}
len += entry_lenlen(e->e_ocflags);
len += entry_lenlen(nat);
len += entry_lenlen(nval);
eh->bv.bv_len = len;
@ -502,6 +503,9 @@ static int mdb_entry_encode(Operation *op, MDB_txn *txn, Entry *e, MDB_val *data
Debug( LDAP_DEBUG_TRACE, "=> mdb_entry_encode(0x%08lx): %s\n",
(long) e->e_id, e->e_dn, 0 );
if (is_entry_referral(e))
; /* empty */
rc = mdb_entry_partsize( mdb, txn, e, &eh );
data->mv_size = eh.bv.bv_len;
@ -509,6 +513,7 @@ static int mdb_entry_encode(Operation *op, MDB_txn *txn, Entry *e, MDB_val *data
ptr = (unsigned char *)data->mv_data;
mdb_entry_putlen(&ptr, eh.nattrs);
mdb_entry_putlen(&ptr, eh.nvals);
mdb_entry_putlen(&ptr, e->e_ocflags);
for (a=e->e_attrs; a; a=a->a_next) {
mdb_entry_putlen(&ptr, mdb->mi_adxs[a->a_desc->ad_index]);
@ -570,6 +575,7 @@ int mdb_entry_decode(Operation *op, MDB_val *data, Entry **e)
nattrs = mdb_entry_getlen(&ptr);
nvals = mdb_entry_getlen(&ptr);
x = entry_alloc();
x->e_ocflags = mdb_entry_getlen(&ptr);
x->e_attrs = attrs_alloc( nattrs );
x->e_bv.bv_len = nvals * sizeof(struct berval);
x->e_bv.bv_val = op->o_tmpalloc(x->e_bv.bv_len, op->o_tmpmemctx);

View File

@ -129,10 +129,7 @@ int mdb_idscope(
int mdb_idscopes(
Operation *op,
MDB_txn *txn,
MDB_cursor **cursp,
ID base,
ID *scopes );
IdScopes *isc );
MDB_cmp_func mdb_dup_compare;

View File

@ -284,18 +284,17 @@ mdb_search( Operation *op, SlapReply *rs )
ID lastid = NOID;
ID candidates[MDB_IDL_UM_SIZE];
ID scopes[MDB_IDL_DB_SIZE];
Entry *e = NULL, base;
Entry *e = NULL, *base = NULL;
Entry *matched = NULL;
AttributeName *attrs;
struct berval realbase = BER_BVNULL;
slap_mask_t mask;
time_t stoptime;
int manageDSAit;
int tentries = 0;
IdScopes isc;
mdb_op_info opinfo = {0}, *moi = &opinfo;
MDB_txn *ltid = NULL;
MDB_cursor *idcursor = NULL;
Debug( LDAP_DEBUG_TRACE, "=> " LDAP_XSTRING(mdb_search) "\n", 0, 0, 0);
attrs = op->oq_search.rs_attrs;
@ -312,6 +311,8 @@ mdb_search( Operation *op, SlapReply *rs )
}
ltid = moi->moi_txn;
isc.mt = ltid;
isc.mc = NULL;
if ( op->ors_deref & LDAP_DEREF_FINDING ) {
MDB_IDL_ZERO(candidates);
@ -474,28 +475,19 @@ dn2entry_retry:
/* compute it anyway; root does not use it */
stoptime = op->o_time + op->ors_tlimit;
/* need normalized dn below */
ber_dupbv( &realbase, &e->e_nname );
base = e;
/* Copy info to base, must free entry before accessing the database
* in search_candidates, to avoid deadlocks.
*/
base.e_private = e->e_private;
base.e_nname = realbase;
base.e_id = e->e_id;
mdb_entry_return(e);
e = NULL;
/* select candidates */
if ( op->oq_search.rs_scope == LDAP_SCOPE_BASE ) {
rs->sr_err = base_candidate( op->o_bd, &base, candidates );
rs->sr_err = base_candidate( op->o_bd, base, candidates );
} else {
MDB_IDL_ZERO( candidates );
MDB_IDL_ZERO( scopes );
mdb_idl_insert( scopes, base.e_id );
rs->sr_err = search_candidates( op, rs, &base,
mdb_idl_insert( scopes, base->e_id );
rs->sr_err = search_candidates( op, rs, base,
ltid, candidates, scopes );
}
@ -560,6 +552,8 @@ dn2entry_retry:
goto loop_begin;
}
isc.scopes = scopes;
for ( id = mdb_idl_first( candidates, &cursor );
id != NOID ; id = mdb_idl_next( candidates, &cursor ) )
{
@ -594,43 +588,48 @@ loop_begin:
goto done;
}
/* get the entry */
rs->sr_err = mdb_id2entry( op, ltid, id, &e );
if ( id == base->e_id ) {
e = base;
} else {
if (rs->sr_err == LDAP_BUSY) {
rs->sr_text = "ldap server busy";
send_ldap_result( op, rs );
goto done;
/* get the entry */
rs->sr_err = mdb_id2entry( op, ltid, id, &e );
} else if ( rs->sr_err == LDAP_OTHER ) {
rs->sr_text = "internal error";
send_ldap_result( op, rs );
goto done;
}
if (rs->sr_err == LDAP_BUSY) {
rs->sr_text = "ldap server busy";
send_ldap_result( op, rs );
goto done;
if ( e == NULL ) {
if( !MDB_IDL_IS_RANGE(candidates) ) {
/* only complain for non-range IDLs */
Debug( LDAP_DEBUG_TRACE,
LDAP_XSTRING(mdb_search)
": candidate %ld not found\n",
(long) id, 0, 0 );
} else {
/* get the next ID from the DB */
rs->sr_err = mdb_get_nextid( mdb, ltid, &cursor );
if ( rs->sr_err == MDB_NOTFOUND ) {
break;
}
if ( rs->sr_err ) {
rs->sr_err = LDAP_OTHER;
rs->sr_text = "internal error in get_nextid";
send_ldap_result( op, rs );
goto done;
}
cursor--;
} else if ( rs->sr_err == LDAP_OTHER ) {
rs->sr_text = "internal error";
send_ldap_result( op, rs );
goto done;
}
goto loop_continue;
if ( e == NULL ) {
if( !MDB_IDL_IS_RANGE(candidates) ) {
/* only complain for non-range IDLs */
Debug( LDAP_DEBUG_TRACE,
LDAP_XSTRING(mdb_search)
": candidate %ld not found\n",
(long) id, 0, 0 );
} else {
/* get the next ID from the DB */
rs->sr_err = mdb_get_nextid( mdb, ltid, &cursor );
if ( rs->sr_err == MDB_NOTFOUND ) {
break;
}
if ( rs->sr_err ) {
rs->sr_err = LDAP_OTHER;
rs->sr_text = "internal error in get_nextid";
send_ldap_result( op, rs );
goto done;
}
cursor--;
}
goto loop_continue;
}
}
if ( is_entry_subentry( e ) ) {
@ -655,25 +654,27 @@ loop_begin:
/* Does this candidate actually satisfy the search scope?
*/
scopeok = 0;
isc.numrdns = 0;
switch( op->ors_scope ) {
case LDAP_SCOPE_BASE:
/* This is always true, yes? */
if ( id == base.e_id ) scopeok = 1;
if ( id == base->e_id ) scopeok = 1;
break;
#ifdef LDAP_SCOPE_CHILDREN
case LDAP_SCOPE_CHILDREN:
if ( id == base.e_id ) break;
if ( id == base->e_id ) break;
/* Fall-thru */
#endif
case LDAP_SCOPE_SUBTREE:
if ( id == base.e_id ) {
if ( id == base->e_id ) {
scopeok = 1;
break;
}
/* Fall-thru */
case LDAP_SCOPE_ONELEVEL:
if ( mdb_idscopes( op, ltid, &idcursor, id, scopes ) == MDB_SUCCESS ) scopeok = 1;
isc.id = id;
if ( mdb_idscopes( op, &isc ) == MDB_SUCCESS ) scopeok = 1;
break;
}
@ -704,8 +705,42 @@ loop_begin:
goto loop_continue;
}
mdb_id2name( op, ltid, &idcursor, e->e_id,
&e->e_name, &e->e_nname );
if (e != base) {
struct berval pdn, pndn;
char *d, *n;
int i;
/* child of base, just append RDNs to base->e_name */
if ( isc.nscope == 1 ) {
pdn = base->e_name;
pndn = base->e_nname;
} else {
mdb_id2name( op, ltid, &isc.mc, scopes[isc.nscope], &pdn, &pndn );
}
e->e_name.bv_len = pdn.bv_len;
e->e_nname.bv_len = pndn.bv_len;
for (i=0; i<isc.numrdns; i++) {
e->e_name.bv_len += isc.rdns[i].bv_len + 1;
e->e_nname.bv_len += isc.nrdns[i].bv_len + 1;
}
e->e_name.bv_val = op->o_tmpalloc(e->e_name.bv_len + 1, op->o_tmpmemctx);
e->e_nname.bv_val = op->o_tmpalloc(e->e_nname.bv_len + 1, op->o_tmpmemctx);
d = e->e_name.bv_val;
n = e->e_nname.bv_val;
for (i=0; i<isc.numrdns; i++) {
memcpy(d, isc.rdns[i].bv_val, isc.rdns[i].bv_len);
d += isc.rdns[i].bv_len;
*d++ = ',';
memcpy(n, isc.nrdns[i].bv_val, isc.nrdns[i].bv_len);
n += isc.nrdns[i].bv_len;
*n++ = ',';
}
memcpy(d, pdn.bv_val, pdn.bv_len+1);
memcpy(n, pndn.bv_val, pndn.bv_len+1);
if (isc.nscope != 1) {
op->o_tmpfree(pndn.bv_val, op->o_tmpmemctx);
op->o_tmpfree(pdn.bv_val, op->o_tmpmemctx);
}
}
/*
* if it's a referral, add it to the list of referrals. only do
@ -763,7 +798,8 @@ loop_begin:
rs->sr_err = send_search_entry( op, rs );
rs->sr_attrs = NULL;
rs->sr_entry = NULL;
mdb_entry_return( e );
if (e != base)
mdb_entry_return( e );
e = NULL;
switch ( rs->sr_err ) {
@ -794,8 +830,8 @@ loop_begin:
loop_continue:
if( e != NULL ) {
/* free reader lock */
mdb_entry_return( e );
if ( e != base )
mdb_entry_return( e );
RS_ASSERT( rs->sr_entry == NULL );
e = NULL;
rs->sr_entry = NULL;
@ -820,13 +856,14 @@ done:
mdb_txn_reset( moi->moi_txn );
LDAP_SLIST_REMOVE( &op->o_extra, &moi->moi_oe, OpExtra, oe_next );
}
if( idcursor )
mdb_cursor_close( idcursor );
if( isc.mc )
mdb_cursor_close( isc.mc );
if( rs->sr_v2ref ) {
ber_bvarray_free( rs->sr_v2ref );
rs->sr_v2ref = NULL;
}
if( realbase.bv_val ) ch_free( realbase.bv_val );
if (base)
mdb_entry_return(base);
return rs->sr_err;
}