More for large multival attrs

Fix 2335285502
Use custom dupsort function, pass attributeDescription in so
it can use the actual matching rule for sorting.
This commit is contained in:
Howard Chu 2017-01-26 10:28:38 +00:00
parent 0cdb8eac65
commit 2e9f95cbac
3 changed files with 90 additions and 28 deletions

View File

@ -60,6 +60,46 @@ mdb_id2v_compare(
return uv[sizeof(ID)/2] - cv[sizeof(ID)/2];
}
/* usrkey[0] is the key in DB format, as described at mdb_mval_put.
* usrkey[1] is the value we'll actually match against.
* usrkey[2] is the attributeDescription for this value.
*/
int
mdb_id2v_dupsort(
const MDB_val *usrkey,
const MDB_val *curkey
)
{
AttributeDescription *ad = usrkey[2].mv_data;
struct berval bv1, bv2;
int rc, match, olen;
unsigned short s;
char *ptr;
ptr = curkey->mv_data + curkey->mv_size - 2;
memcpy(&s, ptr, 2);
bv2.bv_val = curkey->mv_data;
bv2.bv_len = curkey->mv_size - 3;
if (s)
bv2.bv_len -= (s+1);
bv1.bv_val = usrkey[1].mv_data;
bv1.bv_len = usrkey[1].mv_size;
if (ad) {
MatchingRule *mr = ad->ad_type->sat_equality;
rc = mr->smr_match(&match, SLAP_MR_EQUALITY
| SLAP_MR_VALUE_OF_ASSERTION_SYNTAX
| SLAP_MR_ASSERTED_VALUE_NORMALIZED_MATCH
| SLAP_MR_ATTRIBUTE_VALUE_NORMALIZED_MATCH,
ad->ad_type->sat_syntax, mr, &bv1, &bv2);
} else {
match = ber_bvcmp(&bv1, &bv2);
}
return match;
}
/* Values are stored as
* [normalized-value NUL ] original-value NUL 2-byte-len
* The trailing 2-byte-len is zero if there is no normalized value.
@ -68,7 +108,7 @@ mdb_id2v_compare(
int mdb_mval_put(Operation *op, MDB_cursor *mc, ID id, Attribute *a)
{
struct mdb_info *mdb = (struct mdb_info *) op->o_bd->be_private;
MDB_val key, data;
MDB_val key, data[3];
char *buf;
char ivk[ID2VKSZ];
unsigned i;
@ -80,14 +120,24 @@ int mdb_mval_put(Operation *op, MDB_cursor *mc, ID id, Attribute *a)
memcpy(ivk+sizeof(ID), &s, 2);
key.mv_data = &ivk;
key.mv_size = sizeof(ivk);
if ((a->a_desc->ad_type->sat_flags & SLAP_AT_ORDERED) || a->a_desc == slap_schema.si_ad_objectClass)
data[2].mv_data = NULL;
else
data[2].mv_data = a->a_desc;
for (i=0; i<a->a_numvals; i++) {
len = a->a_nvals[i].bv_len + 1 + 2;
if (a->a_nvals != a->a_vals)
if (a->a_nvals != a->a_vals) {
len += a->a_vals[i].bv_len + 1;
data.mv_size = len;
data[1].mv_data = a->a_nvals[i].bv_val;
data[1].mv_size = a->a_nvals[i].bv_len;
} else {
data[1].mv_data = a->a_vals[i].bv_val;
data[1].mv_size = a->a_vals[i].bv_len;
}
data[0].mv_size = len;
buf = op->o_tmpalloc( len, op->o_tmpmemctx );
data.mv_data = buf;
data[0].mv_data = buf;
memcpy(buf, a->a_nvals[i].bv_val, a->a_nvals[i].bv_len);
buf += a->a_nvals[i].bv_len;
*buf++ = 0;
@ -101,8 +151,8 @@ int mdb_mval_put(Operation *op, MDB_cursor *mc, ID id, Attribute *a)
*buf++ = 0;
*buf++ = 0;
}
rc = mdb_cursor_put(mc, &key, &data, 0);
op->o_tmpfree( data.mv_data, op->o_tmpmemctx );
rc = mdb_cursor_put(mc, &key, data, 0);
op->o_tmpfree( data[0].mv_data, op->o_tmpmemctx );
if (rc)
return rc;
}
@ -112,7 +162,7 @@ int mdb_mval_put(Operation *op, MDB_cursor *mc, ID id, Attribute *a)
int mdb_mval_del(Operation *op, MDB_cursor *mc, ID id, Attribute *a)
{
struct mdb_info *mdb = (struct mdb_info *) op->o_bd->be_private;
MDB_val key, data;
MDB_val key, data[3];
char *ptr;
char ivk[ID2VKSZ];
unsigned i;
@ -124,12 +174,23 @@ int mdb_mval_del(Operation *op, MDB_cursor *mc, ID id, Attribute *a)
memcpy(ivk+sizeof(ID), &s, 2);
key.mv_data = &ivk;
key.mv_size = sizeof(ivk);
if ((a->a_desc->ad_type->sat_flags & SLAP_AT_ORDERED) || a->a_desc == slap_schema.si_ad_objectClass)
data[2].mv_data = NULL;
else
data[2].mv_data = a->a_desc;
if (a->a_numvals) {
for (i=0; i<a->a_numvals; i++) {
data.mv_data = a->a_nvals[i].bv_val;
data.mv_size = a->a_nvals[i].bv_len+1;
rc = mdb_cursor_get(mc, &key, &data, MDB_GET_BOTH_RANGE);
data[0].mv_data = a->a_nvals[i].bv_val;
data[0].mv_size = a->a_nvals[i].bv_len+1;
if (a->a_nvals != a->a_vals) {
data[1].mv_data = a->a_nvals[i].bv_val;
data[1].mv_size = a->a_nvals[i].bv_len;
} else {
data[1].mv_data = a->a_vals[i].bv_val;
data[1].mv_size = a->a_vals[i].bv_len;
}
rc = mdb_cursor_get(mc, &key, data, MDB_GET_BOTH_RANGE);
if (rc)
return rc;
rc = mdb_cursor_del(mc, 0);
@ -137,7 +198,7 @@ int mdb_mval_del(Operation *op, MDB_cursor *mc, ID id, Attribute *a)
return rc;
}
} else {
rc = mdb_cursor_get(mc, &key, &data, MDB_SET);
rc = mdb_cursor_get(mc, &key, data, MDB_SET);
if (rc)
return rc;
rc = mdb_cursor_del(mc, MDB_NODUPDATA);
@ -148,7 +209,7 @@ int mdb_mval_del(Operation *op, MDB_cursor *mc, ID id, Attribute *a)
static int mdb_mval_get(Operation *op, MDB_cursor *mc, ID id, Attribute *a, int have_nvals)
{
struct mdb_info *mdb = (struct mdb_info *) op->o_bd->be_private;
MDB_val key, data;
MDB_val key, data[3];
char *ptr;
char ivk[ID2VKSZ];
unsigned i;
@ -161,28 +222,34 @@ static int mdb_mval_get(Operation *op, MDB_cursor *mc, ID id, Attribute *a, int
key.mv_data = &ivk;
key.mv_size = sizeof(ivk);
/* not needed */
if ((a->a_desc->ad_type->sat_flags & SLAP_AT_ORDERED) || a->a_desc == slap_schema.si_ad_objectClass)
data[2].mv_data = NULL;
else
data[2].mv_data = a->a_desc;
if (have_nvals)
a->a_nvals = a->a_vals + a->a_numvals + 1;
else
a->a_nvals = a->a_vals;
for (i=0; i<a->a_numvals; i++) {
if (!i)
rc = mdb_cursor_get(mc, &key, &data, MDB_SET);
rc = mdb_cursor_get(mc, &key, data, MDB_SET);
else
rc = mdb_cursor_get(mc, &key, &data, MDB_NEXT_DUP);
rc = mdb_cursor_get(mc, &key, data, MDB_NEXT_DUP);
if (rc)
return rc;
ptr = (char*)data.mv_data + data.mv_size - 2;
ptr = (char*)data[0].mv_data + data[0].mv_size - 2;
memcpy(&s, ptr, 2);
if (have_nvals) {
a->a_nvals[i].bv_val = data.mv_data;
a->a_nvals[i].bv_val = data[0].mv_data;
a->a_vals[i].bv_len = s;
a->a_vals[i].bv_val = ptr - a->a_vals[i].bv_len - 1;
a->a_nvals[i].bv_len = a->a_vals[i].bv_val - a->a_nvals[i].bv_val - 1;
} else {
assert(!s);
a->a_vals[i].bv_val = data.mv_data;
a->a_vals[i].bv_len = data.mv_size - 3;
a->a_vals[i].bv_val = data[0].mv_data;
a->a_vals[i].bv_len = data[0].mv_size - 3;
}
}
BER_BVZERO(&a->a_vals[i]);
@ -842,13 +909,6 @@ static int mdb_entry_partsize(struct mdb_info *mdb, MDB_txn *txn, Entry *e,
* attribute's values are already sorted. If the MDB_AT_MULTI bit of the
* attr index is set, the values are stored separately.
*
* Unfortunately, MDB_AT_MULTI and MDB_AT_SORTED are mutually exclusive;
* the DB stores big multi-valued attrs in sorted order but the sorting
* is according to the DB order which is not guaranteed to match the
* attribute's matching rule ordering. So, drop the sorted flag on big
* multi-val attributes, the values will need to be resorted when the
* entry is read again.
*
* If the MDB_AT_NVALS bit of numvals is set, the attribute also has
* normalized values present. (Note - a_numvals is an unsigned int, so this
* means it's possible to receive an attribute that we can't encode due
@ -890,7 +950,7 @@ static int mdb_entry_encode(Operation *op, Entry *e, MDB_val *data, Ecount *eh)
l = mdb->mi_adxs[a->a_desc->ad_index];
if (a->a_flags & SLAP_ATTR_BIG_MULTI)
l |= MDB_AT_MULTI;
else if (a->a_flags & SLAP_ATTR_SORTED_VALS)
if (a->a_flags & SLAP_ATTR_SORTED_VALS)
l |= MDB_AT_SORTED;
*lp++ = l;
l = a->a_numvals;

View File

@ -229,9 +229,10 @@ mdb_db_open( BackendDB *be, ConfigReply *cr )
if ( i == MDB_ID2ENTRY )
mdb_set_compare( txn, mdb->mi_dbis[i], mdb_id_compare );
else if ( i == MDB_ID2VAL )
else if ( i == MDB_ID2VAL ) {
mdb_set_compare( txn, mdb->mi_dbis[i], mdb_id2v_compare );
else if ( i == MDB_DN2ID ) {
mdb_set_dupsort( txn, mdb->mi_dbis[i], mdb_id2v_dupsort );
} else if ( i == MDB_DN2ID ) {
MDB_cursor *mc;
MDB_val key, data;
ID id;

View File

@ -169,6 +169,7 @@ int mdb_filter_candidates(
*/
MDB_cmp_func mdb_id2v_compare;
MDB_cmp_func mdb_id2v_dupsort;
int mdb_id2entry_add(
Operation *op,