More for back-mdb multival

Allow configuring thresholds for specific attributes
This commit is contained in:
Howard Chu 2018-08-30 11:24:25 +01:00
parent 6c221e7730
commit 111329a2dc
8 changed files with 267 additions and 30 deletions

View File

@ -167,23 +167,22 @@ Specify the file protection mode that newly created database
files should have.
The default is 0600.
.TP
.BI multival_hi \ <integer>
Specify the number of values above which a multivalued attribute is
\fBmultival \fR{\fI<attrlist>\fR|\fBdefault\fR} \fI<integer hi>\fR,\fI<integer lo>
Specify the number of values for which a multivalued attribute is
stored in a separate table. Normally entries are stored as a single
blob inside the database. When an entry gets very large or contains
attributes with a very large number of values, modifications on that
entry may get very slow. Splitting the large attributes out to a separate
table can improve the performance of modification operations.
The default is UINT_MAX, which keeps all attributes in the main blob.
.TP
.BI multival_lo \ <integer>
Specify the number of values below which a multivalued attribute
that was stored in a separate table is moved back into the main
entry blob. If a modification deletes enough values to bring an
attribute below this threshold, its values will be removed from the
separate table and merged back into the main entry blob.
The default is UINT_MAX, which keeps all attributes in
the main blob.
The threshold is specified as a pair of integers. If the number of
values exceeds the hi threshold the values will be split out. If
a modification deletes enough values to bring an attribute below
the lo threshold the values will be removed from the separate
table and merged back into the main entry blob.
The threshold can be set for a specific list of attributes, or
the default can be configured for all other attributes.
The default value for both hi and lo thresholds is UINT_MAX, which keeps
all attributes in the main blob.
.TP
.BI rtxnsize \ <entries>
Specify the maximum number of entries to process in a single read

View File

