From 04f488e7a0b81be79f7cd28404457edadefef193 Mon Sep 17 00:00:00 2001
From: Howard Chu <hyc@symas.com>
Date: Tue, 27 Mar 2012 10:42:22 -0700
Subject: [PATCH] ITS#7210 additional freelist fixes

Also allow read access to freelist in mdb_cursor_open
---
 libraries/libmdb/mdb.c | 114 +++++++++++++++++++++++++++++++----------
 1 file changed, 88 insertions(+), 26 deletions(-)

diff --git a/libraries/libmdb/mdb.c b/libraries/libmdb/mdb.c
index 3ac46d006b..a44b631333 100644
--- a/libraries/libmdb/mdb.c
+++ b/libraries/libmdb/mdb.c
@@ -1108,6 +1108,55 @@ mdb_page_keys(MDB_page *mp)
 }
 #endif
 
+#if MDB_DEBUG > 2
+/** Count all the pages in each DB and in the freelist
+ *  and make sure it matches the actual number of pages
+ *  being used.
+ */
+static void mdb_audit(MDB_txn *txn)
+{
+	MDB_cursor mc;
+	MDB_val key, data;
+	int rc, i;
+	ID freecount, count;
+
+	freecount = 0;
+	mdb_cursor_init(&mc, txn, FREE_DBI, NULL);
+	while ((rc = mdb_cursor_get(&mc, &key, &data, MDB_NEXT)) == 0)
+		freecount += *(ID *)data.mv_data;
+	freecount += txn->mt_dbs[0].md_branch_pages + txn->mt_dbs[0].md_leaf_pages +
+		txn->mt_dbs[0].md_overflow_pages;
+
+	count = 0;
+	for (i = 0; i<txn->mt_numdbs; i++) {
+		count += txn->mt_dbs[i].md_branch_pages +
+			txn->mt_dbs[i].md_leaf_pages +
+			txn->mt_dbs[i].md_overflow_pages;
+		if (txn->mt_dbs[i].md_flags & MDB_DUPSORT) {
+			MDB_xcursor mx;
+			mdb_cursor_init(&mc, txn, i, &mx);
+			mdb_page_search(&mc, NULL, 0);
+			do {
+				int j;
+				MDB_page *mp;
+				mp = mc.mc_pg[mc.mc_top];
+				for (j=0; j<NUMKEYS(mp); j++) {
+					MDB_node *leaf = NODEPTR(mp, j);
+					if (leaf->mn_flags & F_SUBDATA) {
+						MDB_db db;
+						memcpy(&db, NODEDATA(leaf), sizeof(db));
+						count += db.md_branch_pages + db.md_leaf_pages +
+							db.md_overflow_pages;
+					}
+				}
+			}
+			while (mdb_cursor_sibling(&mc, 1) == 0);
+		}
+	}
+	assert(freecount + count + 2 >= txn->mt_next_pgno - 1);
+}
+#endif
+
 int
 mdb_cmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b)
 {
@@ -1225,9 +1274,6 @@ mdb_page_alloc(MDB_cursor *mc, int num)
 					}
 				}
 #endif
-				if (mop->mo_txnid == 87869 && txn->mt_txnid == 87879) {
-					int i=1;
-				}
 			}
 		}
 none:
@@ -1747,7 +1793,7 @@ mdb_txn_commit(MDB_txn *txn)
 	off_t		 size;
 	MDB_page	*dp;
 	MDB_env	*env;
-	pgno_t	next;
+	pgno_t	next, freecnt;
 	MDB_cursor mc;
 
 	assert(txn != NULL);
@@ -1878,9 +1924,9 @@ mdb_txn_commit(MDB_txn *txn)
 	}
 
 	/* save to free list */
+	freecnt = txn->mt_free_pgs[0];
 	if (!MDB_IDL_IS_ZERO(txn->mt_free_pgs)) {
 		MDB_val key, data;
-		pgno_t i;
 
 		/* make sure last page of freeDB is touched and on freelist */
 		key.mv_size = MAXKEYSIZE+1;
@@ -1908,40 +1954,51 @@ mdb_txn_commit(MDB_txn *txn)
 		 * and make sure the entire thing got written.
 		 */
 		do {
-			i = txn->mt_free_pgs[0];
+			freecnt = txn->mt_free_pgs[0];
 			data.mv_size = MDB_IDL_SIZEOF(txn->mt_free_pgs);
 			rc = mdb_cursor_put(&mc, &key, &data, 0);
 			if (rc) {
 				mdb_txn_abort(txn);
 				return rc;
 			}
-		} while (i != txn->mt_free_pgs[0]);
+		} while (freecnt != txn->mt_free_pgs[0]);
 		if (mdb_midl_shrink(&txn->mt_free_pgs))
 			env->me_free_pgs = txn->mt_free_pgs;
 	}
 	/* should only be one record now */
