Merge branch 'mdb.master' of ssh://git-master.openldap.org/~git/git/openldap

This commit is contained in:
Howard Chu 2011-09-17 04:02:52 -07:00
commit a05cbba77a

View File

@ -512,6 +512,7 @@ typedef struct MDB_page {
pgno_t p_pgno; /**< page number */
void * p_next; /**< for in-memory list of freed structs */
} mp_p;
uint16_t mp_pad;
/** @defgroup mdb_page Page Flags
* @ingroup internal
* Flags for the page headers.
@ -523,8 +524,9 @@ typedef struct MDB_page {
#define P_META 0x08 /**< meta page */
#define P_DIRTY 0x10 /**< dirty page */
#define P_LEAF2 0x20 /**< for #MDB_DUPFIXED records */
#define P_SUBP 0x40 /**< for #MDB_DUPSORT sub-pages */
/** @} */
uint32_t mp_flags; /**< @ref mdb_page */
uint16_t mp_flags; /**< @ref mdb_page */
#define mp_lower mp_pb.pb.pb_lower
#define mp_upper mp_pb.pb.pb_upper
#define mp_pages mp_pb.pb_pages
@ -566,6 +568,8 @@ typedef struct MDB_page {
#define IS_BRANCH(p) F_ISSET((p)->mp_flags, P_BRANCH)
/** Test if a page is an overflow page */
#define IS_OVERFLOW(p) F_ISSET((p)->mp_flags, P_OVERFLOW)
/** Test if a page is a sub page */
#define IS_SUBP(p) F_ISSET((p)->mp_flags, P_SUBP)
/** The number of overflow pages needed to store the given size. */
#define OVPAGES(size, psize) ((PAGEHDRSZ-1 + (size)) / (psize) + 1)
@ -666,9 +670,6 @@ typedef struct MDB_db {
/** Handle for the default DB. */
#define MAIN_DBI 1
/** Identify a data item as a valid sub-DB record */
#define MDB_SUBDATA 0x8200
/** Meta page content. */
typedef struct MDB_meta {
/** Stamp identifying this as an MDB data file. It must be set
@ -873,6 +874,7 @@ static MDB_node *mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp);
static int mdb_node_add(MDB_cursor *mc, indx_t indx,
MDB_val *key, MDB_val *data, pgno_t pgno, uint8_t flags);
static void mdb_node_del(MDB_page *mp, indx_t indx, int ksize);
static void mdb_node_shrink(MDB_page *mp, indx_t indx);
static int mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst);
static int mdb_node_read(MDB_txn *txn, MDB_node *leaf, MDB_val *data);
static size_t mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data);
@ -2561,8 +2563,8 @@ mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp)
nkeys = NUMKEYS(mp);
DPRINTF("searching %u keys in %s page %zu",
nkeys, IS_LEAF(mp) ? "leaf" : "branch",
DPRINTF("searching %u keys in %s %spage %zu",
nkeys, IS_LEAF(mp) ? "leaf" : "branch", IS_SUBP(mp) ? "sub-" : "",
mp->mp_pgno);
assert(nkeys > 0);
@ -2984,6 +2986,7 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
DPUTS("=====> move to next sibling page");
if (mdb_cursor_sibling(mc, 1) != MDB_SUCCESS) {
mc->mc_flags |= C_EOF;
mc->mc_flags &= ~C_INITIALIZED;
return MDB_NOTFOUND;
}
mp = mc->mc_pg[mc->mc_top];
@ -3113,6 +3116,10 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data,
MDB_val nodekey;
mp = mc->mc_pg[mc->mc_top];
if (!NUMKEYS(mp)) {
mc->mc_ki[mc->mc_top] = 0;
return MDB_NOTFOUND;
}
if (mp->mp_flags & P_LEAF2) {
nodekey.mv_size = mc->mc_db->md_pad;
nodekey.mv_data = LEAF2KEY(mp, 0, nodekey.mv_size);
@ -3170,6 +3177,11 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data,
return MDB_NOTFOUND;
}
}
if (!mc->mc_top) {
/* There are no other pages */
mc->mc_ki[mc->mc_top] = 0;
return MDB_NOTFOUND;
}
}
rc = mdb_page_search(mc, key, 0);
@ -3257,9 +3269,11 @@ mdb_cursor_first(MDB_cursor *mc, MDB_val *key, MDB_val *data)
int rc;
MDB_node *leaf;
rc = mdb_page_search(mc, NULL, 0);
if (rc != MDB_SUCCESS)
return rc;
if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) {
rc = mdb_page_search(mc, NULL, 0);
if (rc != MDB_SUCCESS)
return rc;
}
assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
leaf = NODEPTR(mc->mc_pg[mc->mc_top], 0);
@ -3302,9 +3316,11 @@ mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data)
lkey.mv_size = MAXKEYSIZE+1;
lkey.mv_data = NULL;
rc = mdb_page_search(mc, &lkey, 0);
if (rc != MDB_SUCCESS)
return rc;
if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) {
rc = mdb_page_search(mc, &lkey, 0);
if (rc != MDB_SUCCESS)
return rc;
}
assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
leaf = NODEPTR(mc->mc_pg[mc->mc_top], NUMKEYS(mc->mc_pg[mc->mc_top])-1);
@ -3488,12 +3504,14 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
{
MDB_node *leaf;
MDB_val xdata, *rdata, dkey;
MDB_page *fp;
MDB_db dummy;
char dbuf[PAGESIZE];
int do_sub = 0;
size_t nsize;
DKBUF;
int rc, rc2;
char pbuf[PAGESIZE];
char dbuf[MAXKEYSIZE+1];
DKBUF;
if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY))
return EACCES;
@ -3564,37 +3582,111 @@ top:
if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) {
/* Was a single item before, must convert now */
if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
/* Just overwrite the current item */
if (flags == MDB_CURRENT)
goto current;
/* create a fake page for the dup items */
dkey.mv_size = NODEDSZ(leaf);
dkey.mv_data = dbuf;
memcpy(dbuf, NODEDATA(leaf), dkey.mv_size);
dkey.mv_data = NODEDATA(leaf);
/* data matches, ignore it */
if (!mc->mc_dbx->md_dcmp(data, &dkey))
return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS;
memset(&dummy, 0, sizeof(dummy));
memcpy(dbuf, dkey.mv_data, dkey.mv_size);
dkey.mv_data = dbuf;
fp = (MDB_page *)pbuf;
fp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
fp->mp_flags = P_LEAF|P_DIRTY|P_SUBP;
fp->mp_lower = PAGEHDRSZ;
fp->mp_upper = PAGEHDRSZ + dkey.mv_size + data->mv_size;
if (mc->mc_db->md_flags & MDB_DUPFIXED) {
dummy.md_pad = data->mv_size;
dummy.md_flags = MDB_DUPFIXED;
if (mc->mc_db->md_flags & MDB_INTEGERDUP)
dummy.md_flags |= MDB_INTEGERKEY;
}
dummy.md_flags |= MDB_SUBDATA;
dummy.md_root = P_INVALID;
if (dkey.mv_size == sizeof(MDB_db)) {
memcpy(NODEDATA(leaf), &dummy, sizeof(dummy));
goto put_sub;
fp->mp_flags |= P_LEAF2;
fp->mp_pad = data->mv_size;
} else {
fp->mp_upper += 2 * sizeof(indx_t) + 2 * NODESIZE +
(dkey.mv_size & 1) + (data->mv_size & 1);
}
mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
do_sub = 1;
rdata = &xdata;
xdata.mv_size = sizeof(MDB_db);
xdata.mv_data = &dummy;
/* new sub-DB, must fully init xcursor */
if (flags == MDB_CURRENT)
flags = 0;
xdata.mv_size = fp->mp_upper;
xdata.mv_data = pbuf;
flags |= F_DUPDATA;
goto new_sub;
}
if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) {
/* See if we need to convert from fake page to subDB */
MDB_page *mp;
unsigned int offset;
unsigned int i;
fp = NODEDATA(leaf);
if (flags == MDB_CURRENT) {
fp->mp_flags |= P_DIRTY;
fp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
mc->mc_xcursor->mx_cursor.mc_pg[0] = fp;
flags |= F_DUPDATA;
goto put_sub;
}
if (mc->mc_db->md_flags & MDB_DUPFIXED) {
offset = fp->mp_pad;
} else {
offset = NODESIZE + sizeof(indx_t) + data->mv_size;
}
offset += offset & 1;
if (NODEDSZ(leaf) + offset >= mc->mc_txn->mt_env->me_psize / MDB_MINKEYS) {
/* yes, convert it */
dummy.md_flags = 0;
if (mc->mc_db->md_flags & MDB_DUPFIXED) {
dummy.md_pad = fp->mp_pad;
dummy.md_flags = MDB_DUPFIXED;
if (mc->mc_db->md_flags & MDB_INTEGERDUP)
dummy.md_flags |= MDB_INTEGERKEY;
}
dummy.md_depth = 1;
dummy.md_branch_pages = 0;
dummy.md_leaf_pages = 1;
dummy.md_overflow_pages = 0;
dummy.md_entries = NUMKEYS(fp);
rdata = &xdata;
xdata.mv_size = sizeof(MDB_db);
xdata.mv_data = &dummy;
mp = mdb_page_alloc(mc, 1);
if (!mp)
return ENOMEM;
offset = mc->mc_txn->mt_env->me_psize - NODEDSZ(leaf);
flags |= F_DUPDATA|F_SUBDATA;
dummy.md_root = mp->mp_pgno;
} else {
/* no, just grow it */
rdata = &xdata;
xdata.mv_size = NODEDSZ(leaf) + offset;
xdata.mv_data = pbuf;
mp = (MDB_page *)pbuf;
mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
flags |= F_DUPDATA;
}
mp->mp_flags = fp->mp_flags | P_DIRTY;
mp->mp_pad = fp->mp_pad;
mp->mp_lower = fp->mp_lower;
mp->mp_upper = fp->mp_upper + offset;
if (IS_LEAF2(fp)) {
memcpy(METADATA(mp), METADATA(fp), NUMKEYS(fp) * fp->mp_pad);
} else {
nsize = NODEDSZ(leaf) - fp->mp_upper;
memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper, nsize);
for (i=0; i<NUMKEYS(fp); i++)
mp->mp_ptrs[i] = fp->mp_ptrs[i] + offset;
}
mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0);
do_sub = 1;
goto new_sub;
}
/* data is on sub-DB, just store it */
flags |= F_DUPDATA|F_SUBDATA;
goto put_sub;
}
current:
/* same size, just replace it */
if (!F_ISSET(leaf->mn_flags, F_BIGDATA) &&
NODEDSZ(leaf) == data->mv_size) {
@ -3621,9 +3713,9 @@ new_sub:
mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
else {
/* Remember if we just added a subdatabase */
if (flags & F_SUBDATA) {
if (flags & (F_SUBDATA|F_DUPDATA)) {
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
leaf->mn_flags |= F_SUBDATA;
leaf->mn_flags |= (flags & (F_SUBDATA|F_DUPDATA));
}
/* Now store the actual data in the child DB. Note that we're
@ -3633,27 +3725,33 @@ new_sub:
*/
if (do_sub) {
MDB_db *db;
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
int xflags;
put_sub:
if (flags != MDB_CURRENT)
mdb_xcursor_init1(mc, leaf);
xdata.mv_size = 0;
xdata.mv_data = "";
if (flags == MDB_NODUPDATA)
flags = MDB_NOOVERWRITE;
if (flags & MDB_CURRENT) {
xflags = MDB_CURRENT;
} else {
mdb_xcursor_init1(mc, leaf);
xflags = (flags & MDB_NODUPDATA) ? MDB_NOOVERWRITE : 0;
}
/* converted, write the original data first */
if (dkey.mv_size) {
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, flags);
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags);
if (rc)
return rc;
leaf->mn_flags |= F_DUPDATA;
}
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, flags);
db = NODEDATA(leaf);
assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA);
memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
if (flags & F_SUBDATA) {
db = NODEDATA(leaf);
memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
}
}
mc->mc_db->md_entries++;
/* sub-writes might have failed so check rc again.
* Don't increment count if we just replaced an existing item.
*/
if (!rc && !(flags & MDB_CURRENT))
mc->mc_db->md_entries++;
}
done:
return rc;
@ -3679,48 +3777,68 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
if (!IS_LEAF2(mc->mc_pg[mc->mc_top]) && F_ISSET(leaf->mn_flags, F_DUPDATA)) {
if (flags != MDB_NODUPDATA) {
if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) {
mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf);
}
rc = mdb_cursor_del(&mc->mc_xcursor->mx_cursor, 0);
/* If sub-DB still has entries, we're done */
if (mc->mc_xcursor->mx_db.md_root != P_INVALID) {
MDB_db *db = NODEDATA(leaf);
assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA);
memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
if (leaf->mn_flags & F_SUBDATA) {
/* update subDB info */
MDB_db *db = NODEDATA(leaf);
memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
} else {
/* shrink fake page */
mdb_node_shrink(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
}
mc->mc_db->md_entries--;
return rc;
}
/* otherwise fall thru and delete the sub-DB */
}
/* add all the child DB's pages to the free list */
rc = mdb_page_search(&mc->mc_xcursor->mx_cursor, NULL, 0);
if (rc == MDB_SUCCESS) {
MDB_node *ni;
MDB_cursor *mx;
unsigned int i;
if (leaf->mn_flags & F_SUBDATA) {
/* add all the child DB's pages to the free list */
rc = mdb_page_search(&mc->mc_xcursor->mx_cursor, NULL, 0);
if (rc == MDB_SUCCESS) {
MDB_node *ni;
MDB_cursor *mx;
unsigned int i;
mx = &mc->mc_xcursor->mx_cursor;
mc->mc_db->md_entries -=
mx->mc_db->md_entries;
mx = &mc->mc_xcursor->mx_cursor;
mc->mc_db->md_entries -=
mx->mc_db->md_entries;
mdb_cursor_pop(mx);
while (mx->mc_snum > 1) {
for (i=0; i<NUMKEYS(mx->mc_pg[mx->mc_top]); i++) {
MDB_page *mp;
pgno_t pg;
ni = NODEPTR(mx->mc_pg[mx->mc_top], i);
pg = NODEPGNO(ni);
if ((rc = mdb_page_get(mc->mc_txn, pg, &mp)))
return rc;
/* free it */
mdb_midl_append(mc->mc_txn->mt_free_pgs, pg);
mdb_cursor_pop(mx);
while (mx->mc_snum > 0) {
for (i=0; i<NUMKEYS(mx->mc_pg[mx->mc_top]); i++) {
pgno_t pg;
ni = NODEPTR(mx->mc_pg[mx->mc_top], i);
pg = NODEPGNO(ni);
/* free it */
mdb_midl_append(mc->mc_txn->mt_free_pgs, pg);
}
if (!mx->mc_top)
break;
rc = mdb_cursor_sibling(mx, 1);
if (rc) {
/* no more siblings, go back to beginning
* of previous level. (stack was already popped
* by mdb_cursor_sibling)
*/
for (i=1; i<mx->mc_top; i++) {
pgno_t pg;
ni = NODEPTR(mx->mc_pg[i-1],0);
pg = NODEPGNO(ni);
if ((rc = mdb_page_get(mc->mc_txn, pg, &mx->mc_pg[i])))
break;
}
}
}
rc = mdb_cursor_sibling(mx, 1);
if (rc)
break;
/* free it */
mdb_midl_append(mc->mc_txn->mt_free_pgs,
mx->mc_db->md_root);
}
/* free it */
mdb_midl_append(mc->mc_txn->mt_free_pgs,
mx->mc_db->md_root);
}
}
@ -3839,8 +3957,9 @@ mdb_node_add(MDB_cursor *mc, indx_t indx,
assert(mp->mp_upper >= mp->mp_lower);
DPRINTF("add to %s page %zu index %i, data size %zu key size %zu [%s]",
DPRINTF("add to %s %spage %zu index %i, data size %zu key size %zu [%s]",
IS_LEAF(mp) ? "leaf" : "branch",
IS_SUBP(mp) ? "sub-" : "",
mp->mp_pgno, indx, data ? data->mv_size : 0,
key ? key->mv_size : 0, key ? DKEY(key) : NULL);
@ -3991,6 +4110,58 @@ mdb_node_del(MDB_page *mp, indx_t indx, int ksize)
mp->mp_upper += sz;
}
/** Compact the main page after deleting a node on a subpage.
* @param[in] mp The main page to operate on.
* @param[in] indx The index of the subpage on the main page.
*/
static void
mdb_node_shrink(MDB_page *mp, indx_t indx)
{
MDB_node *node;
MDB_page *sp, *xp;
char *base;
int osize, nsize;
int delta;
indx_t i, numkeys, ptr;
node = NODEPTR(mp, indx);
sp = (MDB_page *)NODEDATA(node);
osize = NODEDSZ(node);
delta = sp->mp_upper - sp->mp_lower;
SETDSZ(node, osize - delta);
xp = (MDB_page *)((char *)sp + delta);
/* shift subpage upward */
if (IS_LEAF2(sp)) {
nsize = NUMKEYS(sp) * sp->mp_pad;
memmove(METADATA(xp), METADATA(sp), nsize);
} else {
int i;
nsize = osize - sp->mp_upper;
numkeys = NUMKEYS(sp);
for (i=numkeys-1; i>=0; i--)
xp->mp_ptrs[i] = sp->mp_ptrs[i] - delta;
}
xp->mp_upper = sp->mp_lower;
xp->mp_lower = sp->mp_lower;
xp->mp_flags = sp->mp_flags;
xp->mp_pad = sp->mp_pad;
xp->mp_pgno = mp->mp_pgno;
/* shift lower nodes upward */
ptr = mp->mp_ptrs[indx];
numkeys = NUMKEYS(mp);
for (i = 0; i < numkeys; i++) {
if (mp->mp_ptrs[i] <= ptr)
mp->mp_ptrs[i] += delta;
}
base = (char *)mp + mp->mp_upper;
memmove(base + delta, base, ptr - mp->mp_upper + NODESIZE + NODEKSZ(node));
mp->mp_upper += delta;
}
/** Initial setup of a sorted-dups cursor.
* Sorted duplicates are implemented as a sub-database for the given key.
* The duplicate data items are actually keys of the sub-database.
@ -4026,18 +4197,41 @@ mdb_xcursor_init0(MDB_cursor *mc)
static void
mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
{
MDB_db *db = NODEDATA(node);
MDB_xcursor *mx = mc->mc_xcursor;
assert((db->md_flags & MDB_SUBDATA) == MDB_SUBDATA);
mx->mx_db = *db;
if (node->mn_flags & F_SUBDATA) {
MDB_db *db = NODEDATA(node);
mx->mx_db = *db;
mx->mx_cursor.mc_snum = 0;
mx->mx_cursor.mc_flags = 0;
} else {
MDB_page *fp = NODEDATA(node);
mx->mx_db.md_pad = mc->mc_pg[mc->mc_top]->mp_pad;
mx->mx_db.md_flags = 0;
mx->mx_db.md_depth = 1;
mx->mx_db.md_branch_pages = 0;
mx->mx_db.md_leaf_pages = 1;
mx->mx_db.md_overflow_pages = 0;
mx->mx_db.md_entries = NUMKEYS(fp);
mx->mx_db.md_root = fp->mp_pgno;
mx->mx_cursor.mc_snum = 1;
mx->mx_cursor.mc_flags = C_INITIALIZED;
mx->mx_cursor.mc_top = 0;
mx->mx_cursor.mc_pg[0] = fp;
mx->mx_cursor.mc_ki[0] = 0;
if (mc->mc_db->md_flags & MDB_DUPFIXED) {
mx->mx_db.md_flags = MDB_DUPFIXED;
mx->mx_db.md_pad = fp->mp_pad;
if (mc->mc_db->md_flags & MDB_INTEGERDUP)
mx->mx_db.md_flags |= MDB_INTEGERKEY;
}
}
DPRINTF("Sub-db %u for db %u root page %zu", mx->mx_cursor.mc_dbi, mc->mc_dbi,
db->md_root);
mx->mx_db.md_root);
if (F_ISSET(mc->mc_pg[mc->mc_top]->mp_flags, P_DIRTY))
mx->mx_dbx.md_dirty = 1;
mx->mx_dbx.md_name.mv_data = NODEKEY(node);
mx->mx_dbx.md_name.mv_size = node->mn_ksize;
mx->mx_cursor.mc_snum = 0;
mx->mx_cursor.mc_flags = 0;
if (mx->mx_dbx.md_cmp == mdb_cmp_int && mx->mx_db.md_pad == sizeof(size_t))
mx->mx_dbx.md_cmp = mdb_cmp_long;
}
@ -4468,6 +4662,7 @@ mdb_rebalance(MDB_cursor *mc)
if (PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) >= FILL_THRESHOLD && NUMKEYS(mn.mc_pg[mn.mc_top]) >= 2)
return mdb_node_move(&mn, mc);
else { /* FIXME: if (has_enough_room()) */
mc->mc_flags &= ~C_INITIALIZED;
if (mc->mc_ki[ptop] == 0)
return mdb_page_merge(&mn, mc);
else