@ -122,6 +122,8 @@ mdb_attr_dbs_open(
for ( i=0; i<mdb->mi_nattrs; i++ ) {
if ( mdb->mi_attrs[i]->ai_dbi ) /* already open */
continue;
if ( !mdb->mi_attrs[i]->ai_indexmask ) /* not an index record */
continue;
rc = mdb_dbi_open( txn, mdb->mi_attrs[i]->ai_desc->ad_type->sat_cname.bv_val,
flags, &mdb->mi_attrs[i]->ai_dbi );
if ( rc ) {
@ -384,6 +386,8 @@ fail:
a->ai_root = NULL;
a->ai_desc = ad;
a->ai_dbi = 0;
a->ai_multi_hi = UINT_MAX;
a->ai_multi_lo = UINT_MAX;
if ( mdb->mi_flags & MDB_IS_OPEN ) {
a->ai_indexmask = 0;
@ -494,7 +498,160 @@ mdb_attr_index_unparse( struct mdb_info *mdb, BerVarray *bva )
mdb_attr_index_unparser( &aidef, bva );
}
for ( i=0; i<mdb->mi_nattrs; i++ )
mdb_attr_index_unparser( mdb->mi_attrs[i], bva );
if ( mdb->mi_attrs[i]->ai_indexmask )
mdb_attr_index_unparser( mdb->mi_attrs[i], bva );
}
int
mdb_attr_multi_config(
struct mdb_info *mdb,
const char *fname,
int lineno,
int argc,
char **argv,
struct config_reply_s *c_reply)
{
int rc = 0;
int i;
unsigned hi,lo;
char **attrs, *next, *s;
attrs = ldap_str2charray( argv[0], "," );
if( attrs == NULL ) {
fprintf( stderr, "%s: line %d: "
"no attributes specified: %s\n",
fname, lineno, argv[0] );
return LDAP_PARAM_ERROR;
}
hi = strtoul( argv[1], &next, 10 );
if ( next == argv[1] || next[0] != ',' )
goto badval;
s = next+1;
lo = strtoul( s, &next, 10 );
if ( next == s || next[0] != '\0' )
goto badval;
if ( lo >= hi ) {
badval:
snprintf(c_reply->msg, sizeof(c_reply->msg),
"invalid hi/lo thresholds" );
fprintf( stderr, "%s: line %d: %s\n",
fname, lineno, c_reply->msg );
return LDAP_PARAM_ERROR;
}
for ( i = 0; attrs[i] != NULL; i++ ) {
AttrInfo *a;
AttributeDescription *ad;
const char *text;
if( strcasecmp( attrs[i], "default" ) == 0 ) {
mdb->mi_multi_hi = hi;
mdb->mi_multi_lo = lo;
continue;
}
ad = NULL;
rc = slap_str2ad( attrs[i], &ad, &text );
if( rc != LDAP_SUCCESS ) {
if ( c_reply )
{
snprintf(c_reply->msg, sizeof(c_reply->msg),
"multival attribute \"%s\" undefined",
attrs[i] );
fprintf( stderr, "%s: line %d: %s\n",
fname, lineno, c_reply->msg );
}
fail:
goto done;
}
a = (AttrInfo *) ch_calloc( 1, sizeof(AttrInfo) );
a->ai_desc = ad;
a->ai_multi_hi = hi;
a->ai_multi_lo = lo;
rc = ainfo_insert( mdb, a );
if( rc ) {
if (c_reply) {
snprintf(c_reply->msg, sizeof(c_reply->msg),
"duplicate multival definition for attr \"%s\"",
attrs[i] );
fprintf( stderr, "%s: line %d: %s\n",
fname, lineno, c_reply->msg );
}
rc = LDAP_PARAM_ERROR;
goto done;
}
}
done:
ldap_charray_free( attrs );
return rc;
}
static int
mdb_attr_multi_unparser( void *v1, void *v2 )
{
AttrInfo *ai = v1;
BerVarray *bva = v2;
struct berval bv;
char digbuf[sizeof("4294967296,4294967296")];
char *ptr;
bv.bv_len = snprintf( digbuf, sizeof(digbuf), "%u,%u",
ai->ai_multi_hi, ai->ai_multi_lo );
if ( bv.bv_len ) {
bv.bv_len += ai->ai_desc->ad_cname.bv_len + 1;
ptr = ch_malloc( bv.bv_len+1 );
bv.bv_val = lutil_strcopy( ptr, ai->ai_desc->ad_cname.bv_val );
*bv.bv_val++ = ' ';
strcpy(bv.bv_val, digbuf);
bv.bv_val = ptr;
ber_bvarray_add( bva, &bv );
}
return 0;
}
void
mdb_attr_multi_unparse( struct mdb_info *mdb, BerVarray *bva )
{
int i;
if ( mdb->mi_multi_hi < UINT_MAX ) {
aidef.ai_multi_hi = mdb->mi_multi_hi;
aidef.ai_multi_lo = mdb->mi_multi_lo;
mdb_attr_multi_unparser( &aidef, bva );
}
for ( i=0; i<mdb->mi_nattrs; i++ )
if ( mdb->mi_attrs[i]->ai_multi_hi < UINT_MAX )
mdb_attr_multi_unparser( mdb->mi_attrs[i], bva );
}
void
mdb_attr_multi_thresh( struct mdb_info *mdb, AttributeDescription *ad, unsigned *hi, unsigned *lo )
{
AttrInfo *ai = mdb_attr_mask( mdb, ad );
if ( ai && ai->ai_multi_hi < UINT_MAX )
{
if ( hi )
*hi = ai->ai_multi_hi;
if ( lo )
*lo = ai->ai_multi_lo;
} else
{
if ( hi )
*hi = mdb->mi_multi_hi;
if ( lo )
*lo = mdb->mi_multi_lo;
}
}
void

View File

@ -147,6 +147,8 @@ typedef struct mdb_attrinfo {
MDB_cursor *ai_cursor; /* for tools */
int ai_idx; /* position in AI array */
MDB_dbi ai_dbi;
unsigned ai_multi_hi;
unsigned ai_multi_lo;
} AttrInfo;
/* tool threaded indexer state */

View File

@ -40,6 +40,7 @@ enum {
MDB_MAXSIZE,
MDB_MODE,
MDB_SSTACK,
MDB_MULTIVAL,
};
static ConfigTable mdbcfg[] = {
@ -83,16 +84,11 @@ static ConfigTable mdbcfg[] = {
mdb_cf_gen, "( OLcfgDbAt:0.3 NAME 'olcDbMode' "
"DESC 'Unix permissions of database files' "
"SYNTAX OMsDirectoryString SINGLE-VALUE )", NULL, NULL },
{ "multival_hi", "num", 2, 2, 0, ARG_UINT|ARG_OFFSET,
(void *)offsetof(struct mdb_info, mi_multi_hi),
"( OLcfgDbAt:12.6 NAME 'olcDbMultivalHi' "
"DESC 'Threshold for splitting multivalued attr out of main blob' "
"SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL },
{ "multival_lo", "num", 2, 2, 0, ARG_UINT|ARG_OFFSET,
(void *)offsetof(struct mdb_info, mi_multi_lo),
"( OLcfgDbAt:12.7 NAME 'olcDbMultivalLo' "
"DESC 'Threshold for consolidating multivalued attr back into main blob' "
"SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL },
{ "multival", "attr> <hi,lo", 3, 3, 0, ARG_MAGIC|MDB_MULTIVAL,
mdb_cf_gen,
"( OLcfgDbAt:12.6 NAME 'olcDbMultival' "
"DESC 'Hi/Lo thresholds for splitting multivalued attr out of main blob' "
"SYNTAX OMsDirectoryString )", NULL, NULL },
{ "rtxnsize", "entries", 2, 2, 0, ARG_UINT|ARG_OFFSET,
(void *)offsetof(struct mdb_info, mi_rtxn_size),
"( OLcfgDbAt:12.5 NAME 'olcDbRtxnSize' "
@ -116,7 +112,7 @@ static ConfigOCs mdbocs[] = {
"MAY ( olcDbCheckpoint $ olcDbEnvFlags $ "
"olcDbNoSync $ olcDbIndex $ olcDbMaxReaders $ olcDbMaxSize $ "
"olcDbMode $ olcDbSearchStack $ olcDbMaxEntrySize $ olcDbRtxnSize $ "
"olcDbMultivalHi $ olcDbMultivalLo ) )",
"olcDbMultival ) )",
Cft_Database, mdbcfg },
{ NULL, 0, NULL }
};
@ -357,6 +353,11 @@ mdb_cf_gen( ConfigArgs *c )
case MDB_MAXSIZE:
c->value_ulong = mdb->mi_mapsize;
break;
case MDB_MULTIVAL:
mdb_attr_multi_unparse( mdb, &c->rvalue_vals );
if ( !c->rvalue_vals ) rc = 1;
break;
}
return rc;
} else if ( c->op == LDAP_MOD_DELETE ) {
@ -489,6 +490,61 @@ mdb_cf_gen( ConfigArgs *c )
}
}
break;
case MDB_MULTIVAL:
if ( c->valx == -1 ) {
int i;
/* delete all */
for ( i = 0; i < mdb->mi_nattrs; i++ ) {
mdb->mi_attrs[i]->ai_multi_hi = UINT_MAX;
mdb->mi_attrs[i]->ai_multi_lo = UINT_MAX;
}
mdb->mi_multi_hi = UINT_MAX;
mdb->mi_multi_lo = UINT_MAX;
} else {
struct berval bv, def = BER_BVC("default");
char *ptr;
for (ptr = c->line; !isspace( (unsigned char) *ptr ); ptr++);
bv.bv_val = c->line;
bv.bv_len = ptr - bv.bv_val;
if ( bvmatch( &bv, &def )) {
mdb->mi_multi_hi = UINT_MAX;
mdb->mi_multi_lo = UINT_MAX;
} else {
int i;
char **attrs;
char sep;
sep = bv.bv_val[ bv.bv_len ];
bv.bv_val[ bv.bv_len ] = '\0';
attrs = ldap_str2charray( bv.bv_val, "," );
for ( i = 0; attrs[ i ]; i++ ) {
AttributeDescription *ad = NULL;
const char *text;
AttrInfo *ai;
slap_str2ad( attrs[ i ], &ad, &text );
/* if we got here... */
assert( ad != NULL );
ai = mdb_attr_mask( mdb, ad );
/* if we got here... */
assert( ai != NULL );
ai->ai_multi_hi = UINT_MAX;
ai->ai_multi_lo = UINT_MAX;
}
bv.bv_val[ bv.bv_len ] = sep;
ldap_charray_free( attrs );
}
}
break;
}
return rc;
}
@ -694,6 +750,12 @@ mdb_cf_gen( ConfigArgs *c )
}
break;
case MDB_MULTIVAL:
rc = mdb_attr_multi_config( mdb, c->fname, c->lineno,
c->argc - 1, &c->argv[1], &c->reply);
if( rc != LDAP_SUCCESS ) return 1;
break;
}
return 0;
}

