back-hdb caching updates, use EntryInfo cache for hdb_dn2idl.

This commit is contained in:
Howard Chu 2003-09-22 08:37:32 +00:00
parent 8ba0c3a3aa
commit 0342904699
7 changed files with 186 additions and 75 deletions

View File

@ -94,7 +94,9 @@ typedef struct bdb_entry_info {
struct berval bei_nrdn;
#ifdef BDB_HIER
struct berval bei_rdn;
int bei_modrdns;
int bei_modrdns; /* track renames */
int bei_ckids; /* number of kids cached */
int bei_dkids; /* number of kids on-disk, plus 1 */
#endif
Entry *bei_e;
Avlnode *bei_kids;

View File

@ -193,8 +193,7 @@ static int
bdb_entryinfo_add_internal(
struct bdb_info *bdb,
EntryInfo *ei,
EntryInfo **res,
u_int32_t locker
EntryInfo **res
)
{
EntryInfo *ei2 = NULL;
@ -228,6 +227,9 @@ bdb_entryinfo_add_internal(
ber_dupbv( &ei2->bei_nrdn, &ei->bei_nrdn );
avl_insert( &ei->bei_parent->bei_kids, ei2, bdb_rdn_cmp,
avl_dup_error );
#ifdef BDB_HIER
ei->bei_parent->bei_ckids++;
#endif
}
*res = ei2;
@ -245,8 +247,7 @@ bdb_cache_find_ndn(
Operation *op,
DB_TXN *txn,
struct berval *ndn,
EntryInfo **res,
u_int32_t locker
EntryInfo **res
)
{
struct bdb_info *bdb = (struct bdb_info *) op->o_bd->be_private;
@ -286,8 +287,7 @@ bdb_cache_find_ndn(
/* DN exists but needs to be added to cache */
ei.bei_nrdn.bv_len = len;
rc = bdb_entryinfo_add_internal( bdb, &ei, &ei2,
locker );
rc = bdb_entryinfo_add_internal( bdb, &ei, &ei2 );
/* add_internal left eip and c_rwlock locked */
ldap_pvt_thread_rdwr_wunlock( &bdb->bi_cache.c_rwlock );
if ( rc ) {
@ -429,6 +429,29 @@ hdb_cache_find_parent(
}
return rc;
}
int hdb_entryinfo_load(
struct bdb_info *bdb,
EntryInfo *ei,
EntryInfo **res )
{
EntryInfo *ei2;
int rc;
bdb_cache_entryinfo_lock( ei->bei_parent );
ei2 = (EntryInfo *)avl_find( ei->bei_parent->bei_kids, ei, bdb_rdn_cmp );
bdb_cache_entryinfo_unlock( ei->bei_parent );
if ( !ei2 ) {
rc = bdb_entryinfo_add_internal( bdb, ei, res );
bdb_cache_entryinfo_unlock( ei->bei_parent );
ldap_pvt_thread_rdwr_wunlock( &bdb->bi_cache.c_rwlock );
} else {
*res = ei2;
free( ei->bei_rdn.bv_val );
return 0;
}
return rc;
}
#endif
/* caller must have lru_mutex locked. mutex
@ -548,7 +571,7 @@ again: ldap_pvt_thread_rdwr_rlock( &bdb->bi_cache.c_rwlock );
rc = bdb_id2entry( op->o_bd, tid, id, &ep );
if ( rc == 0 ) {
rc = bdb_cache_find_ndn( op, tid,
&ep->e_nname, eip, locker );
&ep->e_nname, eip );
if ( *eip )
islocked = 1;
if ( rc ) {
@ -676,8 +699,9 @@ bdb_cache_add(
rdn.bv_len = ptr - rdn.bv_val;
}
ber_dupbv( &ei.bei_rdn, &rdn );
if ( eip->bei_dkids ) eip->bei_dkids++;
#endif
rc = bdb_entryinfo_add_internal( bdb, &ei, &new, locker );
rc = bdb_entryinfo_add_internal( bdb, &ei, &new );
new->bei_e = e;
e->e_private = new;
new->bei_state = CACHE_ENTRY_NO_KIDS;
@ -871,6 +895,10 @@ bdb_cache_delete_internal(
{
int rc = 0; /* return code */
#ifdef BDB_HIER
e->bei_parent->bei_ckids--;
if ( e->bei_parent->bei_dkids ) e->bei_parent->bei_dkids--;
#endif
/* dn tree */
if ( avl_delete( &e->bei_parent->bei_kids, (caddr_t) e, bdb_rdn_cmp ) == NULL )
{

View File

@ -40,7 +40,7 @@ bdb_dn2entry(
*e = NULL;
rc = bdb_cache_find_ndn( op, tid, dn, &ei, locker );
rc = bdb_cache_find_ndn( op, tid, dn, &ei );
if ( rc ) {
if ( matched && rc == DB_NOTFOUND ) {
/* Set the return value, whether we have its entry

View File

@ -705,6 +705,7 @@ hdb_dn2id_delete(
return rc;
}
int
hdb_dn2id(
Operation *op,
@ -745,20 +746,44 @@ hdb_dn2id(
data.data = d;
rc = cursor->c_get( cursor, &key, &data, DB_GET_BOTH );
cursor->c_close( cursor );
if ( rc == 0 ) {
AC_MEMCPY( &ei->bei_id, &d->entryID, sizeof(ID) );
ei->bei_id = d->entryID;
ei->bei_rdn.bv_len = data.size - sizeof(diskNode) - nrlen;
ptr = d->nrdn + nrlen + 1;
ei->bei_rdn.bv_val = ch_malloc( ei->bei_rdn.bv_len + 1 );
strcpy( ei->bei_rdn.bv_val, ptr );
ber_str2bv( ptr, ei->bei_rdn.bv_len, 1, &ei->bei_rdn );
if ( !ei->bei_parent->bei_dkids ) {
db_recno_t dkids;
/* How many children does the parent have? */
/* FIXME: do we need to lock the parent
* entryinfo? Seems safe...
*/
cursor->c_count( cursor, &dkids, 0 );
ei->bei_parent->bei_dkids = dkids;
}
}
cursor->c_close( cursor );
op->o_tmpfree( d, op->o_tmpmemctx );
return rc;
}
static void
hdb_dn2ei(
char *buf,
int len,
EntryInfo *ei )
{
diskNode *d = (diskNode *)buf;
char *ptr;
AC_MEMCPY( &ei->bei_id, &d->entryID, sizeof(ID) );
AC_MEMCPY( &ei->bei_nrdn.bv_len, &d->nrdnlen, sizeof(d->nrdnlen) );
ber_str2bv( d->nrdn, ei->bei_nrdn.bv_len, 0, &ei->bei_nrdn );
ei->bei_rdn.bv_len = len - sizeof(diskNode) - ei->bei_nrdn.bv_len;
ptr = d->nrdn + ei->bei_nrdn.bv_len + 1;
ber_str2bv( ptr, ei->bei_rdn.bv_len, 1, &ei->bei_rdn );
}
int
hdb_dn2id_parent(
Operation *op,
@ -792,19 +817,24 @@ hdb_dn2id_parent(
data.data = d;
rc = cursor->c_get( cursor, &key, &data, DB_SET );
cursor->c_close( cursor );
if ( rc == 0 ) {
if (d->nrdnlen >= 0) {
return LDAP_OTHER;
rc = LDAP_OTHER;
} else {
db_recno_t dkids;
*idp = d->entryID;
ei->bei_nrdn.bv_len = 0 - d->nrdnlen;
ber_str2bv( d->nrdn, ei->bei_nrdn.bv_len, 1, &ei->bei_nrdn );
ei->bei_rdn.bv_len = data.size - sizeof(diskNode) -
ei->bei_nrdn.bv_len;
ptr = d->nrdn + ei->bei_nrdn.bv_len + 1;
ber_str2bv( ptr, ei->bei_rdn.bv_len, 1, &ei->bei_rdn );
/* How many children does this node have? */
cursor->c_count( cursor, &dkids, 0 );
ei->bei_dkids = dkids;
}
AC_MEMCPY( idp, &d->entryID, sizeof(ID) );
ei->bei_nrdn.bv_len = 0 - d->nrdnlen;
ber_str2bv( d->nrdn, ei->bei_nrdn.bv_len, 1, &ei->bei_nrdn );
ei->bei_rdn.bv_len = data.size - sizeof(diskNode) -
ei->bei_nrdn.bv_len;
ptr = d->nrdn + ei->bei_nrdn.bv_len + 1;
ber_str2bv( ptr, ei->bei_rdn.bv_len, 1, &ei->bei_rdn );
}
cursor->c_close( cursor );
op->o_tmpfree( d, op->o_tmpmemctx );
return rc;
}
@ -847,7 +877,12 @@ hdb_dn2id_children(
rc = cursor->c_get( cursor, &key, &data, DB_SET );
if ( rc == 0 ) {
rc = cursor->c_get( cursor, &key, &data, DB_NEXT_DUP );
db_recno_t dkids;
rc = cursor->c_count( cursor, &dkids, 0 );
if ( rc == 0 ) {
BEI(e)->bei_dkids = dkids;
if ( dkids < 2 ) rc = DB_NOTFOUND;
}
}
cursor->c_close( cursor );
return rc;
@ -868,6 +903,7 @@ struct dn2id_cookie {
DB *db;
int prefix;
int rc;
EntryInfo *ei;
ID id;
ID dbuf;
ID *ids;
@ -880,11 +916,32 @@ struct dn2id_cookie {
Operation *op;
};
struct apply_arg {
ID *idl;
EntryInfo **ei;
};
static int
apply_func(
void *data,
void *arg )
{
EntryInfo *ei = data;
struct apply_arg *ap = arg;
bdb_idl_insert( ap->idl, ei->bei_id );
if ( ap->ei ) {
*(ap->ei)++ = ei;
}
return 0;
}
static int
hdb_dn2idl_internal(
struct dn2id_cookie *cx
)
{
EntryInfo **eilist = NULL, **ptr;
#ifdef SLAP_IDL_CACHE
if ( cx->bdb->bi_idl_cache_size ) {
cx->rc = bdb_idl_cache_get(cx->bdb, cx->db, &cx->key, cx->tmp);
@ -896,41 +953,75 @@ hdb_dn2idl_internal(
}
}
#endif
cx->rc = cx->db->cursor( cx->db, NULL, &cx->dbc,
cx->bdb->bi_db_opflags );
if ( cx->rc ) return cx->rc;
BDB_IDL_ZERO( cx->tmp );
cx->data.data = &cx->dbuf;
cx->data.ulen = sizeof(ID);
cx->data.dlen = sizeof(ID);
cx->data.flags = DB_DBT_USERMEM | DB_DBT_PARTIAL;
if ( cx->ei->bei_ckids+1 != cx->ei->bei_dkids ) {
EntryInfo ei;
ei.bei_parent = cx->ei;
/* The first item holds the parent ID. Ignore it. */
cx->rc = cx->dbc->c_get( cx->dbc, &cx->key, &cx->data, DB_SET );
if ( cx->rc == DB_NOTFOUND ) goto saveit;
if ( cx->rc ) return cx->rc;
cx->rc = cx->db->cursor( cx->db, NULL, &cx->dbc,
cx->bdb->bi_db_opflags );
if ( cx->rc ) return cx->rc;
cx->data.data = cx->buf;
cx->data.ulen = BDB_IDL_UM_SIZE * sizeof(ID);
cx->data.flags = DB_DBT_USERMEM;
cx->data.data = &cx->dbuf;
cx->data.ulen = sizeof(ID);
cx->data.dlen = sizeof(ID);
cx->data.flags = DB_DBT_USERMEM | DB_DBT_PARTIAL;
/* Fetch the rest of the IDs in a loop... */
while ( (cx->rc = cx->dbc->c_get( cx->dbc, &cx->key, &cx->data,
DB_MULTIPLE | DB_NEXT_DUP )) == 0 ) {
u_int8_t *j;
size_t len;
DB_MULTIPLE_INIT( cx->ptr, &cx->data );
while (cx->ptr) {
DB_MULTIPLE_NEXT( cx->ptr, &cx->data, j, len );
if (j) {
AC_MEMCPY( &cx->dbuf, j, sizeof(ID) );
bdb_idl_insert( cx->tmp, cx->dbuf );
/* The first item holds the parent ID. Ignore it. */
cx->rc = cx->dbc->c_get( cx->dbc, &cx->key, &cx->data, DB_SET );
if ( cx->rc == DB_NOTFOUND ) goto saveit;
if ( cx->rc ) return cx->rc;
if ( !cx->ei->bei_dkids ) {
db_recno_t dkids;
cx->dbc->c_count( cx->dbc, &dkids, 0 );
cx->ei->bei_dkids = dkids;
}
if ( cx->prefix == DN_SUBTREE_PREFIX && cx->ei->bei_dkids > 1 ) {
eilist = cx->op->o_tmpalloc( sizeof(EntryInfo *) * cx->ei->bei_dkids, cx->op->o_tmpmemctx );
eilist[cx->ei->bei_dkids-1] = NULL;
ptr = eilist;
}
cx->data.data = cx->buf;
cx->data.ulen = BDB_IDL_UM_SIZE * sizeof(ID);
cx->data.flags = DB_DBT_USERMEM;
/* Fetch the rest of the IDs in a loop... */
while ( (cx->rc = cx->dbc->c_get( cx->dbc, &cx->key, &cx->data,
DB_MULTIPLE | DB_NEXT_DUP )) == 0 ) {
u_int8_t *j;
size_t len;
DB_MULTIPLE_INIT( cx->ptr, &cx->data );
while (cx->ptr) {
DB_MULTIPLE_NEXT( cx->ptr, &cx->data, j, len );
if (j) {
EntryInfo *ei2;
hdb_dn2ei( j, len, &ei );
bdb_idl_insert( cx->tmp, ei.bei_id );
hdb_entryinfo_load( cx->bdb, &ei, &ei2 );
if ( eilist )
*ptr++ = ei2;
}
}
}
cx->dbc->c_close( cx->dbc );
} else {
if ( cx->ei->bei_ckids > 0 ) {
struct apply_arg ap;
if ( cx->prefix == DN_SUBTREE_PREFIX ) {
eilist = cx->op->o_tmpalloc( sizeof(EntryInfo *) * cx->ei->bei_dkids, cx->op->o_tmpmemctx );
eilist[cx->ei->bei_dkids-1] = NULL;
}
ap.idl = cx->tmp;
ap.ei = eilist;
bdb_cache_entryinfo_lock( cx->ei );
avl_apply( cx->ei->bei_kids, apply_func, &ap, -1, AVL_POSTORDER );
bdb_cache_entryinfo_unlock( cx->ei );
}
}
cx->dbc->c_close( cx->dbc );
/* If we got some records, treat as success */
if (!BDB_IDL_IS_ZERO(cx->tmp)) {
@ -946,21 +1037,14 @@ saveit:
;
gotit:
if ( cx->rc == 0 ) {
if ( cx->prefix == DN_SUBTREE_PREFIX ) {
ID *save, idcurs;
save = cx->op->o_tmpalloc( BDB_IDL_SIZEOF( cx->tmp ),
cx->op->o_tmpmemctx );
BDB_IDL_CPY( save, cx->tmp );
/* If eilist is NULL, cx->tmp is empty... */
if ( cx->prefix == DN_SUBTREE_PREFIX && eilist ) {
bdb_idl_union( cx->ids, cx->tmp );
idcurs = 0;
for ( cx->id = bdb_idl_first( save, &idcurs );
cx->id != NOID;
cx->id = bdb_idl_next( save, &idcurs )) {
for (ptr = eilist; *ptr; ptr++) {
cx->ei = *ptr;
cx->id = cx->ei->bei_id;
hdb_dn2idl_internal( cx );
}
cx->op->o_tmpfree( save, cx->op->o_tmpmemctx );
cx->op->o_tmpfree( eilist, cx->op->o_tmpmemctx );
cx->rc = 0;
} else {
BDB_IDL_CPY( cx->ids, cx->tmp );
@ -995,6 +1079,7 @@ hdb_dn2idl(
#endif
cx.id = e->e_id;
cx.ei = BEI(e);
cx.bdb = bdb;
cx.db = cx.bdb->bi_dn2id->bdi_db;
cx.prefix = op->ors_scope == LDAP_SCOPE_SUBTREE ? DN_SUBTREE_PREFIX :

View File

@ -684,7 +684,7 @@ retry: /* transaction retry */
/* Shortcut the search */
nei = neip ? neip : eip;
rs->sr_err = bdb_cache_find_ndn ( op, ltid, &new_ndn, &nei, locker );
rs->sr_err = bdb_cache_find_ndn ( op, ltid, &new_ndn, &nei );
if ( nei ) bdb_cache_entryinfo_unlock( nei );
switch( rs->sr_err ) {
case DB_LOCK_DEADLOCK:

View File

@ -457,8 +457,7 @@ int bdb_cache_find_ndn(
Operation *op,
DB_TXN *txn,
struct berval *ndn,
EntryInfo **res,
u_int32_t locker
EntryInfo **res
);
int bdb_cache_find_id(
Operation *op,

View File

@ -192,8 +192,7 @@ static int bdb_tool_next_id(
DB_TXN *tid,
Entry *e,
struct berval *text,
int hole,
u_int32_t locker )
int hole )
{
struct bdb_info *bdb = (struct bdb_info *) op->o_bd->be_private;
struct berval dn = e->e_nname;
@ -201,7 +200,7 @@ static int bdb_tool_next_id(
EntryInfo *ei = NULL;
int rc;
rc = bdb_cache_find_ndn( op, tid, &dn, &ei, locker );
rc = bdb_cache_find_ndn( op, tid, &dn, &ei );
if ( ei ) bdb_cache_entryinfo_unlock( ei );
if ( rc == DB_NOTFOUND ) {
if ( be_issuffix( op->o_bd, &dn ) ) {
@ -209,7 +208,7 @@ static int bdb_tool_next_id(
} else {
dnParent( &dn, &pdn );
e->e_nname = pdn;
rc = bdb_tool_next_id( op, tid, e, text, 1, locker );
rc = bdb_tool_next_id( op, tid, e, text, 1 );
if ( rc ) {
return rc;
}
@ -282,7 +281,6 @@ ID bdb_tool_entry_put(
struct bdb_info *bdb = (struct bdb_info *) be->be_private;
DB_TXN *tid = NULL;
Operation op = {0};
u_int32_t locker;
assert( be != NULL );
assert( slapMode & SLAP_TOOL_MODE );
@ -319,9 +317,8 @@ ID bdb_tool_entry_put(
op.o_tmpmemctx = NULL;
op.o_tmpmfuncs = &ch_mfuncs;
locker = TXN_ID( tid );
/* add dn2id indices */
rc = bdb_tool_next_id( &op, tid, e, text, 0, locker );
rc = bdb_tool_next_id( &op, tid, e, text, 0 );
if( rc != 0 ) {
goto done;
}