Rename fields of lock and lockholder structures to something a tad less

confusing, and clean up documentation.
This commit is contained in:
Tom Lane 2001-01-16 06:11:34 +00:00
parent 2dbc457ef5
commit 64e6c60897
5 changed files with 299 additions and 271 deletions

View File

@ -1,4 +1,4 @@
$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.4 2000/12/22 00:51:54 tgl Exp $ $Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.5 2001/01/16 06:11:34 tgl Exp $
There are two fundamental lock structures: the per-lockable-object LOCK There are two fundamental lock structures: the per-lockable-object LOCK
struct, and the per-lock-holder HOLDER struct. A LOCK object exists struct, and the per-lock-holder HOLDER struct. A LOCK object exists
@ -44,51 +44,52 @@ tag -
only a single table (see Gray's paper on 2 phase locking if only a single table (see Gray's paper on 2 phase locking if
you are puzzled about how multi-level lock tables work). you are puzzled about how multi-level lock tables work).
mask - grantMask -
This field indicates what types of locks are currently held on the This bitmask indicates what types of locks are currently held on the
given lockable object. It is used (against the lock table's conflict given lockable object. It is used (against the lock table's conflict
table) to determine if the new lock request will conflict with existing table) to determine if a new lock request will conflict with existing
lock types held. Conficts are determined by bitwise AND operations lock types held. Conficts are determined by bitwise AND operations
between the mask and the conflict table entry for the given lock type between the grantMask and the conflict table entry for the requested
to be set. The current representation is that each bit (1 through 5) lock type. Bit i of grantMask is 1 if and only if granted[i] > 0.
is set when that lock type (WRITE, READ, WRITE INTENT, READ INTENT, EXTEND)
has been acquired for the lock. waitMask -
This bitmask shows the types of locks being waited for. Bit i of waitMask
is 1 if and only if requested[i] > granted[i].
waitProcs - waitProcs -
This is a shared memory queue of all process structures corresponding to This is a shared memory queue of all process structures corresponding to
a backend that is waiting (sleeping) until another backend releases this a backend that is waiting (sleeping) until another backend releases this
lock. The process structure holds the information needed to determine lock. The process structure holds the information needed to determine
if it should be woken up when this lock is released. If, for example, if it should be woken up when this lock is released.
we are releasing a read lock and the process is sleeping trying to acquire
a read lock then there is no point in waking it since the lock being
released isn't what caused it to sleep in the first place. There will
be more on this below (when I get to releasing locks and waking sleeping
process routines).
nHolding - nRequested -
Keeps a count of how many times this lock has been attempted to be Keeps a count of how many times this lock has been attempted to be
acquired. The count includes attempts by processes which were put acquired. The count includes attempts by processes which were put
to sleep due to conflicts. It also counts the same backend twice to sleep due to conflicts. It also counts the same backend twice
if, for example, a backend process first acquires a read and then if, for example, a backend process first acquires a read and then
acquires a write. acquires a write, or acquires a read lock twice.
holders - requested -
Keeps a count of how many locks of each type have been attempted. Only Keeps a count of how many locks of each type have been attempted. Only
elements 1 through MAX_LOCK_TYPES are used as they correspond to the lock elements 1 through MAX_LOCKMODES-1 are used as they correspond to the lock
type defined constants (WRITE through EXTEND). Summing the values of type defined constants. Summing the values of requested[] should come out
holders should come out equal to nHolding. equal to nRequested.
nActive - nGranted -
Keeps a count of how many times this lock has been succesfully acquired. Keeps count of how many times this lock has been successfully acquired.
This count does not include attempts that are waiting due to conflicts, This count does not include attempts that are waiting due to conflicts,
but can count the same backend twice (e.g. a read then a write -- since but can count the same backend twice (e.g. a read then a write -- since
its the same transaction this won't cause a conflict) its the same transaction this won't cause a conflict).
activeHolders - granted -
Keeps a count of how locks of each type are currently held. Once again Keeps count of how many locks of each type are currently held. Once again
only elements 1 through MAX_LOCK_TYPES are used (0 is not). Also, like only elements 1 through MAX_LOCKMODES-1 are used (0 is not). Also, like
holders, summing the values of activeHolders should total to the value requested, summing the values of granted should total to the value
of nActive. of nGranted.
We should always have 0 <= nGranted <= nRequested, and
0 <= granted[i] <= requested[i] for each i. If the request counts go to
zero, the lock object is no longer needed and can be freed.
--------------------------------------------------------------------------- ---------------------------------------------------------------------------
@ -116,14 +117,12 @@ tag -
are always session locks, and we also use session locks for multi- are always session locks, and we also use session locks for multi-
transaction operations like VACUUM. transaction operations like VACUUM.
holders - holding -
The number of successfully acquired locks of each type for this holder. The number of successfully acquired locks of each type for this holder.
(CAUTION: the semantics are not the same as the LOCK's holder[], which This should be <= the corresponding granted[] value of the lock object!
counts both acquired and pending requests. Probably a different name
should be used...)
nHolding - nHolding -
Sum of the holders[] array. Sum of the holding[] array.
queue - queue -
List link for shared memory queue of all the HOLDER objects for the List link for shared memory queue of all the HOLDER objects for the

View File

@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.77 2001/01/14 05:08:15 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.78 2001/01/16 06:11:34 tgl Exp $
* *
* NOTES * NOTES
* Outside modules can create a lock table and acquire/release * Outside modules can create a lock table and acquire/release
@ -43,8 +43,8 @@
static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
LOCK *lock, HOLDER *holder); LOCK *lock, HOLDER *holder);
static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
int *myHolders); int *myHolding);
static int LockGetMyHoldLocks(SHMEM_OFFSET lockOffset, PROC *proc); static int LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc);
static char *lock_types[] = static char *lock_types[] =
{ {
@ -91,7 +91,7 @@ LOCK_DEBUG_ENABLED(const LOCK * lock)
return return
(((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks) (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
|| (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks)) || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
&& (lock->tag.relId >= Trace_lock_oidmin)) && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
|| (Trace_lock_table && (lock->tag.relId == Trace_lock_table)); || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
} }
@ -101,17 +101,18 @@ LOCK_PRINT(const char * where, const LOCK * lock, LOCKMODE type)
{ {
if (LOCK_DEBUG_ENABLED(lock)) if (LOCK_DEBUG_ENABLED(lock))
elog(DEBUG, elog(DEBUG,
"%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) mask(%x) " "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) "
"hold(%d,%d,%d,%d,%d,%d,%d)=%d " "req(%d,%d,%d,%d,%d,%d,%d)=%d "
"act(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
where, MAKE_OFFSET(lock), where, MAKE_OFFSET(lock),
lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId, lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId,
lock->tag.objId.blkno, lock->mask, lock->tag.objId.blkno, lock->grantMask,
lock->holders[1], lock->holders[2], lock->holders[3], lock->holders[4], lock->requested[1], lock->requested[2], lock->requested[3],
lock->holders[5], lock->holders[6], lock->holders[7], lock->nHolding, lock->requested[4], lock->requested[5], lock->requested[6],
lock->activeHolders[1], lock->activeHolders[2], lock->activeHolders[3], lock->requested[7], lock->nRequested,
lock->activeHolders[4], lock->activeHolders[5], lock->activeHolders[6], lock->granted[1], lock->granted[2], lock->granted[3],
lock->activeHolders[7], lock->nActive, lock->granted[4], lock->granted[5], lock->granted[6],
lock->granted[7], lock->nGranted,
lock->waitProcs.size, lock_types[type]); lock->waitProcs.size, lock_types[type]);
} }
@ -122,7 +123,7 @@ HOLDER_PRINT(const char * where, const HOLDER * holderP)
if ( if (
(((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks) (((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks)
|| (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks)) || (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks))
&& (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId >= Trace_lock_oidmin)) && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
|| (Trace_lock_table && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table)) || (Trace_lock_table && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
) )
elog(DEBUG, elog(DEBUG,
@ -130,8 +131,9 @@ HOLDER_PRINT(const char * where, const HOLDER * holderP)
where, MAKE_OFFSET(holderP), holderP->tag.lock, where, MAKE_OFFSET(holderP), holderP->tag.lock,
HOLDER_LOCKMETHOD(*(holderP)), HOLDER_LOCKMETHOD(*(holderP)),
holderP->tag.pid, holderP->tag.xid, holderP->tag.pid, holderP->tag.xid,
holderP->holders[1], holderP->holders[2], holderP->holders[3], holderP->holders[4], holderP->holding[1], holderP->holding[2], holderP->holding[3],
holderP->holders[5], holderP->holders[6], holderP->holders[7], holderP->nHolding); holderP->holding[4], holderP->holding[5], holderP->holding[6],
holderP->holding[7], holderP->nHolding);
} }
#else /* not LOCK_DEBUG */ #else /* not LOCK_DEBUG */
@ -146,7 +148,12 @@ HOLDER_PRINT(const char * where, const HOLDER * holderP)
SPINLOCK LockMgrLock; /* in Shmem or created in SPINLOCK LockMgrLock; /* in Shmem or created in
* CreateSpinlocks() */ * CreateSpinlocks() */
/* This is to simplify/speed up some bit arithmetic */ /*
* These are to simplify/speed up some bit arithmetic.
*
* XXX is a fetch from a static array really faster than a shift?
* Wouldn't bet on it...
*/
static LOCKMASK BITS_OFF[MAX_LOCKMODES]; static LOCKMASK BITS_OFF[MAX_LOCKMODES];
static LOCKMASK BITS_ON[MAX_LOCKMODES]; static LOCKMASK BITS_ON[MAX_LOCKMODES];
@ -471,7 +478,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
SPINLOCK masterLock; SPINLOCK masterLock;
LOCKMETHODTABLE *lockMethodTable; LOCKMETHODTABLE *lockMethodTable;
int status; int status;
int myHolders[MAX_LOCKMODES]; int myHolding[MAX_LOCKMODES];
int i; int i;
#ifdef LOCK_DEBUG #ifdef LOCK_DEBUG
@ -517,20 +524,21 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
*/ */
if (!found) if (!found)
{ {
lock->mask = 0; lock->grantMask = 0;
lock->nHolding = 0; lock->waitMask = 0;
lock->nActive = 0; lock->nRequested = 0;
MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES); lock->nGranted = 0;
MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES); MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
ProcQueueInit(&(lock->waitProcs)); ProcQueueInit(&(lock->waitProcs));
LOCK_PRINT("LockAcquire: new", lock, lockmode); LOCK_PRINT("LockAcquire: new", lock, lockmode);
} }
else else
{ {
LOCK_PRINT("LockAcquire: found", lock, lockmode); LOCK_PRINT("LockAcquire: found", lock, lockmode);
Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0)); Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0)); Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
Assert(lock->nActive <= lock->nHolding); Assert(lock->nGranted <= lock->nRequested);
} }
/* ------------------ /* ------------------
@ -561,15 +569,15 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
if (!found) if (!found)
{ {
holder->nHolding = 0; holder->nHolding = 0;
MemSet((char *) holder->holders, 0, sizeof(int) * MAX_LOCKMODES); MemSet((char *) holder->holding, 0, sizeof(int) * MAX_LOCKMODES);
ProcAddLock(&holder->queue); ProcAddLock(&holder->queue);
HOLDER_PRINT("LockAcquire: new", holder); HOLDER_PRINT("LockAcquire: new", holder);
} }
else else
{ {
HOLDER_PRINT("LockAcquire: found", holder); HOLDER_PRINT("LockAcquire: found", holder);
Assert((holder->nHolding > 0) && (holder->holders[lockmode] >= 0)); Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
Assert(holder->nHolding <= lock->nActive); Assert(holder->nHolding <= lock->nGranted);
#ifdef CHECK_DEADLOCK_RISK #ifdef CHECK_DEADLOCK_RISK
/* /*
@ -588,7 +596,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
*/ */
for (i = lockMethodTable->ctl->numLockModes; i > 0; i--) for (i = lockMethodTable->ctl->numLockModes; i > 0; i--)
{ {
if (holder->holders[i] > 0) if (holder->holding[i] > 0)
{ {
if (i >= (int) lockmode) if (i >= (int) lockmode)
break; /* safe: we have a lock >= req level */ break; /* safe: we have a lock >= req level */
@ -603,21 +611,21 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
} }
/* ---------------- /* ----------------
* lock->nHolding and lock->holders count the total number of holders * lock->nRequested and lock->requested[] count the total number of
* either holding or waiting for the lock, so increment those immediately. * requests, whether granted or waiting, so increment those immediately.
* The other counts don't increment till we get the lock. * The other counts don't increment till we get the lock.
* ---------------- * ----------------
*/ */
lock->nHolding++; lock->nRequested++;
lock->holders[lockmode]++; lock->requested[lockmode]++;
Assert((lock->nHolding > 0) && (lock->holders[lockmode] > 0)); Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
/* -------------------- /* --------------------
* If I'm the only one holding any lock on this object, then there * If I'm the only one holding any lock on this object, then there
* cannot be a conflict. The same is true if I already hold this lock. * cannot be a conflict. The same is true if I already hold this lock.
* -------------------- * --------------------
*/ */
if (holder->nHolding == lock->nActive || holder->holders[lockmode] != 0) if (holder->nHolding == lock->nGranted || holder->holding[lockmode] != 0)
{ {
GrantLock(lock, holder, lockmode); GrantLock(lock, holder, lockmode);
HOLDER_PRINT("LockAcquire: owning", holder); HOLDER_PRINT("LockAcquire: owning", holder);
@ -630,8 +638,8 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
* then there is no conflict, either. * then there is no conflict, either.
* -------------------- * --------------------
*/ */
LockCountMyLocks(holder->tag.lock, MyProc, myHolders); LockCountMyLocks(holder->tag.lock, MyProc, myHolding);
if (myHolders[lockmode] != 0) if (myHolding[lockmode] != 0)
{ {
GrantLock(lock, holder, lockmode); GrantLock(lock, holder, lockmode);
HOLDER_PRINT("LockAcquire: my other XID owning", holder); HOLDER_PRINT("LockAcquire: my other XID owning", holder);
@ -650,7 +658,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
*/ */
for (i = 1; i <= lockMethodTable->ctl->numLockModes; i++) for (i = 1; i <= lockMethodTable->ctl->numLockModes; i++)
{ {
if (myHolders[i] > 0 && if (myHolding[i] > 0 &&
lockMethodTable->ctl->conflictTab[i] & lock->waitMask) lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
break; /* yes, there is a conflict */ break; /* yes, there is a conflict */
} }
@ -664,12 +672,12 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
else else
status = LockResolveConflicts(lockmethod, lockmode, status = LockResolveConflicts(lockmethod, lockmode,
lock, holder, lock, holder,
MyProc, myHolders); MyProc, myHolding);
} }
else else
status = LockResolveConflicts(lockmethod, lockmode, status = LockResolveConflicts(lockmethod, lockmode,
lock, holder, lock, holder,
MyProc, myHolders); MyProc, myHolding);
if (status == STATUS_OK) if (status == STATUS_OK)
GrantLock(lock, holder, lockmode); GrantLock(lock, holder, lockmode);
@ -694,11 +702,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
} }
else else
HOLDER_PRINT("LockAcquire: NHOLDING", holder); HOLDER_PRINT("LockAcquire: NHOLDING", holder);
lock->nHolding--; lock->nRequested--;
lock->holders[lockmode]--; lock->requested[lockmode]--;
LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode); LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0)); Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
Assert(lock->nActive <= lock->nHolding); Assert(lock->nGranted <= lock->nRequested);
SpinRelease(masterLock); SpinRelease(masterLock);
return FALSE; return FALSE;
} }
@ -708,17 +716,17 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
* Construct bitmask of locks this process holds on this object. * Construct bitmask of locks this process holds on this object.
*/ */
{ {
int holdLock = 0; int heldLocks = 0;
int tmpMask; int tmpMask;
for (i = 1, tmpMask = 2; for (i = 1, tmpMask = 2;
i <= lockMethodTable->ctl->numLockModes; i <= lockMethodTable->ctl->numLockModes;
i++, tmpMask <<= 1) i++, tmpMask <<= 1)
{ {
if (myHolders[i] > 0) if (myHolding[i] > 0)
holdLock |= tmpMask; heldLocks |= tmpMask;
} }
MyProc->holdLock = holdLock; MyProc->heldLocks = heldLocks;
} }
/* /*
@ -736,7 +744,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
* Check the holder entry status, in case something in the ipc * Check the holder entry status, in case something in the ipc
* communication doesn't work correctly. * communication doesn't work correctly.
*/ */
if (!((holder->nHolding > 0) && (holder->holders[lockmode] > 0))) if (!((holder->nHolding > 0) && (holder->holding[lockmode] > 0)))
{ {
HOLDER_PRINT("LockAcquire: INCONSISTENT", holder); HOLDER_PRINT("LockAcquire: INCONSISTENT", holder);
LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
@ -763,7 +771,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
* determining whether or not any new lock acquired conflicts with * determining whether or not any new lock acquired conflicts with
* the old ones. * the old ones.
* *
* The caller can optionally pass the process's total holders counts, if * The caller can optionally pass the process's total holding counts, if
* known. If NULL is passed then these values will be computed internally. * known. If NULL is passed then these values will be computed internally.
* ---------------------------- * ----------------------------
*/ */
@ -773,28 +781,28 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
LOCK *lock, LOCK *lock,
HOLDER *holder, HOLDER *holder,
PROC *proc, PROC *proc,
int *myHolders) /* myHolders[] array or NULL */ int *myHolding) /* myHolding[] array or NULL */
{ {
LOCKMETHODCTL *lockctl = LockMethodTable[lockmethod]->ctl; LOCKMETHODCTL *lockctl = LockMethodTable[lockmethod]->ctl;
int numLockModes = lockctl->numLockModes; int numLockModes = lockctl->numLockModes;
int bitmask; int bitmask;
int i, int i,
tmpMask; tmpMask;
int localHolders[MAX_LOCKMODES]; int localHolding[MAX_LOCKMODES];
Assert((holder->nHolding >= 0) && (holder->holders[lockmode] >= 0)); Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
/* ---------------------------- /* ----------------------------
* first check for global conflicts: If no locks conflict * first check for global conflicts: If no locks conflict
* with mine, then I get the lock. * with mine, then I get the lock.
* *
* Checking for conflict: lock->mask represents the types of * Checking for conflict: lock->grantMask represents the types of
* currently held locks. conflictTable[lockmode] has a bit * currently held locks. conflictTable[lockmode] has a bit
* set for each type of lock that conflicts with mine. Bitwise * set for each type of lock that conflicts with mine. Bitwise
* compare tells if there is a conflict. * compare tells if there is a conflict.
* ---------------------------- * ----------------------------
*/ */
if (!(lockctl->conflictTab[lockmode] & lock->mask)) if (!(lockctl->conflictTab[lockmode] & lock->grantMask))
{ {
HOLDER_PRINT("LockResolveConflicts: no conflict", holder); HOLDER_PRINT("LockResolveConflicts: no conflict", holder);
return STATUS_OK; return STATUS_OK;
@ -807,11 +815,11 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
* process under another XID also count as "our own locks". * process under another XID also count as "our own locks".
* ------------------------ * ------------------------
*/ */
if (myHolders == NULL) if (myHolding == NULL)
{ {
/* Caller didn't do calculation of total holding for me */ /* Caller didn't do calculation of total holding for me */
LockCountMyLocks(holder->tag.lock, proc, localHolders); LockCountMyLocks(holder->tag.lock, proc, localHolding);
myHolders = localHolders; myHolding = localHolding;
} }
/* Compute mask of lock types held by other processes */ /* Compute mask of lock types held by other processes */
@ -819,7 +827,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
tmpMask = 2; tmpMask = 2;
for (i = 1; i <= numLockModes; i++, tmpMask <<= 1) for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
{ {
if (lock->activeHolders[i] != myHolders[i]) if (lock->granted[i] != myHolding[i])
bitmask |= tmpMask; bitmask |= tmpMask;
} }
@ -852,20 +860,20 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
* be a net slowdown. * be a net slowdown.
*/ */
static void static void
LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolders) LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
{ {
HOLDER *holder = NULL; HOLDER *holder = NULL;
HOLDER *nextHolder = NULL; HOLDER *nextHolder = NULL;
SHM_QUEUE *lockQueue = &(proc->lockQueue); SHM_QUEUE *holderQueue = &(proc->holderQueue);
SHMEM_OFFSET end = MAKE_OFFSET(lockQueue); SHMEM_OFFSET end = MAKE_OFFSET(holderQueue);
int i; int i;
MemSet(myHolders, 0, MAX_LOCKMODES * sizeof(int)); MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
if (SHMQueueEmpty(lockQueue)) if (SHMQueueEmpty(holderQueue))
return; return;
SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue); SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
do do
{ {
@ -885,7 +893,7 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolders)
{ {
for (i = 1; i < MAX_LOCKMODES; i++) for (i = 1; i < MAX_LOCKMODES; i++)
{ {
myHolders[i] += holder->holders[i]; myHolding[i] += holder->holding[i];
} }
} }
@ -894,47 +902,47 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolders)
} }
/* /*
* LockGetMyHoldLocks -- compute bitmask of lock types held by a process * LockGetMyHeldLocks -- compute bitmask of lock types held by a process
* for a given lockable object. * for a given lockable object.
*/ */
static int static int
LockGetMyHoldLocks(SHMEM_OFFSET lockOffset, PROC *proc) LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc)
{ {
int myHolders[MAX_LOCKMODES]; int myHolding[MAX_LOCKMODES];
int holdLock = 0; int heldLocks = 0;
int i, int i,
tmpMask; tmpMask;
LockCountMyLocks(lockOffset, proc, myHolders); LockCountMyLocks(lockOffset, proc, myHolding);
for (i = 1, tmpMask = 2; for (i = 1, tmpMask = 2;
i < MAX_LOCKMODES; i < MAX_LOCKMODES;
i++, tmpMask <<= 1) i++, tmpMask <<= 1)
{ {
if (myHolders[i] > 0) if (myHolding[i] > 0)
holdLock |= tmpMask; heldLocks |= tmpMask;
} }
return holdLock; return heldLocks;
} }
/* /*
* GrantLock -- update the lock and holder data structures to show * GrantLock -- update the lock and holder data structures to show
* the new lock has been granted. * the lock request has been granted.
*/ */
void void
GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode) GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
{ {
lock->nActive++; lock->nGranted++;
lock->activeHolders[lockmode]++; lock->granted[lockmode]++;
lock->mask |= BITS_ON[lockmode]; lock->grantMask |= BITS_ON[lockmode];
if (lock->activeHolders[lockmode] == lock->holders[lockmode]) if (lock->granted[lockmode] == lock->requested[lockmode])
lock->waitMask &= BITS_OFF[lockmode]; lock->waitMask &= BITS_OFF[lockmode];
LOCK_PRINT("GrantLock", lock, lockmode); LOCK_PRINT("GrantLock", lock, lockmode);
Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] > 0)); Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
Assert(lock->nActive <= lock->nHolding); Assert(lock->nGranted <= lock->nRequested);
holder->holders[lockmode]++; holder->holding[lockmode]++;
holder->nHolding++; holder->nHolding++;
Assert((holder->nHolding > 0) && (holder->holders[lockmode] > 0)); Assert((holder->nHolding > 0) && (holder->holding[lockmode] > 0));
} }
/* /*
@ -952,14 +960,6 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
Assert(lockmethod < NumLockMethods); Assert(lockmethod < NumLockMethods);
/*
* the waitqueue is ordered by priority. I insert myself according to
* the priority of the lock I am acquiring.
*
* SYNC NOTE: I am assuming that the lock table spinlock is sufficient
* synchronization for this queue. That will not be true if/when
* people can be deleted from the queue by a SIGINT or something.
*/
LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode); LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
old_status = pstrdup(get_ps_display()); old_status = pstrdup(get_ps_display());
@ -971,7 +971,7 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
/* /*
* NOTE: Think not to put any lock state cleanup after the call to * NOTE: Think not to put any lock state cleanup after the call to
* ProcSleep, in either the normal or failure path. The lock state * ProcSleep, in either the normal or failure path. The lock state
* must be fully set by the lock grantor, or by HandleDeadlock if we * must be fully set by the lock grantor, or by HandleDeadLock if we
* give up waiting for the lock. This is necessary because of the * give up waiting for the lock. This is necessary because of the
* possibility that a cancel/die interrupt will interrupt ProcSleep * possibility that a cancel/die interrupt will interrupt ProcSleep
* after someone else grants us the lock, but before we've noticed it. * after someone else grants us the lock, but before we've noticed it.
@ -1074,9 +1074,6 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
return FALSE; return FALSE;
} }
LOCK_PRINT("LockRelease: found", lock, lockmode); LOCK_PRINT("LockRelease: found", lock, lockmode);
Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
Assert(lock->nActive <= lock->nHolding);
/* /*
* Find the holder entry for this holder. * Find the holder entry for this holder.
@ -1107,29 +1104,32 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
* Check that we are actually holding a lock of the type we want to * Check that we are actually holding a lock of the type we want to
* release. * release.
*/ */
if (!(holder->holders[lockmode] > 0)) if (!(holder->holding[lockmode] > 0))
{ {
SpinRelease(masterLock); SpinRelease(masterLock);
HOLDER_PRINT("LockRelease: WRONGTYPE", holder); HOLDER_PRINT("LockRelease: WRONGTYPE", holder);
elog(NOTICE, "LockRelease: you don't own a lock of type %s", elog(NOTICE, "LockRelease: you don't own a lock of type %s",
lock_types[lockmode]); lock_types[lockmode]);
Assert(holder->holders[lockmode] >= 0); Assert(holder->holding[lockmode] >= 0);
return FALSE; return FALSE;
} }
Assert(holder->nHolding > 0); Assert(holder->nHolding > 0);
Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
Assert(lock->nGranted <= lock->nRequested);
/* /*
* fix the general lock stats * fix the general lock stats
*/ */
lock->nHolding--; lock->nRequested--;
lock->holders[lockmode]--; lock->requested[lockmode]--;
lock->nActive--; lock->nGranted--;
lock->activeHolders[lockmode]--; lock->granted[lockmode]--;
if (!(lock->activeHolders[lockmode])) if (lock->granted[lockmode] == 0)
{ {
/* change the conflict mask. No more of this lock type. */ /* change the conflict mask. No more of this lock type. */
lock->mask &= BITS_OFF[lockmode]; lock->grantMask &= BITS_OFF[lockmode];
} }
#ifdef NOT_USED #ifdef NOT_USED
@ -1139,14 +1139,14 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
* with the remaining locks. * with the remaining locks.
* -------------------------- * --------------------------
*/ */
if (lock->activeHolders[lockmode]) if (lock->granted[lockmode])
wakeupNeeded = false; wakeupNeeded = false;
else else
#endif #endif
/* /*
* Above is not valid any more (due to MVCC lock modes). Actually * Above is not valid any more (due to MVCC lock modes). Actually
* we should compare activeHolders[lockmode] with number of * we should compare granted[lockmode] with number of
* waiters holding lock of this type and try to wakeup only if * waiters holding lock of this type and try to wakeup only if
* these numbers are equal (and lock released conflicts with locks * these numbers are equal (and lock released conflicts with locks
* requested by waiters). For the moment we only check the last * requested by waiters). For the moment we only check the last
@ -1156,11 +1156,11 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
wakeupNeeded = true; wakeupNeeded = true;
LOCK_PRINT("LockRelease: updated", lock, lockmode); LOCK_PRINT("LockRelease: updated", lock, lockmode);
Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0)); Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
Assert((lock->nActive >= 0) && (lock->activeHolders[lockmode] >= 0)); Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
Assert(lock->nActive <= lock->nHolding); Assert(lock->nGranted <= lock->nRequested);
if (!lock->nHolding) if (!lock->nRequested)
{ {
/* ------------------ /* ------------------
* if there's no one waiting in the queue, * if there's no one waiting in the queue,
@ -1180,10 +1180,10 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
/* /*
* Now fix the per-holder lock stats. * Now fix the per-holder lock stats.
*/ */
holder->holders[lockmode]--; holder->holding[lockmode]--;
holder->nHolding--; holder->nHolding--;
HOLDER_PRINT("LockRelease: updated", holder); HOLDER_PRINT("LockRelease: updated", holder);
Assert((holder->nHolding >= 0) && (holder->holders[lockmode] >= 0)); Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
/* /*
* If this was my last hold on this lock, delete my entry in the holder * If this was my last hold on this lock, delete my entry in the holder
@ -1236,8 +1236,8 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
{ {
HOLDER *holder = NULL; HOLDER *holder = NULL;
HOLDER *nextHolder = NULL; HOLDER *nextHolder = NULL;
SHM_QUEUE *lockQueue = &(proc->lockQueue); SHM_QUEUE *holderQueue = &(proc->holderQueue);
SHMEM_OFFSET end = MAKE_OFFSET(lockQueue); SHMEM_OFFSET end = MAKE_OFFSET(holderQueue);
SPINLOCK masterLock; SPINLOCK masterLock;
LOCKMETHODTABLE *lockMethodTable; LOCKMETHODTABLE *lockMethodTable;
int i, int i,
@ -1260,7 +1260,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
return FALSE; return FALSE;
} }
if (SHMQueueEmpty(lockQueue)) if (SHMQueueEmpty(holderQueue))
return TRUE; return TRUE;
numLockModes = lockMethodTable->ctl->numLockModes; numLockModes = lockMethodTable->ctl->numLockModes;
@ -1268,7 +1268,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
SpinAcquire(masterLock); SpinAcquire(masterLock);
SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue); SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
nleft = 0; nleft = 0;
@ -1308,53 +1308,55 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
HOLDER_PRINT("LockReleaseAll", holder); HOLDER_PRINT("LockReleaseAll", holder);
LOCK_PRINT("LockReleaseAll", lock, 0); LOCK_PRINT("LockReleaseAll", lock, 0);
Assert(lock->nHolding > 0); Assert(lock->nRequested >= 0);
Assert(lock->nActive > 0); Assert(lock->nGranted >= 0);
Assert(lock->nActive <= lock->nHolding); Assert(lock->nGranted <= lock->nRequested);
Assert(holder->nHolding >= 0); Assert(holder->nHolding >= 0);
Assert(holder->nHolding <= lock->nHolding); Assert(holder->nHolding <= lock->nRequested);
/* ------------------ /* ------------------
* fix the general lock stats * fix the general lock stats
* ------------------ * ------------------
*/ */
if (lock->nHolding != holder->nHolding) if (lock->nRequested != holder->nHolding)
{ {
for (i = 1; i <= numLockModes; i++) for (i = 1; i <= numLockModes; i++)
{ {
Assert(holder->holders[i] >= 0); Assert(holder->holding[i] >= 0);
lock->holders[i] -= holder->holders[i]; if (holder->holding[i] > 0)
lock->activeHolders[i] -= holder->holders[i]; {
Assert((lock->holders[i] >= 0) \ lock->requested[i] -= holder->holding[i];
&&(lock->activeHolders[i] >= 0)); lock->granted[i] -= holder->holding[i];
if (!lock->activeHolders[i]) Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
lock->mask &= BITS_OFF[i]; if (lock->granted[i] == 0)
lock->grantMask &= BITS_OFF[i];
/* /*
* Read comments in LockRelease * Read comments in LockRelease
*/ */
if (!wakeupNeeded && holder->holders[i] > 0 && if (!wakeupNeeded &&
lockMethodTable->ctl->conflictTab[i] & lock->waitMask) lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
wakeupNeeded = true; wakeupNeeded = true;
}
} }
lock->nHolding -= holder->nHolding; lock->nRequested -= holder->nHolding;
lock->nActive -= holder->nHolding; lock->nGranted -= holder->nHolding;
Assert((lock->nHolding >= 0) && (lock->nActive >= 0)); Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
Assert(lock->nActive <= lock->nHolding); Assert(lock->nGranted <= lock->nRequested);
} }
else else
{ {
/* -------------- /* --------------
* set nHolding to zero so that we can garbage collect the lock * set nRequested to zero so that we can garbage collect the lock
* down below... * down below...
* -------------- * --------------
*/ */
lock->nHolding = 0; lock->nRequested = 0;
lock->nGranted = 0;
/* Fix the lock status, just for next LOCK_PRINT message. */ /* Fix the lock status, just for next LOCK_PRINT message. */
for (i = 1; i <= numLockModes; i++) for (i = 1; i <= numLockModes; i++)
{ {
Assert(lock->holders[i] == lock->activeHolders[i]); Assert(lock->requested[i] == lock->granted[i]);
lock->holders[i] = lock->activeHolders[i] = 0; lock->requested[i] = lock->granted[i] = 0;
} }
} }
LOCK_PRINT("LockReleaseAll: updated", lock, 0); LOCK_PRINT("LockReleaseAll: updated", lock, 0);
@ -1380,11 +1382,11 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
return FALSE; return FALSE;
} }
if (!lock->nHolding) if (!lock->nRequested)
{ {
/* -------------------- /* --------------------
* if there's no one waiting in the queue, we've just released * We've just released the last lock, so garbage-collect the
* the last lock. * lock object.
* -------------------- * --------------------
*/ */
LOCK_PRINT("LockReleaseAll: deleting", lock, 0); LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
@ -1413,12 +1415,13 @@ next_item:
{ {
#ifdef LOCK_DEBUG #ifdef LOCK_DEBUG
if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
elog(DEBUG, "LockReleaseAll: reinitializing lockQueue"); elog(DEBUG, "LockReleaseAll: reinitializing holderQueue");
#endif #endif
SHMQueueInit(lockQueue); SHMQueueInit(holderQueue);
} }
SpinRelease(masterLock); SpinRelease(masterLock);
#ifdef LOCK_DEBUG #ifdef LOCK_DEBUG
if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
elog(DEBUG, "LockReleaseAll: done"); elog(DEBUG, "LockReleaseAll: done");
@ -1457,16 +1460,17 @@ LockShmemSize(int maxBackends)
} }
/* /*
* DeadlockCheck -- Checks for deadlocks for a given process * DeadLockCheck -- Checks for deadlocks for a given process
*
* We can't block on user locks, so no sense testing for deadlock
* because there is no blocking, and no timer for the block.
* *
* This code takes a list of locks a process holds, and the lock that * This code takes a list of locks a process holds, and the lock that
* the process is sleeping on, and tries to find if any of the processes * the process is sleeping on, and tries to find if any of the processes
* waiting on its locks hold the lock it is waiting for. If no deadlock * waiting on its locks hold the lock it is waiting for. If no deadlock
* is found, it goes on to look at all the processes waiting on their locks. * is found, it goes on to look at all the processes waiting on their locks.
* *
* We can't block on user locks, so no sense testing for deadlock
* because there is no blocking, and no timer for the block. So,
* only look at regular locks.
*
* We have already locked the master lock before being called. * We have already locked the master lock before being called.
*/ */
bool bool
@ -1476,8 +1480,8 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
HOLDER *nextHolder = NULL; HOLDER *nextHolder = NULL;
PROC *waitProc; PROC *waitProc;
PROC_QUEUE *waitQueue; PROC_QUEUE *waitQueue;
SHM_QUEUE *lockQueue = &(thisProc->lockQueue); SHM_QUEUE *holderQueue = &(thisProc->holderQueue);
SHMEM_OFFSET end = MAKE_OFFSET(lockQueue); SHMEM_OFFSET end = MAKE_OFFSET(holderQueue);
LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl; LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
LOCK *lock; LOCK *lock;
int i, int i,
@ -1494,10 +1498,13 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
nprocs = 1; nprocs = 1;
} }
if (SHMQueueEmpty(lockQueue)) /*
* Scan over all the locks held/awaited by thisProc.
*/
if (SHMQueueEmpty(holderQueue))
return false; return false;
SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue); SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
do do
{ {
@ -1525,7 +1532,7 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
LOCK_PRINT("DeadLockCheck", lock, 0); LOCK_PRINT("DeadLockCheck", lock, 0);
/* /*
* waitLock is always in lockQueue of waiting proc, if !first_run * waitLock is always in holderQueue of waiting proc, if !first_run
* then upper caller will handle waitProcs queue of waitLock. * then upper caller will handle waitProcs queue of waitLock.
*/ */
if (thisProc->waitLock == lock && !first_run) if (thisProc->waitLock == lock && !first_run)
@ -1542,13 +1549,13 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
Assert(holder->nHolding > 0); Assert(holder->nHolding > 0);
for (lm = 1; lm <= lockctl->numLockModes; lm++) for (lm = 1; lm <= lockctl->numLockModes; lm++)
{ {
if (holder->holders[lm] > 0 && if (holder->holding[lm] > 0 &&
lockctl->conflictTab[lm] & findlock->waitMask) lockctl->conflictTab[lm] & findlock->waitMask)
return true; return true;
} }
/* /*
* Else - get the next lock from thisProc's lockQueue * Else - get the next lock from thisProc's holderQueue
*/ */
goto nxtl; goto nxtl;
} }
@ -1557,6 +1564,8 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev); waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev);
/* /*
* Inner loop scans over all processes waiting for this lock.
*
* NOTE: loop must count down because we want to examine each item * NOTE: loop must count down because we want to examine each item
* in the queue even if waitQueue->size decreases due to waking up * in the queue even if waitQueue->size decreases due to waking up
* some of the processes. * some of the processes.
@ -1573,14 +1582,14 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
if (lock == findlock) /* first_run also true */ if (lock == findlock) /* first_run also true */
{ {
/* /*
* If me blocked by his holdlock... * If I'm blocked by his heldLocks...
*/ */
if (lockctl->conflictTab[MyProc->waitLockMode] & waitProc->holdLock) if (lockctl->conflictTab[MyProc->waitLockMode] & waitProc->heldLocks)
{ {
/* and he blocked by me -> deadlock */ /* and he blocked by me -> deadlock */
if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->holdLock) if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks)
return true; return true;
/* we shouldn't look at lockQueue of our blockers */ /* we shouldn't look at holderQueue of our blockers */
goto nextWaitProc; goto nextWaitProc;
} }
@ -1591,11 +1600,11 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
* implicitly). Note that we don't do like test if * implicitly). Note that we don't do like test if
* !first_run (when thisProc is holder and non-waiter on * !first_run (when thisProc is holder and non-waiter on
* lock) and so we call DeadLockCheck below for every * lock) and so we call DeadLockCheck below for every
* waitProc in thisProc->lockQueue, even for waitProc-s * waitProc in thisProc->holderQueue, even for waitProc-s
* un-blocked by thisProc. Should we? This could save us * un-blocked by thisProc. Should we? This could save us
* some time... * some time...
*/ */
if (!(lockctl->conflictTab[waitProc->waitLockMode] & MyProc->holdLock) && if (!(lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks) &&
!(lockctl->conflictTab[waitProc->waitLockMode] & (1 << MyProc->waitLockMode))) !(lockctl->conflictTab[waitProc->waitLockMode] & (1 << MyProc->waitLockMode)))
goto nextWaitProc; goto nextWaitProc;
} }
@ -1609,13 +1618,13 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
goto nextWaitProc; goto nextWaitProc;
} }
/* Recursively check this process's lockQueue. */ /* Recursively check this process's holderQueue. */
Assert(nprocs < MAXBACKENDS); Assert(nprocs < MAXBACKENDS);
checked_procs[nprocs++] = waitProc; checked_procs[nprocs++] = waitProc;
if (DeadLockCheck(waitProc, findlock)) if (DeadLockCheck(waitProc, findlock))
{ {
int holdLock; int heldLocks;
/* /*
* Ok, but is waitProc waiting for me (thisProc) ? * Ok, but is waitProc waiting for me (thisProc) ?
@ -1623,15 +1632,15 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
if (thisProc->waitLock == lock) if (thisProc->waitLock == lock)
{ {
Assert(first_run); Assert(first_run);
holdLock = thisProc->holdLock; heldLocks = thisProc->heldLocks;
} }
else else
{ {
/* should we cache holdLock to speed this up? */ /* should we cache heldLocks to speed this up? */
holdLock = LockGetMyHoldLocks(holder->tag.lock, thisProc); heldLocks = LockGetMyHeldLocks(holder->tag.lock, thisProc);
Assert(holdLock != 0); Assert(heldLocks != 0);
} }
if (lockctl->conflictTab[waitProc->waitLockMode] & holdLock) if (lockctl->conflictTab[waitProc->waitLockMode] & heldLocks)
{ {
/* /*
* Last attempt to avoid deadlock: try to wakeup myself. * Last attempt to avoid deadlock: try to wakeup myself.
@ -1703,7 +1712,7 @@ nxtl:
#ifdef LOCK_DEBUG #ifdef LOCK_DEBUG
/* /*
* Dump all locks in the proc->lockQueue. Must have already acquired * Dump all locks in the proc->holderQueue. Must have already acquired
* the masterLock. * the masterLock.
*/ */
void void
@ -1711,7 +1720,7 @@ DumpLocks(void)
{ {
SHMEM_OFFSET location; SHMEM_OFFSET location;
PROC *proc; PROC *proc;
SHM_QUEUE *lockQueue; SHM_QUEUE *holderQueue;
HOLDER *holder = NULL; HOLDER *holder = NULL;
HOLDER *nextHolder = NULL; HOLDER *nextHolder = NULL;
SHMEM_OFFSET end; SHMEM_OFFSET end;
@ -1725,8 +1734,8 @@ DumpLocks(void)
proc = (PROC *) MAKE_PTR(location); proc = (PROC *) MAKE_PTR(location);
if (proc != MyProc) if (proc != MyProc)
return; return;
lockQueue = &proc->lockQueue; holderQueue = &proc->holderQueue;
end = MAKE_OFFSET(lockQueue); end = MAKE_OFFSET(holderQueue);
Assert(lockmethod < NumLockMethods); Assert(lockmethod < NumLockMethods);
lockMethodTable = LockMethodTable[lockmethod]; lockMethodTable = LockMethodTable[lockmethod];
@ -1736,10 +1745,10 @@ DumpLocks(void)
if (proc->waitLock) if (proc->waitLock)
LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0); LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
if (SHMQueueEmpty(lockQueue)) if (SHMQueueEmpty(holderQueue))
return; return;
SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue); SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
do do
{ {

View File

@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.92 2001/01/14 05:08:16 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.93 2001/01/16 06:11:34 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -48,7 +48,7 @@
* This is so that we can support more backends. (system-wide semaphore * This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95 * sets run out pretty fast.) -ay 4/95
* *
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.92 2001/01/14 05:08:16 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.93 2001/01/16 06:11:34 tgl Exp $
*/ */
#include "postgres.h" #include "postgres.h"
@ -230,7 +230,7 @@ InitProcess(void)
} }
/* this cannot be initialized until after the buffer pool */ /* this cannot be initialized until after the buffer pool */
SHMQueueInit(&(MyProc->lockQueue)); SHMQueueInit(&(MyProc->holderQueue));
} }
/* /*
@ -311,8 +311,8 @@ ZeroProcSemaphore(PROC *proc)
* Locktable lock must be held by caller. * Locktable lock must be held by caller.
* *
* NB: this does not remove the process' holder object, nor the lock object, * NB: this does not remove the process' holder object, nor the lock object,
* even though their holder counts might now have gone to zero. That will * even though their counts might now have gone to zero. That will happen
* happen during a subsequent LockReleaseAll call, which we expect will happen * during a subsequent LockReleaseAll call, which we expect will happen
* during transaction cleanup. (Removal of a proc from its wait queue by * during transaction cleanup. (Removal of a proc from its wait queue by
* this routine can only happen if we are aborting the transaction.) * this routine can only happen if we are aborting the transaction.)
*/ */
@ -331,14 +331,14 @@ RemoveFromWaitQueue(PROC *proc)
SHMQueueDelete(&(proc->links)); SHMQueueDelete(&(proc->links));
waitLock->waitProcs.size--; waitLock->waitProcs.size--;
/* Undo increments of holder counts by waiting process */ /* Undo increments of request counts by waiting process */
Assert(waitLock->nHolding > 0); Assert(waitLock->nRequested > 0);
Assert(waitLock->nHolding > proc->waitLock->nActive); Assert(waitLock->nRequested > proc->waitLock->nGranted);
waitLock->nHolding--; waitLock->nRequested--;
Assert(waitLock->holders[lockmode] > 0); Assert(waitLock->requested[lockmode] > 0);
waitLock->holders[lockmode]--; waitLock->requested[lockmode]--;
/* don't forget to clear waitMask bit if appropriate */ /* don't forget to clear waitMask bit if appropriate */
if (waitLock->activeHolders[lockmode] == waitLock->holders[lockmode]) if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
waitLock->waitMask &= ~(1 << lockmode); waitLock->waitMask &= ~(1 << lockmode);
/* Clean up the proc's own state */ /* Clean up the proc's own state */
@ -546,7 +546,7 @@ ProcSleep(LOCKMETHODCTL *lockctl,
int waitMask = lock->waitMask; int waitMask = lock->waitMask;
PROC *proc; PROC *proc;
int i; int i;
int aheadHolders[MAX_LOCKMODES]; int aheadGranted[MAX_LOCKMODES];
bool selfConflict = (lockctl->conflictTab[lockmode] & myMask), bool selfConflict = (lockctl->conflictTab[lockmode] & myMask),
prevSame = false; prevSame = false;
#ifndef __BEOS__ #ifndef __BEOS__
@ -559,7 +559,7 @@ ProcSleep(LOCKMETHODCTL *lockctl,
MyProc->waitLock = lock; MyProc->waitLock = lock;
MyProc->waitHolder = holder; MyProc->waitHolder = holder;
MyProc->waitLockMode = lockmode; MyProc->waitLockMode = lockmode;
/* We assume the caller set up MyProc->holdLock */ /* We assume the caller set up MyProc->heldLocks */
proc = (PROC *) MAKE_PTR(waitQueue->links.prev); proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
@ -567,57 +567,61 @@ ProcSleep(LOCKMETHODCTL *lockctl,
if (!(lockctl->conflictTab[lockmode] & waitMask)) if (!(lockctl->conflictTab[lockmode] & waitMask))
goto ins; goto ins;
/* otherwise, determine where we should go into the queue */
for (i = 1; i < MAX_LOCKMODES; i++) for (i = 1; i < MAX_LOCKMODES; i++)
aheadHolders[i] = lock->activeHolders[i]; aheadGranted[i] = lock->granted[i];
(aheadHolders[lockmode])++; (aheadGranted[lockmode])++;
for (i = 0; i < waitQueue->size; i++) for (i = 0; i < waitQueue->size; i++)
{ {
/* am I waiting for him ? */ LOCKMODE procWaitMode = proc->waitLockMode;
if (lockctl->conflictTab[lockmode] & proc->holdLock)
/* must I wait for him ? */
if (lockctl->conflictTab[lockmode] & proc->heldLocks)
{ {
/* is he waiting for me ? */ /* is he waiting for me ? */
if (lockctl->conflictTab[proc->waitLockMode] & MyProc->holdLock) if (lockctl->conflictTab[procWaitMode] & MyProc->heldLocks)
{ {
/* Yes, report deadlock failure */ /* Yes, report deadlock failure */
MyProc->errType = STATUS_ERROR; MyProc->errType = STATUS_ERROR;
goto rt; goto rt;
} }
/* being waiting for him - go past */ /* I must go after him in queue - so continue loop */
} }
/* if he waits for me */ /* if he waits for me, go before him in queue */
else if (lockctl->conflictTab[proc->waitLockMode] & MyProc->holdLock) else if (lockctl->conflictTab[procWaitMode] & MyProc->heldLocks)
break; break;
/* if conflicting locks requested */ /* if conflicting locks requested */
else if (lockctl->conflictTab[proc->waitLockMode] & myMask) else if (lockctl->conflictTab[procWaitMode] & myMask)
{ {
/* /*
* If I request non self-conflicting lock and there are others * If I request non self-conflicting lock and there are others
* requesting the same lock just before me - stay here. * requesting the same lock just before this guy - stop here.
*/ */
if (!selfConflict && prevSame) if (!selfConflict && prevSame)
break; break;
} }
/* /*
* Last attempt to don't move any more: if we don't conflict with * Last attempt to not move any further to the back of the queue:
* rest waiters in queue. * if we don't conflict with remaining waiters, stop here.
*/ */
else if (!(lockctl->conflictTab[lockmode] & waitMask)) else if (!(lockctl->conflictTab[lockmode] & waitMask))
break; break;
prevSame = (proc->waitLockMode == lockmode); /* Move past this guy, and update state accordingly */
(aheadHolders[proc->waitLockMode])++; prevSame = (procWaitMode == lockmode);
if (aheadHolders[proc->waitLockMode] == lock->holders[proc->waitLockMode]) (aheadGranted[procWaitMode])++;
waitMask &= ~(1 << proc->waitLockMode); if (aheadGranted[procWaitMode] == lock->requested[procWaitMode])
waitMask &= ~(1 << procWaitMode);
proc = (PROC *) MAKE_PTR(proc->links.prev); proc = (PROC *) MAKE_PTR(proc->links.prev);
} }
ins:; ins:;
/* ------------------- /* -------------------
* Insert self into queue. These operations are atomic (because * Insert self into queue, ahead of the given proc.
* of the spinlock). * These operations are atomic (because of the spinlock).
* ------------------- * -------------------
*/ */
SHMQueueInsertTL(&(proc->links), &(MyProc->links)); SHMQueueInsertTL(&(proc->links), &(MyProc->links));
@ -838,7 +842,7 @@ nextProc:
void void
ProcAddLock(SHM_QUEUE *elem) ProcAddLock(SHM_QUEUE *elem)
{ {
SHMQueueInsertTL(&MyProc->lockQueue, elem); SHMQueueInsertTL(&MyProc->holderQueue, elem);
} }
/* -------------------- /* --------------------

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: lock.h,v 1.40 2000/12/22 00:51:54 tgl Exp $ * $Id: lock.h,v 1.41 2001/01/16 06:11:34 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -150,26 +150,28 @@ typedef struct LOCKTAG
* Per-locked-object lock information: * Per-locked-object lock information:
* *
* tag -- uniquely identifies the object being locked * tag -- uniquely identifies the object being locked
* mask -- union of the conflict masks of all lock types * grantMask -- bitmask for all lock types currently granted on this object.
* currently held on this object. * waitMask -- bitmask for all lock types currently awaited on this object.
* waitProcs -- queue of processes waiting for this lock * waitProcs -- queue of processes waiting for this lock.
* holders -- count of each lock type currently held on the * requested -- count of each lock type currently requested on the lock
* lock. * (includes requests already granted!!).
* nHolding -- total locks of all types. * nRequested -- total requested locks of all types.
* granted -- count of each lock type currently granted on the lock.
* nGranted -- total granted locks of all types.
*/ */
typedef struct LOCK typedef struct LOCK
{ {
/* hash key */ /* hash key */
LOCKTAG tag; LOCKTAG tag; /* unique identifier of lockable object */
/* data */ /* data */
int mask; int grantMask; /* bitmask for lock types already granted */
int waitMask; int waitMask; /* bitmask for lock types awaited */
PROC_QUEUE waitProcs; PROC_QUEUE waitProcs; /* list of PROC objects waiting on lock */
int holders[MAX_LOCKMODES]; int requested[MAX_LOCKMODES]; /* counts of requested locks */
int nHolding; int nRequested; /* total of requested[] array */
int activeHolders[MAX_LOCKMODES]; int granted[MAX_LOCKMODES]; /* counts of granted locks */
int nActive; int nGranted; /* total of granted[] array */
} LOCK; } LOCK;
#define SHMEM_LOCKTAB_KEYSIZE sizeof(LOCKTAG) #define SHMEM_LOCKTAB_KEYSIZE sizeof(LOCKTAG)
@ -193,6 +195,12 @@ typedef struct LOCK
* Currently, session holders are used for user locks and for cross-xact * Currently, session holders are used for user locks and for cross-xact
* locks obtained for VACUUM. We assume that a session lock never conflicts * locks obtained for VACUUM. We assume that a session lock never conflicts
* with per-transaction locks obtained by the same backend. * with per-transaction locks obtained by the same backend.
*
* The holding[] array counts the granted locks (of each type) represented
* by this holder. Note that there will be a holder object, possibly with
* zero holding[], for any lock that the process is currently waiting on.
* Otherwise, holder objects whose counts have gone to zero are recycled
* as soon as convenient.
*/ */
typedef struct HOLDERTAG typedef struct HOLDERTAG
{ {
@ -204,12 +212,12 @@ typedef struct HOLDERTAG
typedef struct HOLDER typedef struct HOLDER
{ {
/* tag */ /* tag */
HOLDERTAG tag; HOLDERTAG tag; /* unique identifier of holder object */
/* data */ /* data */
int holders[MAX_LOCKMODES]; int holding[MAX_LOCKMODES]; /* count of locks currently held */
int nHolding; int nHolding; /* total of holding[] array */
SHM_QUEUE queue; SHM_QUEUE queue; /* list link for process' list of holders */
} HOLDER; } HOLDER;
#define SHMEM_HOLDERTAB_KEYSIZE sizeof(HOLDERTAG) #define SHMEM_HOLDERTAB_KEYSIZE sizeof(HOLDERTAG)
@ -241,7 +249,7 @@ extern bool LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
bool allxids, TransactionId xid); bool allxids, TransactionId xid);
extern int LockResolveConflicts(LOCKMETHOD lockmethod, LOCKMODE lockmode, extern int LockResolveConflicts(LOCKMETHOD lockmethod, LOCKMODE lockmode,
LOCK *lock, HOLDER *holder, PROC *proc, LOCK *lock, HOLDER *holder, PROC *proc,
int *myHolders); int *myHolding);
extern void GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode); extern void GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode);
extern int LockShmemSize(int maxBackends); extern int LockShmemSize(int maxBackends);
extern bool DeadLockCheck(PROC *thisProc, LOCK *findlock); extern bool DeadLockCheck(PROC *thisProc, LOCK *findlock);

View File

@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: proc.h,v 1.34 2001/01/14 05:08:16 tgl Exp $ * $Id: proc.h,v 1.35 2001/01/16 06:11:34 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -27,13 +27,20 @@ typedef struct
} SEMA; } SEMA;
/* /*
* Each backend has: * Each backend has a PROC struct in shared memory. There is also a list
* of currently-unused PROC structs that will be reallocated to new backends
* (a fairly pointless optimization, but it's there anyway).
*
* links: list link for any list the PROC is in. When waiting for a lock,
* the PROC is linked into that lock's waitProcs queue. A recycled PROC
* is linked into ProcGlobal's freeProcs list.
*/ */
struct proc struct proc
{ {
/* proc->links MUST BE THE FIRST ELEMENT OF STRUCT (see ProcWakeup()) */ /* proc->links MUST BE THE FIRST ELEMENT OF STRUCT (see ProcWakeup()) */
SHM_QUEUE links; /* proc can be waiting for one event(lock) */ SHM_QUEUE links; /* list link if process is in a list */
SEMA sem; /* ONE semaphore to sleep on */ SEMA sem; /* ONE semaphore to sleep on */
int errType; /* error code tells why we woke up */ int errType; /* error code tells why we woke up */
@ -48,16 +55,17 @@ struct proc
/* Info about lock the process is currently waiting for, if any */ /* Info about lock the process is currently waiting for, if any */
LOCK *waitLock; /* Lock object we're sleeping on ... */ LOCK *waitLock; /* Lock object we're sleeping on ... */
HOLDER *waitHolder; /* Per-holder info for our lock */ HOLDER *waitHolder; /* Per-holder info for awaited lock */
LOCKMODE waitLockMode; /* type of lock we're waiting for */ LOCKMODE waitLockMode; /* type of lock we're waiting for */
LOCKMASK holdLock; /* bitmask for lock types already held */ LOCKMASK heldLocks; /* bitmask for lock types already held on
* this lock object by this backend */
int pid; /* This backend's process id */ int pid; /* This backend's process id */
Oid databaseId; /* OID of database this backend is using */ Oid databaseId; /* OID of database this backend is using */
short sLocks[MAX_SPINS]; /* Spin lock stats */ short sLocks[MAX_SPINS]; /* Spin lock stats */
SHM_QUEUE lockQueue; /* locks associated with current SHM_QUEUE holderQueue; /* list of HOLDER objects for locks held or
* transaction */ * awaited by this backend */
}; };
/* NOTE: "typedef struct proc PROC" appears in storage/lock.h. */ /* NOTE: "typedef struct proc PROC" appears in storage/lock.h. */