+again:
 	if (env->me_pghead) {
 		MDB_val key, data;
 		MDB_oldpages *mop;
 		pgno_t orig;
+		txnid_t id;
 
 		mop = env->me_pghead;
-		key.mv_size = sizeof(pgno_t);
-		key.mv_data = &mop->mo_txnid;
+		id = mop->mo_txnid;
+		key.mv_size = sizeof(id);
+		key.mv_data = &id;
 		data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
 		data.mv_data = mop->mo_pages;
 		orig = mop->mo_pages[0];
 		mdb_cursor_put(&mc, &key, &data, 0);
-		/* could have been used again here */
-		if (mop->mo_pages[0] != orig) {
-			data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
-			data.mv_data = mop->mo_pages;
-			mdb_cursor_put(&mc, &key, &data, 0);
-			env->me_pgfirst = 0;
-			env->me_pglast = 0;
+		if (mop == env->me_pghead) {
+			/* could have been used again here */
+			if (mop->mo_pages[0] != orig) {
+				data.mv_size = MDB_IDL_SIZEOF(mop->mo_pages);
+				data.mv_data = mop->mo_pages;
+				id = mop->mo_txnid;
+				mdb_cursor_put(&mc, &key, &data, 0);
+			}
+			env->me_pghead = NULL;
+			free(mop);
+		} else {
+			/* was completely used up */
+			mdb_cursor_del(&mc, 0);
+			if (env->me_pghead)
+				goto again;
 		}
-		env->me_pghead = NULL;
-		free(mop);
+		env->me_pgfirst = 0;
+		env->me_pglast = 0;
 	}
 
 	/* Update DB root pointers. Their pages have already been
@@ -1960,6 +2017,9 @@ mdb_txn_commit(MDB_txn *txn)
 			}
 		}
 	}
+#if MDB_DEBUG > 2
+	mdb_audit(txn);
+#endif
 
 	/* Commit up to MDB_COMMIT_PAGES dirty pages to disk until done.
 	 */
@@ -2006,7 +2066,6 @@ mdb_txn_commit(MDB_txn *txn)
 			dp = txn->mt_u.dirty_list[i].mptr;
 			if (dp->mp_pgno != next) {
 				if (n) {
-					DPRINTF("committing %u dirty pages", n);
 					rc = writev(env->me_fd, iov, n);
 					if (rc != size) {
 						n = ErrCode();
@@ -2041,7 +2100,6 @@ mdb_txn_commit(MDB_txn *txn)
 		if (n == 0)
 			break;
 
-		DPRINTF("committing %u dirty pages", n);
 		rc = writev(env->me_fd, iov, n);
 		if (rc != size) {
 			n = ErrCode();
@@ -4859,7 +4917,11 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
 	MDB_xcursor	*mx = NULL;
 	size_t size = sizeof(MDB_cursor);
 
-	if (txn == NULL || ret == NULL || !dbi || dbi >= txn->mt_numdbs)
+	if (txn == NULL || ret == NULL || dbi >= txn->mt_numdbs)
+		return EINVAL;
+
+	/* Allow read access to the freelist */
+	if (!dbi && !F_ISSET(txn->mt_flags, MDB_TXN_RDONLY))
 		return EINVAL;
 
 	if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT)
@@ -5542,6 +5604,11 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
 	    IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno,
 	    DKEY(newkey), mc->mc_ki[mc->mc_top]);
 
+	/* Create a right sibling. */
+	if ((rp = mdb_page_new(mc, mp->mp_flags, 1)) == NULL)
+		return ENOMEM;
+	DPRINTF("new right sibling: page %zu", rp->mp_pgno);
+
 	if (mc->mc_snum < 2) {
 		if ((pp = mdb_page_new(mc, P_BRANCH, 1)) == NULL)
 			return ENOMEM;
@@ -5572,11 +5639,6 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
 		DPRINTF("parent branch page is %zu", mc->mc_pg[ptop]->mp_pgno);
 	}
 
-	/* Create a right sibling. */
-	if ((rp = mdb_page_new(mc, mp->mp_flags, 1)) == NULL)
-		return ENOMEM;
-	DPRINTF("new right sibling: page %zu", rp->mp_pgno);
-
 	mdb_cursor_copy(mc, &mn);
 	mn.mc_pg[mn.mc_top] = rp;
 	mn.mc_ki[ptop] = mc->mc_ki[ptop]+1;