Add IDL caching for slapadd/slapindex quick mode

This commit is contained in:
Howard Chu 2005-10-22 21:41:58 +00:00
parent 3ee954de81
commit 34e154e28b
3 changed files with 272 additions and 1 deletions

View File

@ -84,7 +84,11 @@ bdb_key_change(
if (op == SLAP_INDEX_ADD_OP) {
/* Add values */
rc = bdb_idl_insert_key( be, db, txn, &key, id );
if ( slapMode & SLAP_TOOL_QUICK )
rc = bdb_tool_idl_add( be, db, txn, &key, id );
else
rc = bdb_idl_insert_key( be, db, txn, &key, id );
if ( rc == DB_KEYEXIST ) rc = 0;
} else {
/* Delete values */

View File

@ -595,6 +595,7 @@ bdb_trans_backoff( int num_retries );
#define bdb_tool_dn2id_get BDB_SYMBOL(tool_dn2id_get)
#define bdb_tool_id2entry_get BDB_SYMBOL(tool_id2entry_get)
#define bdb_tool_entry_modify BDB_SYMBOL(tool_entry_modify)
#define bdb_tool_idl_add BDB_SYMBOL(tool_idl_add)
extern BI_init bdb_back_initialize;
@ -626,6 +627,8 @@ extern BI_tool_dn2id_get bdb_tool_dn2id_get;
extern BI_tool_id2entry_get bdb_tool_id2entry_get;
extern BI_tool_entry_modify bdb_tool_entry_modify;
int bdb_tool_idl_add( BackendDB *be, DB *db, DB_TXN *txn, DBT *key, ID id );
LDAP_END_DECL
#endif /* _PROTO_BDB_H */

View File

@ -21,6 +21,7 @@
#define AVL_INTERNAL
#include "back-bdb.h"
#include "idl.h"
static DBC *cursor = NULL;
static DBT key, data;
@ -37,6 +38,12 @@ static unsigned nholes;
static Avlnode *index_attrs, index_dummy;
#define bdb_tool_idl_cmp BDB_SYMBOL(tool_idl_cmp)
#define bdb_tool_idl_flush_one BDB_SYMBOL(tool_idl_flush_one)
#define bdb_tool_idl_flush BDB_SYMBOL(tool_idl_flush)
static int bdb_tool_idl_flush( BackendDB *be );
int bdb_tool_entry_open(
BackendDB *be, int mode )
{
@ -79,6 +86,8 @@ int bdb_tool_entry_close(
cursor = NULL;
}
bdb_tool_idl_flush( be );
if( nholes ) {
unsigned i;
fprintf( stderr, "Error, entries missing!\n");
@ -630,3 +639,258 @@ done:
return e->e_id;
}
#define IDBLOCK 1024
typedef struct bdb_tool_idl_cache_entry {
struct bdb_tool_idl_cache_entry *next;
ID ids[IDBLOCK];
} bdb_tool_idl_cache_entry;
typedef struct bdb_tool_idl_cache {
struct berval kstr;
bdb_tool_idl_cache_entry *head, *tail;
ID first, last;
int count;
} bdb_tool_idl_cache;
static bdb_tool_idl_cache_entry *bdb_tool_idl_free_list;
static int
bdb_tool_idl_cmp( const void *v1, const void *v2 )
{
const bdb_tool_idl_cache *c1 = v1, *c2 = v2;
int rc;
if (( rc = c1->kstr.bv_len - c2->kstr.bv_len )) return rc;
return memcmp( c1->kstr.bv_val, c2->kstr.bv_val, c1->kstr.bv_len );
}
static int
bdb_tool_idl_flush_one( void *v1, void *arg )
{
bdb_tool_idl_cache *ic = v1;
DB *db = arg;
bdb_tool_idl_cache_entry *ice;
DBC *curs;
DBT key, data;
int i, rc;
ID id, nid;
rc = db->cursor( db, NULL, &curs, 0 );
if ( rc )
return -1;
DBTzero( &key );
DBTzero( &data );
bv2DBT( &ic->kstr, &key );
data.size = data.ulen = sizeof( ID );
data.flags = DB_DBT_USERMEM;
data.data = &nid;
rc = curs->c_get( curs, &key, &data, DB_SET );
/* If key already exists and we're writing a range... */
if ( rc == 0 && ic->head == NULL ) {
/* If it's not currently a range, must delete old info */
if ( nid ) {
/* Skip lo */
while ( curs->c_get( curs, &key, &data, DB_NEXT_DUP ) == 0 )
curs->c_del( curs, 0 );
nid = 0;
/* Store range marker */
curs->c_put( curs, &key, &data, DB_KEYFIRST );
} else {
/* Skip lo */
rc = curs->c_get( curs, &key, &data, DB_NEXT_DUP );
/* Get hi */
rc = curs->c_get( curs, &key, &data, DB_NEXT_DUP );
/* Delete hi */
curs->c_del( curs, 0 );
}
BDB_ID2DISK( ic->last, &nid );
curs->c_put( curs, &key, &data, DB_KEYLAST );
rc = 0;
} else if ( rc && rc != DB_NOTFOUND ) {
rc = -1;
} else if ( ic->head == NULL ) {
/* range, didn't exist before */
nid = 0;
rc = curs->c_put( curs, &key, &data, DB_KEYLAST );
if ( rc == 0 ) {
BDB_ID2DISK( ic->first, &nid );
rc = curs->c_put( curs, &key, &data, DB_KEYLAST );
if ( rc == 0 ) {
BDB_ID2DISK( ic->last, &nid );
rc = curs->c_put( curs, &key, &data, DB_KEYLAST );
}
}
if ( rc ) {
rc = -1;
}
} else {
/* Just a normal write */
rc = 0;
for ( ice = ic->head; ice; ice = ic->head ) {
int end = ice->next ? IDBLOCK : (ic->count & (IDBLOCK-1));
ic->head = ice->next;
for ( i=0; i<end; i++ ) {
if ( !ice->ids[i] ) continue;
BDB_ID2DISK( ice->ids[i], &nid );
rc = curs->c_put( curs, &key, &data, DB_NODUPDATA );
if ( rc ) {
if ( rc == DB_KEYEXIST ) {
rc = 0;
continue;
}
rc = -1;
break;
}
}
ice->next = bdb_tool_idl_free_list;
bdb_tool_idl_free_list = ice;
if ( rc ) {
rc = -1;
break;
}
}
}
ch_free( ic );
curs->c_close( curs );
return rc;
}
static int
bdb_tool_idl_flush( BackendDB *be )
{
struct bdb_info *bdb = (struct bdb_info *) be->be_private;
DB *db;
Avlnode *root;
int i, rc = 0;
for ( i=BDB_NDB; i < bdb->bi_ndatabases; i++ ) {
db = bdb->bi_databases[i]->bdi_db;
if ( !db->app_private ) continue;
rc = avl_apply( db->app_private, bdb_tool_idl_flush_one, db, -1,
AVL_INORDER );
avl_free( db->app_private, NULL );
db->app_private = NULL;
if ( rc == -1 ) break;
rc = 0;
}
if ( !rc )
bdb->bi_idl_cache_size = 0;
return rc;
}
int bdb_tool_idl_add(
BackendDB *be,
DB *db,
DB_TXN *txn,
DBT *key,
ID id )
{
struct bdb_info *bdb = (struct bdb_info *) be->be_private;
bdb_tool_idl_cache *ic, itmp;
bdb_tool_idl_cache_entry *ice;
Avlnode *root;
int rc;
if ( !bdb->bi_idl_cache_max_size )
return bdb_idl_insert_key( be, db, txn, key, id );
root = db->app_private;
DBT2bv( key, &itmp.kstr );
ic = avl_find( root, &itmp, bdb_tool_idl_cmp );
/* No entry yet, create one */
if ( !ic ) {
DBC *curs;
DBT data;
ID nid;
int rc;
ic = ch_malloc( sizeof( bdb_tool_idl_cache ) + itmp.kstr.bv_len );
ic->kstr.bv_len = itmp.kstr.bv_len;
ic->kstr.bv_val = (char *)(ic+1);
AC_MEMCPY( ic->kstr.bv_val, itmp.kstr.bv_val, ic->kstr.bv_len );
ic->head = ic->tail = NULL;
ic->last = 0;
ic->count = 0;
avl_insert( &root, ic, bdb_tool_idl_cmp, avl_dup_error );
db->app_private = root;
/* load existing key count here */
rc = db->cursor( db, NULL, &curs, 0 );
if ( rc ) return rc;
data.ulen = sizeof( ID );
data.flags = DB_DBT_USERMEM;
data.data = &nid;
rc = curs->c_get( curs, key, &data, DB_SET );
if ( rc == 0 ) {
if ( nid == 0 ) {
ic->count = BDB_IDL_DB_SIZE+1;
} else {
db_recno_t count;
curs->c_count( curs, &count, 0 );
ic->count = count;
BDB_DISK2ID( &nid, &ic->first );
}
}
curs->c_close( curs );
}
/* are we a range already? */
if ( ic->count > BDB_IDL_DB_SIZE ) {
ic->last = id;
return 0;
/* Are we at the limit, and converting to a range? */
} else if ( ic->count == BDB_IDL_DB_SIZE ) {
for (ice = ic->head; ice; ice = ice->next ) {
bdb->bi_idl_cache_size--;
}
ic->tail->next = bdb_tool_idl_free_list;
bdb_tool_idl_free_list = ic->head;
ic->head = ic->tail = NULL;
ic->last = id;
ic->count++;
return 0;
}
/* No free block, create that too */
if ( !ic->tail || ( ic->count & (IDBLOCK-1)) == 0) {
if ( bdb_tool_idl_free_list ) {
ice = bdb_tool_idl_free_list;
bdb_tool_idl_free_list = ice->next;
} else {
ice = ch_malloc( sizeof( bdb_tool_idl_cache_entry ));
}
memset( ice, 0, sizeof( *ice ));
if ( !ic->head ) {
ic->head = ice;
} else {
ic->tail->next = ice;
}
ic->tail = ice;
bdb->bi_idl_cache_size++;
if ( !ic->count )
ic->first = id;
}
ice = ic->tail;
ice->ids[ ic->count & (IDBLOCK-1) ] = id;
ic->count++;
rc = 0;
/* Check for flush */
if ( bdb->bi_idl_cache_size > bdb->bi_idl_cache_max_size ) {
rc = bdb_tool_idl_flush( be );
}
return rc;
}