Don't use env-private copy of DB root nodes.

Just lookup the DB roots as needed. When many DBs are in use,
most of the copies won't be referenced in a given txn, and
there's a bad race condition in the copy routine.
This commit is contained in:
Howard Chu 2012-07-12 17:04:05 -07:00
parent 0ea56294f1
commit a0993354a6

View File

@ -335,45 +335,6 @@ static txnid_t mdb_debug_start;
#define DKEY(x) 0
#endif
/** @defgroup lazylock Lazy Locking
* Macros for locks that aren't actually needed.
* The DB view is always consistent because all writes are wrapped in
* the wmutex. Finer-grained locks aren't necessary.
* @{
*/
#ifndef LAZY_LOCKS
/** Use lazy locking. I.e., don't lock these accesses at all. */
#define LAZY_LOCKS 1
#endif
#if LAZY_LOCKS
/** Grab the reader lock */
#define LAZY_MUTEX_LOCK(x)
/** Release the reader lock */
#define LAZY_MUTEX_UNLOCK(x)
/** Release the DB table reader/writer lock */
#define LAZY_RWLOCK_UNLOCK(x)
/** Grab the DB table write lock */
#define LAZY_RWLOCK_WRLOCK(x)
/** Grab the DB table read lock */
#define LAZY_RWLOCK_RDLOCK(x)
/** Declare the DB table rwlock. Should not be followed by ';'. */
#define LAZY_RWLOCK_DEF(x)
/** Initialize the DB table rwlock */
#define LAZY_RWLOCK_INIT(x,y)
/** Destroy the DB table rwlock */
#define LAZY_RWLOCK_DESTROY(x)
#else
#define LAZY_MUTEX_LOCK(x) pthread_mutex_lock(x)
#define LAZY_MUTEX_UNLOCK(x) pthread_mutex_unlock(x)
#define LAZY_RWLOCK_UNLOCK(x) pthread_rwlock_unlock(x)
#define LAZY_RWLOCK_WRLOCK(x) pthread_rwlock_wrlock(x)
#define LAZY_RWLOCK_RDLOCK(x) pthread_rwlock_rdlock(x)
#define LAZY_RWLOCK_DEF(x) pthread_rwlock_t x;
#define LAZY_RWLOCK_INIT(x,y) pthread_rwlock_init(x,y)
#define LAZY_RWLOCK_DESTROY(x) pthread_rwlock_destroy(x)
#endif
/** @} */
/** An invalid page number.
* Mainly used to denote an empty tree.
*/
@ -924,7 +885,7 @@ struct MDB_env {
/** Failed to update the meta page. Probably an I/O error. */
#define MDB_FATAL_ERROR 0x80000000U
uint32_t me_flags; /**< @ref mdb_env */
uint32_t me_extrapad; /**< unused for now */
unsigned int me_psize; /**< size of a page, from #GET_PAGESIZE */
unsigned int me_maxreaders; /**< size of the reader table */
MDB_dbi me_numdbs; /**< number of DBs opened */
MDB_dbi me_maxdbs; /**< size of the DB table */
@ -936,13 +897,10 @@ struct MDB_env {
size_t me_mapsize; /**< size of the data memory map */
off_t me_size; /**< current file size */
pgno_t me_maxpg; /**< me_mapsize / me_psize */
unsigned int me_psize; /**< size of a page, from #GET_PAGESIZE */
unsigned int me_db_toggle; /**< which DB table is current */
txnid_t me_wtxnid; /**< ID of last txn we committed */
txnid_t me_pgfirst; /**< ID of first old page record we used */
txnid_t me_pglast; /**< ID of last old page record we used */
MDB_dbx *me_dbxs; /**< array of static DB info */
MDB_db *me_dbs[2]; /**< two arrays of MDB_db info */
uint16_t *me_dbflags; /**< array of DB flags */
MDB_oldpages *me_pghead; /**< list of old page records */
MDB_oldpages *me_pgfree; /**< list of page records to free */
pthread_key_t me_txkey; /**< thread-key for readers */
@ -951,8 +909,6 @@ struct MDB_env {
MDB_IDL me_free_pgs;
/** ID2L of pages that were written during a write txn */
MDB_ID2 me_dirty_list[MDB_IDL_UM_SIZE];
/** rwlock for the DB tables, if #LAZY_LOCKS is false */
LAZY_RWLOCK_DEF(me_dblock)
#ifdef _WIN32
HANDLE me_rmutex; /* Windows mutexes don't reside in shared mem */
HANDLE me_wmutex;
@ -1544,12 +1500,15 @@ static int
mdb_txn_renew0(MDB_txn *txn)
{
MDB_env *env = txn->mt_env;
char mt_dbflag = 0;
unsigned int i;
/* Setup db info */
txn->mt_numdbs = env->me_numdbs;
txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */
if (txn->mt_flags & MDB_TXN_RDONLY) {
MDB_reader *r = pthread_getspecific(env->me_txkey);
if (!r) {
unsigned int i;
pid_t pid = getpid();
pthread_t tid = pthread_self();
@ -1569,15 +1528,9 @@ mdb_txn_renew0(MDB_txn *txn)
r = &env->me_txns->mti_readers[i];
pthread_setspecific(env->me_txkey, r);
}
txn->mt_txnid = env->me_txns->mti_txnid;
txn->mt_txnid = r->mr_txnid = env->me_txns->mti_txnid;
txn->mt_toggle = txn->mt_txnid & 1;
txn->mt_next_pgno = env->me_metas[txn->mt_toggle]->mm_last_pg+1;
/* This happens if a different process was the
* last writer to the DB.
*/
if (env->me_wtxnid < txn->mt_txnid)
mt_dbflag = DB_STALE;
r->mr_txnid = txn->mt_txnid;
txn->mt_u.reader = r;
} else {
LOCK_MUTEX_W(env);
@ -1585,8 +1538,6 @@ mdb_txn_renew0(MDB_txn *txn)
txn->mt_txnid = env->me_txns->mti_txnid;
txn->mt_toggle = txn->mt_txnid & 1;
txn->mt_next_pgno = env->me_metas[txn->mt_toggle]->mm_last_pg+1;
if (env->me_wtxnid < txn->mt_txnid)
mt_dbflag = DB_STALE;
txn->mt_txnid++;
#if MDB_DEBUG
if (txn->mt_txnid == mdb_debug_start)
@ -1599,17 +1550,11 @@ mdb_txn_renew0(MDB_txn *txn)
env->me_txn = txn;
}
/* Copy the DB arrays */
LAZY_RWLOCK_RDLOCK(&env->me_dblock);
txn->mt_numdbs = env->me_numdbs;
txn->mt_dbxs = env->me_dbxs; /* mostly static anyway */
/* Copy the DB info and flags */
memcpy(txn->mt_dbs, env->me_metas[txn->mt_toggle]->mm_dbs, 2 * sizeof(MDB_db));
if (txn->mt_numdbs > 2)
memcpy(txn->mt_dbs+2, env->me_dbs[env->me_db_toggle]+2,
(txn->mt_numdbs - 2) * sizeof(MDB_db));
LAZY_RWLOCK_UNLOCK(&env->me_dblock);
memset(txn->mt_dbflags, mt_dbflag, env->me_numdbs);
for (i=2; i<txn->mt_numdbs; i++)
txn->mt_dbs[i].md_flags = env->me_dbflags[i];
memset(txn->mt_dbflags, DB_STALE, env->me_numdbs);
return MDB_SUCCESS;
}
@ -1828,21 +1773,11 @@ mdb_txn_commit(MDB_txn *txn)
if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) {
if (txn->mt_numdbs > env->me_numdbs) {
/* update the DB tables */
int toggle = !env->me_db_toggle;
MDB_db *ip, *jp;
/* update the DB flags */
MDB_dbi i;
ip = &env->me_dbs[toggle][env->me_numdbs];
jp = &txn->mt_dbs[env->me_numdbs];
LAZY_RWLOCK_WRLOCK(&env->me_dblock);
for (i = env->me_numdbs; i < txn->mt_numdbs; i++) {
*ip++ = *jp++;
}
env->me_db_toggle = toggle;
env->me_numdbs = txn->mt_numdbs;
LAZY_RWLOCK_UNLOCK(&env->me_dblock);
for (i = env->me_numdbs; i<txn->mt_numdbs; i++)
env->me_dbflags[i] = txn->mt_dbs[i].md_flags;
env->me_numdbs = i;
}
mdb_txn_abort(txn);
return MDB_SUCCESS;
@ -2171,28 +2106,15 @@ again:
mdb_txn_abort(txn);
return n;
}
env->me_wtxnid = txn->mt_txnid;
done:
env->me_txn = NULL;
/* update the DB tables */
{
int toggle = !env->me_db_toggle;
MDB_db *ip, *jp;
if (txn->mt_numdbs > env->me_numdbs) {
/* update the DB flags */
MDB_dbi i;
ip = &env->me_dbs[toggle][2];
jp = &txn->mt_dbs[2];
LAZY_RWLOCK_WRLOCK(&env->me_dblock);
for (i = 2; i < txn->mt_numdbs; i++) {
if (ip->md_root != jp->md_root)
*ip = *jp;
ip++; jp++;
}
env->me_db_toggle = toggle;
env->me_numdbs = txn->mt_numdbs;
LAZY_RWLOCK_UNLOCK(&env->me_dblock);
for (i = env->me_numdbs; i<txn->mt_numdbs; i++)
env->me_dbflags[i] = txn->mt_dbs[i].md_flags;
env->me_numdbs = i;
}
UNLOCK_MUTEX_W(env);
@ -2388,9 +2310,7 @@ mdb_env_write_meta(MDB_txn *txn)
* readers will get consistent data regardless of how fresh or
* how stale their view of these values is.
*/
LAZY_MUTEX_LOCK(&env->me_txns->mti_mutex);
txn->mt_env->me_txns->mti_txnid = txn->mt_txnid;
LAZY_MUTEX_UNLOCK(&env->me_txns->mti_mutex);
return MDB_SUCCESS;
}
@ -3100,13 +3020,13 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mode_t mode)
goto leave;
}
#endif
LAZY_RWLOCK_INIT(&env->me_dblock, NULL);
if (excl)
mdb_env_share_locks(env);
env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx));
env->me_dbs[0] = calloc(env->me_maxdbs, sizeof(MDB_db));
env->me_dbs[1] = calloc(env->me_maxdbs, sizeof(MDB_db));
env->me_numdbs = 2;
env->me_dbxs = calloc(env->me_maxdbs, sizeof(MDB_dbx));
env->me_dbflags = calloc(env->me_maxdbs, sizeof(uint16_t));
if (!env->me_dbxs || !env->me_dbflags)
rc = ENOMEM;
}
leave:
@ -3140,12 +3060,10 @@ mdb_env_close(MDB_env *env)
free(dp);
}
free(env->me_dbs[1]);
free(env->me_dbs[0]);
free(env->me_dbflags);
free(env->me_dbxs);
free(env->me_path);
LAZY_RWLOCK_DESTROY(&env->me_dblock);
pthread_key_delete(env->me_txkey);
#ifdef _WIN32
/* Delete our key from the global list */
@ -6316,8 +6234,7 @@ int mdb_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *dbi)
txn->mt_dbflags[slot] = dbflag;
memcpy(&txn->mt_dbs[slot], data.mv_data, sizeof(MDB_db));
*dbi = slot;
txn->mt_env->me_dbs[0][slot] = txn->mt_dbs[slot];
txn->mt_env->me_dbs[1][slot] = txn->mt_dbs[slot];
txn->mt_env->me_dbflags[slot] = txn->mt_dbs[slot].md_flags;
mdb_default_cmp(txn, slot);
if (!unused) {
txn->mt_numdbs++;