ITS#7515 Fix tracking of parent txn's cursors.

Restore mc_flags and xcursors, they were tracked but not merged.
Simplify: Track parent txn's original cursors after backing them
up, instead of tracking copies and merging them back at commit.
This commit is contained in:
Hallvard Furuseth 2013-07-07 17:13:27 +02:00
parent 9be6af0dcb
commit be47ca7667

View File

@ -879,8 +879,8 @@ struct MDB_xcursor;
struct MDB_cursor {
/** Next cursor on this DB in this txn */
MDB_cursor *mc_next;
/** Original cursor if this is a shadow */
MDB_cursor *mc_orig;
/** Backup of the original cursor if this cursor is a shadow */
MDB_cursor *mc_backup;
/** Context used for databases with #MDB_DUPSORT, otherwise NULL */
struct MDB_xcursor *mc_xcursor;
/** The transaction that owns this cursor */
@ -1633,57 +1633,35 @@ mdb_env_sync(MDB_env *env, int force)
return rc;
}
/** Make shadow copies of all of parent txn's cursors */
/** Back up parent txn's cursors, then grab the originals for tracking */
static int
mdb_cursor_shadow(MDB_txn *src, MDB_txn *dst)
{
MDB_cursor *mc, *m2;
unsigned int i, j, size;
MDB_cursor *mc, *bk;
MDB_xcursor *mx;
size_t size;
int i;
for (i=0;i<src->mt_numdbs; i++) {
if (src->mt_cursors[i]) {
for (i = src->mt_numdbs; --i >= 0; ) {
if ((mc = src->mt_cursors[i]) != NULL) {
size = sizeof(MDB_cursor);
if (src->mt_cursors[i]->mc_xcursor)
if (mc->mc_xcursor)
size += sizeof(MDB_xcursor);
for (m2 = src->mt_cursors[i]; m2; m2=m2->mc_next) {
mc = malloc(size);
if (!mc)
for (; mc; mc = bk->mc_next) {
bk = malloc(size);
if (!bk)
return ENOMEM;
mc->mc_orig = m2;
mc->mc_txn = dst;
mc->mc_dbi = i;
*bk = *mc;
mc->mc_backup = bk;
mc->mc_db = &dst->mt_dbs[i];
mc->mc_dbx = m2->mc_dbx;
mc->mc_dbflag = &dst->mt_dbflags[i];
mc->mc_snum = m2->mc_snum;
mc->mc_top = m2->mc_top;
mc->mc_flags = m2->mc_flags;
for (j=0; j<mc->mc_snum; j++) {
mc->mc_pg[j] = m2->mc_pg[j];
mc->mc_ki[j] = m2->mc_ki[j];
}
if (m2->mc_xcursor) {
MDB_xcursor *mx, *mx2;
mx = (MDB_xcursor *)(mc+1);
mc->mc_xcursor = mx;
mx2 = m2->mc_xcursor;
mx->mx_db = mx2->mx_db;
mx->mx_dbx = mx2->mx_dbx;
mx->mx_dbflag = mx2->mx_dbflag;
mx->mx_cursor.mc_txn = dst;
mx->mx_cursor.mc_dbi = mx2->mx_cursor.mc_dbi;
mx->mx_cursor.mc_db = &mx->mx_db;
mx->mx_cursor.mc_dbx = &mx->mx_dbx;
mx->mx_cursor.mc_dbflag = &mx->mx_dbflag;
mx->mx_cursor.mc_snum = mx2->mx_cursor.mc_snum;
mx->mx_cursor.mc_top = mx2->mx_cursor.mc_top;
mx->mx_cursor.mc_flags = mx2->mx_cursor.mc_flags;
for (j=0; j<mx2->mx_cursor.mc_snum; j++) {
mx->mx_cursor.mc_pg[j] = mx2->mx_cursor.mc_pg[j];
mx->mx_cursor.mc_ki[j] = mx2->mx_cursor.mc_ki[j];
}
} else {
mc->mc_xcursor = NULL;
/* Kill pointers into src - and dst to reduce abuse: The
* user may not use mc until dst ends. Otherwise we'd...
*/
mc->mc_txn = NULL; /* ...set this to dst */
mc->mc_dbflag = NULL; /* ...and &dst->mt_dbflags[i] */
if ((mx = mc->mc_xcursor) != NULL) {
*(MDB_xcursor *)(bk+1) = *mx;
mx->mx_cursor.mc_txn = NULL; /* ...and dst. */
}
mc->mc_next = dst->mt_cursors[i];
dst->mt_cursors[i] = mc;
@ -1693,32 +1671,40 @@ mdb_cursor_shadow(MDB_txn *src, MDB_txn *dst)
return MDB_SUCCESS;
}
/** Close this write txn's cursors, after optionally merging its shadow
* cursors back into parent's.
/** Close this write txn's cursors, give parent txn's cursors back to parent.
* @param[in] txn the transaction handle.
* @param[in] merge zero to not merge cursors, non-zero to merge.
* @param[in] merge true to keep changes to parent cursors, false to revert.
* @return 0 on success, non-zero on failure.
*/
static void
mdb_cursors_close(MDB_txn *txn, unsigned merge)
{
MDB_cursor **cursors = txn->mt_cursors, *mc, *next;
int i, j;
MDB_cursor **cursors = txn->mt_cursors, *mc, *next, *bk;
MDB_xcursor *mx;
int i;
for (i = txn->mt_numdbs; --i >= 0; ) {
for (mc = cursors[i]; mc; mc = next) {
next = mc->mc_next;
if (merge && mc->mc_orig) {
MDB_cursor *m2 = mc->mc_orig;
m2->mc_snum = mc->mc_snum;
m2->mc_top = mc->mc_top;
for (j = mc->mc_snum; --j >= 0; ) {
m2->mc_pg[j] = mc->mc_pg[j];
m2->mc_ki[j] = mc->mc_ki[j];
}
next = mc->mc_next;
if ((bk = mc->mc_backup) != NULL) {
if (merge) {
/* Commit changes to parent txn */
mc->mc_next = bk->mc_next;
mc->mc_backup = bk->mc_backup;
mc->mc_txn = bk->mc_txn;
mc->mc_db = bk->mc_db;
mc->mc_dbflag = bk->mc_dbflag;
if ((mx = mc->mc_xcursor) != NULL)
mx->mx_cursor.mc_txn = bk->mc_txn;
} else {
/* Abort nested txn */
*mc = *bk;
if ((mx = mc->mc_xcursor) != NULL)
*mx = *(MDB_xcursor *)(bk+1);
}
/* Only malloced cursors are permanently tracked. */
free(mc);
mc = bk;
}
free(mc);
}
cursors[i] = NULL;
}
@ -5867,7 +5853,7 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
static void
mdb_cursor_init(MDB_cursor *mc, MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
{
mc->mc_orig = NULL;
mc->mc_backup = NULL;
mc->mc_dbi = dbi;
mc->mc_txn = txn;
mc->mc_db = &txn->mt_dbs[dbi];
@ -5961,7 +5947,7 @@ mdb_cursor_count(MDB_cursor *mc, size_t *countp)
void
mdb_cursor_close(MDB_cursor *mc)
{
if (mc != NULL) {
if (mc && !mc->mc_backup) {
/* remove from txn, if tracked */
if ((mc->mc_flags & C_UNTRACK) && mc->mc_txn->mt_cursors) {
MDB_cursor **prev = &mc->mc_txn->mt_cursors[mc->mc_dbi];