View File

@ -838,6 +838,7 @@ static int mdb_entry_partsize(struct mdb_info *mdb, MDB_txn *txn, Entry *e,
ber_len_t len, dlen;
int i, nat = 0, nval = 0, nnval = 0, doff = 0;
Attribute *a;
unsigned hi;
eh->multi = NULL;
len = 4*sizeof(int); /* nattrs, nvals, ocflags, offset */
@ -858,7 +859,8 @@ static int mdb_entry_partsize(struct mdb_info *mdb, MDB_txn *txn, Entry *e,
len += 2*sizeof(int); /* AD index, numvals */
dlen += 2*sizeof(int);
nval += a->a_numvals + 1; /* empty berval at end */
if (a->a_numvals > mdb->mi_multi_hi)
mdb_attr_multi_thresh( mdb, a->a_desc, &hi, NULL );
if (a->a_numvals > hi)
a->a_flags |= SLAP_ATTR_BIG_MULTI;
if (a->a_flags & SLAP_ATTR_BIG_MULTI)
doff += a->a_numvals;

View File

@ -312,7 +312,7 @@ static int index_at_values(
/* If this type has no AD, we've never used it before */
if( type->sat_ad ) {
ai = mdb_attr_mask( op->o_bd->be_private, type->sat_ad );
if ( ai ) {
if ( ai && ( ai->ai_indexmask || ai->ai_newmask )) {
#ifdef LDAP_COMP_MATCH
/* component indexing */
if ( ai->ai_cr ) {
@ -351,7 +351,7 @@ static int index_at_values(
if( desc ) {
ai = mdb_attr_mask( op->o_bd->be_private, desc );
if( ai ) {
if( ai && ( ai->ai_indexmask || ai->ai_newmask )) {
if ( opid == MDB_INDEX_UPDATE_OP )
mask = ai->ai_newmask & ~ai->ai_indexmask;
else

View File

@ -173,12 +173,14 @@ do_add:
Debug(LDAP_DEBUG_ARGS, "mdb_modify_internal: %d %s\n",
err, *text, 0);
} else {
unsigned hi;
if (!aold)
anew = attr_find( e->e_attrs, mod->sm_desc );
else
anew = aold;
mdb_attr_multi_thresh( mdb, mod->sm_desc, &hi, NULL );
/* check for big multivalued attrs */
if ( anew->a_numvals > mdb->mi_multi_hi )
if ( anew->a_numvals > hi )
anew->a_flags |= SLAP_ATTR_BIG_MULTI;
if ( anew->a_flags & SLAP_ATTR_BIG_MULTI ) {
if (!mvc) {
@ -247,7 +249,9 @@ do_del:
if ( mod->sm_numvals ) {
anew = attr_find( e->e_attrs, mod->sm_desc );
if ( anew ) {
if ( anew->a_numvals < mdb->mi_multi_lo ) {
unsigned lo;
mdb_attr_multi_thresh( mdb, mod->sm_desc, NULL, &lo );
if ( anew->a_numvals < lo ) {
anew->a_flags ^= SLAP_ATTR_BIG_MULTI;
anew = NULL;
} else {
@ -280,6 +284,7 @@ do_del:
Debug(LDAP_DEBUG_ARGS, "mdb_modify_internal: %d %s\n",
err, *text, 0);
} else {
unsigned hi;
got_delete = 1;
if (a_flags & SLAP_ATTR_BIG_MULTI) {
Attribute a_dummy;
@ -297,7 +302,8 @@ do_del:
goto mval_fail;
}
anew = attr_find( e->e_attrs, mod->sm_desc );
if (mod->sm_numvals > mdb->mi_multi_hi) {
mdb_attr_multi_thresh( mdb, mod->sm_desc, &hi, NULL );
if (mod->sm_numvals > hi) {
anew->a_flags |= SLAP_ATTR_BIG_MULTI;
if (!mvc) {
err = mdb_cursor_open( tid, mdb->mi_dbis[MDB_ID2VAL], &mvc );

View File

@ -44,6 +44,15 @@ void mdb_attr_index_destroy LDAP_P(( struct mdb_info *mdb ));
void mdb_attr_index_free LDAP_P(( struct mdb_info *mdb,
AttributeDescription *ad ));
int mdb_attr_multi_config LDAP_P(( struct mdb_info *mdb,
const char *fname, int lineno,
int argc, char **argv, struct config_reply_s *cr ));
void mdb_attr_multi_unparse LDAP_P(( struct mdb_info *mdb, BerVarray *bva ));
void mdb_attr_multi_thresh LDAP_P(( struct mdb_info *mdb, AttributeDescription *ad,
unsigned *hi, unsigned *lo ));
void mdb_attr_info_free( AttrInfo *ai );
int mdb_ad_read( struct mdb_info *mdb, MDB_txn *txn );