Merge branch 'mdb.master' of /home/hyc/OD/mdb

This commit is contained in:
Howard Chu 2011-09-06 13:55:02 -07:00
commit ee3f050000

View File

@ -363,18 +363,19 @@ struct MDB_cursor {
#define METADATA(p) ((void *)((char *)(p) + PAGEHDRSZ))
/* We guarantee 2-byte alignment for nodes */
typedef struct MDB_node {
#define mn_pgno mn_p.np_pgno
#define mn_dsize mn_p.np_dsize
union {
pgno_t np_pgno; /* child page number */
uint32_t np_dsize; /* leaf data size */
} mn_p;
unsigned int mn_flags:4;
unsigned int mn_ksize:12; /* key size */
/* lo and hi are used for data size on leaf nodes and for
* child pgno on branch nodes. On 64 bit platforms, flags
* is also used for pgno. (branch nodes ignore flags)
*/
unsigned short mn_lo;
unsigned short mn_hi;
unsigned short mn_flags;
#define F_BIGDATA 0x01 /* data put on overflow page */
#define F_SUBDATA 0x02 /* data is a sub-database */
#define F_DUPDATA 0x04 /* data has duplicates */
unsigned short mn_ksize; /* key size */
char mn_data[1];
} MDB_node;
@ -455,8 +456,19 @@ struct MDB_env {
#define NODEPTR(p, i) ((MDB_node *)((char *)(p) + (p)->mp_ptrs[i]))
#define NODEKEY(node) (void *)((node)->mn_data)
#define NODEDATA(node) (void *)((char *)(node)->mn_data + (node)->mn_ksize)
#define NODEPGNO(node) ((node)->mn_pgno)
#define NODEDSZ(node) ((node)->mn_dsize)
#if LONG_MAX == 0x7fffffff
#define NODEPGNO(node) ((node)->mn_lo | ((node)->mn_hi << 16))
#define SETPGNO(node,pgno) do { \
(node)->mn_lo = (pgno) & 0xffff; (node)->mn_hi = (pgno) >> 16;} while(0)
#else
#define NODEPGNO(node) ((node)->mn_lo | ((node)->mn_hi << 16) | ((unsigned long)(node)->mn_flags << 32))
#define SETPGNO(node,pgno) do { \
(node)->mn_lo = (pgno) & 0xffff; (node)->mn_hi = (pgno) >> 16; \
(node)->mn_flags = (pgno) >> 32; } while(0)
#endif
#define NODEDSZ(node) ((node)->mn_lo | ((unsigned)(node)->mn_hi << 16))
#define SETDSZ(node,size) do { \
(node)->mn_lo = (size) & 0xffff; (node)->mn_hi = (size) >> 16;} while(0)
#define NODEKSZ(node) ((node)->mn_ksize)
#define LEAF2KEY(p, i, ks) ((char *)(p) + PAGEHDRSZ + ((i)*(ks)))
@ -486,10 +498,6 @@ static int mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp,
static void mdb_del_node(MDB_page *mp, indx_t indx, int ksize);
static int mdb_del0(MDB_cursor *mc, unsigned int ki,
MDB_pageparent *mpp, MDB_node *leaf);
#if 0
static int mdb_put0(MDB_txn *txn, MDB_dbi dbi,
MDB_val *key, MDB_val *data, unsigned int flags);
#endif
static int mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data);
static int mdb_rebalance(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *mp);
@ -753,7 +761,7 @@ mdb_touch(MDB_txn *txn, MDB_dbi dbi, MDB_pageparent *pp)
/* Update the page number to new touched page. */
if (pp->mp_parent != NULL)
NODEPGNO(NODEPTR(pp->mp_parent, pp->mp_pi)) = mp->mp_pgno;
SETPGNO(NODEPTR(pp->mp_parent, pp->mp_pi), mp->mp_pgno);
pp->mp_page = mp;
}
return 0;
@ -2267,14 +2275,14 @@ mdb_read_data(MDB_txn *txn, MDB_node *leaf, MDB_val *data)
int rc;
if (!F_ISSET(leaf->mn_flags, F_BIGDATA)) {
data->mv_size = leaf->mn_dsize;
data->mv_size = NODEDSZ(leaf);
data->mv_data = NODEDATA(leaf);
return MDB_SUCCESS;
}
/* Read overflow data.
*/
data->mv_size = leaf->mn_dsize;
data->mv_size = NODEDSZ(leaf);
memcpy(&pgno, NODEDATA(leaf), sizeof(pgno));
if ((rc = mdb_get_page(txn, pgno, &omp))) {
DPRINTF("read overflow page %lu failed", pgno);
@ -2653,7 +2661,7 @@ set3:
MDB_val d2;
if ((rc = mdb_read_data(cursor->mc_txn, leaf, &d2)) != MDB_SUCCESS)
return rc;
rc = cursor->mc_txn->mt_dbxs[cursor->mc_dbi].md_cmp(data, &d2);
rc = cursor->mc_txn->mt_dbxs[cursor->mc_dbi].md_dcmp(data, &d2);
if (rc) {
if (op == MDB_GET_BOTH || rc > 0)
return MDB_NOTFOUND;
@ -3117,7 +3125,7 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags)
MDB_pageparent mp2;
if (flags != MDB_NODUPDATA) {
/* mdb_xcursor_init2(mc); */
mdb_xcursor_init2(mc);
rc = mdb_cursor_del(&mc->mc_xcursor->mx_cursor, 0);
mdb_xcursor_fini(mc);
/* If sub-DB still has entries, we're done */
@ -3243,6 +3251,7 @@ mdb_leaf_size(MDB_env *env, MDB_val *key, MDB_val *data)
/* put on overflow page */
sz -= data->mv_size - sizeof(pgno_t);
}
sz += sz & 1;
return sz + sizeof(indx_t);
}
@ -3318,6 +3327,7 @@ mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx,
node_size += data->mv_size;
}
}
node_size += node_size & 1;
if (node_size + sizeof(indx_t) > SIZELEFT(mp)) {
DPRINTF("not enough room in page %lu, got %u ptrs",
@ -3344,9 +3354,9 @@ mdb_add_node(MDB_txn *txn, MDB_dbi dbi, MDB_page *mp, indx_t indx,
node->mn_ksize = (key == NULL) ? 0 : key->mv_size;
node->mn_flags = flags;
if (IS_LEAF(mp))
node->mn_dsize = data->mv_size;
SETDSZ(node,data->mv_size);
else
NODEPGNO(node) = pgno;
SETPGNO(node,pgno);
if (key)
memcpy(NODEKEY(node), key->mv_data, key->mv_size);
@ -3400,6 +3410,7 @@ mdb_del_node(MDB_page *mp, indx_t indx, int ksize)
else
sz += NODEDSZ(node);
}
sz += sz & 1;
ptr = mp->mp_ptrs[indx];
numkeys = NUMKEYS(mp);
@ -3457,6 +3468,7 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_page *mp, MDB_node *node)
mx->mx_dbs[1] = mc->mc_txn->mt_dbs[1];
if (mc->mc_dbi > 1) {
mx->mx_dbs[2] = mc->mc_txn->mt_dbs[mc->mc_dbi];
mx->mx_dbxs[2].md_dirty = mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty;
dbn = 3;
} else {
dbn = 2;
@ -3482,6 +3494,7 @@ mdb_xcursor_init2(MDB_cursor *mc)
mx->mx_dbs[1] = mc->mc_txn->mt_dbs[1];
if (mc->mc_dbi > 1) {
mx->mx_dbs[2] = mc->mc_txn->mt_dbs[mc->mc_dbi];
mx->mx_dbxs[2].md_dirty = mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty;
dbn = 3;
} else {
dbn = 2;
@ -3498,8 +3511,6 @@ mdb_xcursor_fini(MDB_cursor *mc)
mc->mc_txn->mt_next_pgno = mx->mx_txn.mt_next_pgno;
mc->mc_txn->mt_dbs[0] = mx->mx_dbs[0];
mc->mc_txn->mt_dbs[1] = mx->mx_dbs[1];
mc->mc_txn->mt_dbxs[0].md_dirty = mx->mx_dbxs[0].md_dirty;
mc->mc_txn->mt_dbxs[1].md_dirty = mx->mx_dbxs[1].md_dirty;
if (mc->mc_dbi > 1) {
mc->mc_txn->mt_dbs[mc->mc_dbi] = mx->mx_dbs[2];
mc->mc_txn->mt_dbxs[mc->mc_dbi].md_dirty = mx->mx_dbxs[2].md_dirty;
@ -4096,6 +4107,7 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp,
psize += sizeof(pgno_t);
else
psize += NODEDSZ(node);
psize += psize & 1;
if (psize > pmax) {
split_indx = i;
break;
@ -4110,6 +4122,7 @@ mdb_split(MDB_txn *txn, MDB_dbi dbi, MDB_page **mpp, unsigned int *newindxp,
psize += sizeof(pgno_t);
else
psize += NODEDSZ(node);
psize += psize & 1;
if (psize > pmax) {
split_indx = i+1;
break;
@ -4199,7 +4212,7 @@ newsep:
rkey.mv_size = node->mn_ksize;
if (IS_LEAF(&mdp->p)) {
rdata.mv_data = NODEDATA(node);
rdata.mv_size = node->mn_dsize;
rdata.mv_size = NODEDSZ(node);
} else
pgno = NODEPGNO(node);
flags = node->mn_flags;
@ -4226,164 +4239,6 @@ newsep:
return rc;
}
#if 0
static int
mdb_put0(MDB_txn *txn, MDB_dbi dbi,
MDB_val *key, MDB_val *data, unsigned int flags)
{
int rc = MDB_SUCCESS, exact;
unsigned int ki;
MDB_node *leaf;
MDB_pageparent mpp;
MDB_val xdata, *rdata, dkey;
MDB_db dummy;
char dbuf[PAGESIZE];
int do_sub = 0;
size_t nsize;
DKBUF;
DPRINTF("==> put db %u key [%s], size %zu, data size %zu",
dbi, DKEY(key), key->mv_size, data->mv_size);
dkey.mv_size = 0;
mpp.mp_parent = NULL;
mpp.mp_pi = 0;
rc = mdb_search_page(txn, dbi, key, NULL, 1, &mpp);
if (rc == MDB_SUCCESS) {
leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
if (leaf && exact) {
if (flags == MDB_NOOVERWRITE) {
DPRINTF("duplicate key [%s]", DKEY(key));
return MDB_KEYEXIST;
}
/* there's only a key anyway, so this is a no-op */
if (IS_LEAF2(mpp.mp_page))
return MDB_SUCCESS;
if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
/* Was a single item before, must convert now */
if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
dkey.mv_size = NODEDSZ(leaf);
dkey.mv_data = dbuf;
memcpy(dbuf, NODEDATA(leaf), dkey.mv_size);
/* data matches, ignore it */
if (!mdb_dcmp(txn, dbi, data, &dkey))
return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS;
memset(&dummy, 0, sizeof(dummy));
if (txn->mt_dbs[dbi].md_flags & MDB_DUPFIXED) {
dummy.md_pad = data->mv_size;
dummy.md_flags = MDB_DUPFIXED;
if (txn->mt_dbs[dbi].md_flags & MDB_INTEGERDUP)
dummy.md_flags |= MDB_INTEGERKEY;
}
dummy.md_root = P_INVALID;
if (dkey.mv_size == sizeof(MDB_db)) {
memcpy(NODEDATA(leaf), &dummy, sizeof(dummy));
goto put_sub;
}
mdb_del_node(mpp.mp_page, ki, 0);
do_sub = 1;
rdata = &xdata;
xdata.mv_size = sizeof(MDB_db);
xdata.mv_data = &dummy;
goto new_sub;
}
goto put_sub;
}
/* same size, just replace it */
if (!F_ISSET(leaf->mn_flags, F_BIGDATA) &&
NODEDSZ(leaf) == data->mv_size) {
memcpy(NODEDATA(leaf), data->mv_data, data->mv_size);
goto done;
}
mdb_del_node(mpp.mp_page, ki, 0);
}
if (leaf == NULL) { /* append if not found */
ki = NUMKEYS(mpp.mp_page);
DPRINTF("appending key at index %i", ki);
}
} else if (rc == MDB_NOTFOUND) {
MDB_dpage *dp;
/* new file, just write a root leaf page */
DPUTS("allocating new root leaf page");
if ((dp = mdb_new_page(txn, dbi, P_LEAF, 1)) == NULL) {
return ENOMEM;
}
mpp.mp_page = &dp->p;
txn->mt_dbs[dbi].md_root = mpp.mp_page->mp_pgno;
txn->mt_dbs[dbi].md_depth++;
txn->mt_dbxs[dbi].md_dirty = 1;
if ((txn->mt_dbs[dbi].md_flags & (MDB_DUPSORT|MDB_DUPFIXED)) == MDB_DUPFIXED)
mpp.mp_page->mp_flags |= P_LEAF2;
ki = 0;
}
else
goto done;
assert(IS_LEAF(mpp.mp_page));
DPRINTF("there are %u keys, should insert new key at index %i",
NUMKEYS(mpp.mp_page), ki);
rdata = data;
new_sub:
nsize = IS_LEAF2(mpp.mp_page) ? key->mv_size : mdb_leaf_size(txn->mt_env, key, rdata);
if (SIZELEFT(mpp.mp_page) < nsize) {
rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, rdata, P_INVALID);
} else {
/* There is room already in this leaf page. */
rc = mdb_add_node(txn, dbi, mpp.mp_page, ki, key, rdata, 0, 0);
}
if (rc != MDB_SUCCESS)
txn->mt_flags |= MDB_TXN_ERROR;
else {
/* Remember if we just added a subdatabase */
if (flags & F_SUBDATA) {
leaf = NODEPTR(mpp.mp_page, ki);
leaf->mn_flags |= F_SUBDATA;
}
/* Now store the actual data in the child DB. Note that we're
* storing the user data in the keys field, so there are strict
* size limits on dupdata. The actual data fields of the child
* DB are all zero size.
*/
if (do_sub) {
MDB_cursor mc;
MDB_xcursor mx;
leaf = NODEPTR(mpp.mp_page, ki);
put_sub:
mc.mc_txn = txn;
mc.mc_dbi = dbi;
mc.mc_flags = 0;
mc.mc_xcursor = &mx;
mdb_xcursor_init0(&mc);
mdb_xcursor_init1(txn, dbi, &mx, mpp.mp_page, leaf);
xdata.mv_size = 0;
xdata.mv_data = "";
if (flags == MDB_NODUPDATA)
flags = MDB_NOOVERWRITE;
/* converted, write the original data first */
if (dkey.mv_size) {
rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, &dkey, &xdata, flags);
if (rc) return rc;
leaf->mn_flags |= F_DUPDATA;
}
rc = mdb_put0(&mx.mx_txn, mx.mx_cursor.mc_dbi, data, &xdata, flags);
mdb_xcursor_fini(&mc);
memcpy(NODEDATA(leaf), &mx.mx_txn.mt_dbs[mx.mx_cursor.mc_dbi],
sizeof(MDB_db));
}
txn->mt_dbs[dbi].md_entries++;
}
done:
return rc;
}
#endif
int
mdb_put(MDB_txn *txn, MDB_dbi dbi,
MDB_val *key, MDB_val *data, unsigned int flags)