Multi-threaded slapindex

This commit is contained in:
Howard Chu 2005-10-27 10:34:33 +00:00
parent fc621a1c78
commit f94968fb5b
5 changed files with 239 additions and 23 deletions

View File

@ -25,12 +25,15 @@
#include "back-bdb.h"
#include "lutil.h"
unsigned
bdb_attr_slot( struct bdb_info *bdb, AttributeDescription *ad )
/* Find the ad, return -1 if not found,
* set point for insertion if ins is non-NULL
*/
int
bdb_attr_slot( struct bdb_info *bdb, AttributeDescription *ad, unsigned *ins )
{
unsigned base = 0, cursor = 0;
unsigned n = bdb->bi_nattrs;
int val;
int val = 0;
while ( 0 < n ) {
int pivot = n >> 1;
@ -46,18 +49,22 @@ bdb_attr_slot( struct bdb_info *bdb, AttributeDescription *ad )
return cursor;
}
}
if ( val > 0 )
++cursor;
return cursor;
if ( ins ) {
if ( val > 0 )
++cursor;
*ins = cursor;
}
return -1;
}
static int
ainfo_insert( struct bdb_info *bdb, AttrInfo *a )
{
unsigned x = bdb_attr_slot( bdb, a->ai_desc );
unsigned x;
int i = bdb_attr_slot( bdb, a->ai_desc, &x );
/* Is it a dup? */
if ( x < bdb->bi_nattrs && bdb->bi_attrs[x]->ai_desc == a->ai_desc )
if ( i >= 0 )
return -1;
bdb->bi_attrs = ch_realloc( bdb->bi_attrs, ( bdb->bi_nattrs+1 ) *
@ -75,9 +82,8 @@ bdb_attr_mask(
struct bdb_info *bdb,
AttributeDescription *desc )
{
unsigned i = bdb_attr_slot( bdb, desc );
return ( i < bdb->bi_nattrs && bdb->bi_attrs[i]->ai_desc == desc ) ?
bdb->bi_attrs[i] : NULL;
int i = bdb_attr_slot( bdb, desc, NULL );
return i < 0 ? NULL : bdb->bi_attrs[i];
}
int
@ -352,10 +358,10 @@ bdb_attr_index_destroy( struct bdb_info *bdb )
void bdb_attr_index_free( struct bdb_info *bdb, AttributeDescription *ad )
{
unsigned i;
int i;
i = bdb_attr_slot( bdb, ad );
if ( i < bdb->bi_nattrs && bdb->bi_attrs[i]->ai_desc == ad ) {
i = bdb_attr_slot( bdb, ad, NULL );
if ( i >= 0 ) {
bdb_attr_info_free( bdb->bi_attrs[i] );
bdb->bi_nattrs--;
for (; i<bdb->bi_nattrs; i++)

View File

@ -300,6 +300,19 @@ typedef struct bdb_attrinfo {
#define BDB_INDEX_DELETING 0x8000U /* index is being modified */
#define BDB_INDEX_UPDATE_OP 0x03 /* performing an index update */
/* For slapindex to record which attrs in an entry belong to which
* index database
*/
typedef struct AttrList {
struct AttrList *next;
Attribute *attr;
} AttrList;
typedef struct IndexRec {
AttrInfo *ai;
AttrList *attrs;
} IndexRec;
#include "proto-bdb.h"
#endif /* _BACK_BDB_H_ */

View File

@ -377,6 +377,80 @@ int bdb_index_values(
return rc;
}
/* Get the list of which indices apply to this attr */
int
bdb_index_recset(
struct bdb_info *bdb,
Attribute *a,
AttributeType *type,
struct berval *tags,
IndexRec *ir )
{
int rc, slot;
AttrList *al;
if( type->sat_sup ) {
/* recurse */
rc = bdb_index_recset( bdb, a, type->sat_sup, tags, ir );
if( rc ) return rc;
}
/* If this type has no AD, we've never used it before */
if( type->sat_ad ) {
slot = bdb_attr_slot( bdb, type->sat_ad, NULL );
if ( slot >= 0 ) {
ir[slot].ai = bdb->bi_attrs[slot];
al = ch_malloc( sizeof( AttrList ));
al->attr = a;
al->next = ir[slot].attrs;
ir[slot].attrs = al;
}
}
if( tags->bv_len ) {
AttributeDescription *desc;
desc = ad_find_tags( type, tags );
if( desc ) {
slot = bdb_attr_slot( bdb, desc, NULL );
if ( slot >= 0 ) {
ir[slot].ai = bdb->bi_attrs[slot];
al = ch_malloc( sizeof( AttrList ));
al->attr = a;
al->next = ir[slot].attrs;
ir[slot].attrs = al;
}
}
}
return LDAP_SUCCESS;
}
/* Apply the indices for the recset */
int bdb_index_recrun(
Operation *op,
struct bdb_info *bdb,
IndexRec *ir0,
ID id,
int base )
{
IndexRec *ir;
AttrList *al;
int i, rc;
for (i=base; i<bdb->bi_nattrs; i+=slap_tool_thread_max) {
ir = ir0 + i;
if ( !ir->ai ) continue;
while (( al = ir->attrs )) {
ir->attrs = al->next;
rc = indexer( op, NULL, ir->ai->ai_desc,
&ir->ai->ai_desc->ad_type->sat_cname,
al->attr->a_nvals, id, SLAP_INDEX_ADD_OP,
ir->ai->ai_indexmask );
free( al );
if ( rc ) break;
}
}
return rc;
}
int
bdb_index_entry(
Operation *op,

View File

@ -44,8 +44,8 @@ AttrInfo *bdb_attr_mask( struct bdb_info *bdb,
void bdb_attr_flush( struct bdb_info *bdb );
unsigned bdb_attr_slot( struct bdb_info *bdb,
AttributeDescription *desc );
int bdb_attr_slot( struct bdb_info *bdb,
AttributeDescription *desc, unsigned *insert );
int bdb_attr_index_config LDAP_P(( struct bdb_info *bdb,
const char *fname, int lineno,
@ -329,6 +329,8 @@ int bdb_idl_append_one( ID *ids, ID id );
#define bdb_index_param BDB_SYMBOL(index_param)
#define bdb_index_values BDB_SYMBOL(index_values)
#define bdb_index_entry BDB_SYMBOL(index_entry)
#define bdb_index_recset BDB_SYMBOL(index_recset)
#define bdb_index_recrun BDB_SYMBOL(index_recrun)
extern int
bdb_index_is_indexed LDAP_P((
@ -353,6 +355,22 @@ bdb_index_values LDAP_P((
ID id,
int opid ));
extern int
bdb_index_recset LDAP_P((
struct bdb_info *bdb,
Attribute *a,
AttributeType *type,
struct berval *tags,
IndexRec *ir ));
extern int
bdb_index_recrun LDAP_P((
Operation *op,
struct bdb_info *bdb,
IndexRec *ir,
ID id,
int base ));
int bdb_index_entry LDAP_P(( Operation *op, DB_TXN *t, int r, Entry *e ));
#define bdb_index_entry_add(op,t,e) \

View File

@ -43,12 +43,22 @@ static int index_nattrs;
#define bdb_tool_idl_flush BDB_SYMBOL(tool_idl_flush)
static int bdb_tool_idl_flush( BackendDB *be );
static int bdb_tool_ix_rec( int base );
static void * bdb_tool_index_task( void *ctx, void *ptr );
#define IDBLOCK 1024
static ID *bdb_tool_idls;
static int bdb_tool_idl_next;
static ID bdb_tool_ix_id;
static Operation *bdb_tool_ix_op;
static volatile int *bdb_tool_index_threads;
static void *bdb_tool_index_rec;
static struct bdb_info *bdb_tool_info;
static ldap_pvt_thread_mutex_t bdb_tool_index_mutex;
static ldap_pvt_thread_cond_t bdb_tool_index_cond;
int bdb_tool_entry_open(
BackendDB *be, int mode )
{
@ -69,10 +79,27 @@ int bdb_tool_entry_open(
}
}
/* Get a block of memory for the IDL cache */
if ( !(slapMode & SLAP_TOOL_READONLY ) && bdb->bi_idl_cache_max_size )
bdb_tool_idls = ch_malloc( bdb->bi_idl_cache_max_size *
sizeof( ID ) * IDBLOCK );
/* Set up for slapindex */
if ( !(slapMode & SLAP_TOOL_READONLY )) {
int i;
if ( bdb->bi_idl_cache_max_size )
bdb_tool_idls = ch_malloc( bdb->bi_idl_cache_max_size *
sizeof( ID ) * IDBLOCK );
if ( !bdb_tool_info && ( slapMode & SLAP_TOOL_QUICK )) {
ldap_pvt_thread_mutex_init( &bdb_tool_index_mutex );
ldap_pvt_thread_cond_init( &bdb_tool_index_cond );
bdb_tool_index_threads = ch_malloc( slap_tool_thread_max * sizeof( int ));
bdb_tool_index_rec = ch_malloc( bdb->bi_nattrs * sizeof( IndexRec ));
for (i=1; i<slap_tool_thread_max; i++) {
int *ptr = ch_malloc( sizeof( int ));
*ptr = i;
ldap_pvt_thread_pool_submit( &connection_pool,
bdb_tool_index_task, ptr );
ldap_pvt_thread_yield();
}
}
bdb_tool_info = bdb;
}
return 0;
}
@ -80,7 +107,14 @@ int bdb_tool_entry_open(
int bdb_tool_entry_close(
BackendDB *be )
{
assert( be != NULL );
struct bdb_info *bdb = (struct bdb_info *) be->be_private;
if ( bdb_tool_info ) {
slapd_shutdown = 1;
ldap_pvt_thread_mutex_lock( &bdb_tool_index_mutex );
ldap_pvt_thread_cond_broadcast( &bdb_tool_index_cond );
ldap_pvt_thread_mutex_unlock( &bdb_tool_index_mutex );
}
if( key.data ) {
ch_free( key.data );
@ -341,6 +375,53 @@ static int bdb_tool_next_id(
return rc;
}
static int
bdb_tool_index_add(
Operation *op,
DB_TXN *txn,
Entry *e )
{
struct bdb_info *bdb = (struct bdb_info *) op->o_bd->be_private;
if ( slapMode & SLAP_TOOL_QUICK ) {
IndexRec *ir;
int i, rc;
Attribute *a;
ir = bdb_tool_index_rec;
memset(ir, 0, bdb->bi_nattrs * sizeof( IndexRec ));
for ( a = e->e_attrs; a != NULL; a = a->a_next ) {
rc = bdb_index_recset( bdb, a, a->a_desc->ad_type,
&a->a_desc->ad_tags, ir );
if ( rc )
return rc;
}
bdb_tool_ix_id = e->e_id;
bdb_tool_ix_op = op;
ldap_pvt_thread_mutex_lock( &bdb_tool_index_mutex );
for ( i=1; i<slap_tool_thread_max; i++ )
bdb_tool_index_threads[i] = LDAP_BUSY;
ldap_pvt_thread_cond_broadcast( &bdb_tool_index_cond );
ldap_pvt_thread_mutex_unlock( &bdb_tool_index_mutex );
rc = bdb_index_recrun( op, bdb, ir, e->e_id, 0 );
if ( rc )
return rc;
for ( i=1; i<slap_tool_thread_max; i++ ) {
if ( bdb_tool_index_threads[i] == LDAP_BUSY ) {
ldap_pvt_thread_yield();
i--;
continue;
}
if ( bdb_tool_index_threads[i] )
return bdb_tool_index_threads[i];
}
return 0;
} else {
return bdb_index_entry_add( op, txn, e );
}
}
ID bdb_tool_entry_put(
BackendDB *be,
Entry *e,
@ -400,7 +481,7 @@ ID bdb_tool_entry_put(
}
if ( !bdb->bi_linear_index )
rc = bdb_index_entry_add( &op, tid, e );
rc = bdb_tool_index_add( &op, tid, e );
if( rc != 0 ) {
snprintf( text->bv_val, text->bv_len,
"index_entry_add failed: %s (%d)",
@ -507,7 +588,7 @@ int bdb_tool_entry_reindex(
op.o_tmpmemctx = NULL;
op.o_tmpmfuncs = &ch_mfuncs;
rc = bdb_index_entry_add( &op, tid, e );
rc = bdb_tool_index_add( &op, tid, e );
done:
if( rc == 0 ) {
@ -878,9 +959,11 @@ retry:
}
/* No free block, create that too */
if ( ic->idn == -1 || ( ic->count & (IDBLOCK-1)) == 0) {
ldap_pvt_thread_mutex_lock( &bdb->bi_idl_tree_lrulock );
bdb->bi_idl_cache_size++;
if ( bdb->bi_idl_cache_size > bdb->bi_idl_cache_max_size ) {
rc = bdb_tool_idl_flush( be );
ldap_pvt_thread_mutex_unlock( &bdb->bi_idl_tree_lrulock );
if ( rc )
return rc;
goto retry;
@ -888,6 +971,7 @@ retry:
ic->idls[++ic->idn] = bdb_tool_idl_next;
ids = bdb_tool_idls + bdb_tool_idl_next;
bdb_tool_idl_next += IDBLOCK;
ldap_pvt_thread_mutex_unlock( &bdb->bi_idl_tree_lrulock );
memset( ids, 0, IDBLOCK * sizeof( ID ));
if ( !ic->count )
@ -899,3 +983,24 @@ retry:
return 0;
}
static void *
bdb_tool_index_task( void *ctx, void *ptr )
{
int base = *(int *)ptr;
free( ptr );
while ( 1 ) {
ldap_pvt_thread_mutex_lock( &bdb_tool_index_mutex );
ldap_pvt_thread_cond_wait( &bdb_tool_index_cond,
&bdb_tool_index_mutex );
ldap_pvt_thread_mutex_unlock( &bdb_tool_index_mutex );
if ( slapd_shutdown )
break;
bdb_tool_index_threads[base] = bdb_index_recrun( bdb_tool_ix_op,
bdb_tool_info, bdb_tool_index_rec, bdb_tool_ix_id, base );
}
return NULL;
}