mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-01-06 15:24:56 +08:00
Replace the XLogInsert slots with regular LWLocks.
The special feature the XLogInsert slots had over regular LWLocks is the insertingAt value that was updated atomically with releasing backends waiting on it. Add new functions to the LWLock API to do that, and replace the slots with LWLocks. This reduces the amount of duplicated code. (There's still some duplication, but at least it's all in lwlock.c now.) Reviewed by Andres Freund.
This commit is contained in:
parent
af930e606a
commit
68a2e52bba
File diff suppressed because it is too large
Load Diff
@ -10,6 +10,13 @@
|
||||
* locking should be done with the full lock manager --- which depends on
|
||||
* LWLocks to protect its shared state.
|
||||
*
|
||||
* In addition to exclusive and shared modes, lightweight locks can be used
|
||||
* to wait until a variable changes value. The variable is initially set
|
||||
* when the lock is acquired with LWLockAcquireWithVar, and can be updated
|
||||
* without releasing the lock by calling LWLockUpdateVar. LWLockWaitForVar
|
||||
* waits for the variable to be updated, or until the lock is free. The
|
||||
* meaning of the variable is up to the caller, the lightweight lock code
|
||||
* just assigns and compares it.
|
||||
*
|
||||
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
@ -78,6 +85,9 @@ static LWLock *held_lwlocks[MAX_SIMUL_LWLOCKS];
|
||||
static int lock_addin_request = 0;
|
||||
static bool lock_addin_request_allowed = true;
|
||||
|
||||
static bool LWLockAcquireCommon(LWLock *l, LWLockMode mode, uint64 *valptr,
|
||||
uint64 val);
|
||||
|
||||
#ifdef LWLOCK_STATS
|
||||
typedef struct lwlock_stats_key
|
||||
{
|
||||
@ -443,16 +453,36 @@ LWLockInitialize(LWLock *lock, int tranche_id)
|
||||
/*
|
||||
* LWLockAcquire - acquire a lightweight lock in the specified mode
|
||||
*
|
||||
* If the lock is not available, sleep until it is.
|
||||
* If the lock is not available, sleep until it is. Returns true if the lock
|
||||
* was available immediately, false if we had to sleep.
|
||||
*
|
||||
* Side effect: cancel/die interrupts are held off until lock release.
|
||||
*/
|
||||
void
|
||||
bool
|
||||
LWLockAcquire(LWLock *l, LWLockMode mode)
|
||||
{
|
||||
return LWLockAcquireCommon(l, mode, NULL, 0);
|
||||
}
|
||||
|
||||
/*
|
||||
* LWLockAcquireWithVar - like LWLockAcquire, but also sets *valptr = val
|
||||
*
|
||||
* The lock is always acquired in exclusive mode with this function.
|
||||
*/
|
||||
bool
|
||||
LWLockAcquireWithVar(LWLock *l, uint64 *valptr, uint64 val)
|
||||
{
|
||||
return LWLockAcquireCommon(l, LW_EXCLUSIVE, valptr, val);
|
||||
}
|
||||
|
||||
/* internal function to implement LWLockAcquire and LWLockAcquireWithVar */
|
||||
static bool
|
||||
LWLockAcquireCommon(LWLock *l, LWLockMode mode, uint64 *valptr, uint64 val)
|
||||
{
|
||||
volatile LWLock *lock = l;
|
||||
PGPROC *proc = MyProc;
|
||||
bool retry = false;
|
||||
bool result = true;
|
||||
int extraWaits = 0;
|
||||
#ifdef LWLOCK_STATS
|
||||
lwlock_stats *lwstats;
|
||||
@ -601,8 +631,13 @@ LWLockAcquire(LWLock *l, LWLockMode mode)
|
||||
|
||||
/* Now loop back and try to acquire lock again. */
|
||||
retry = true;
|
||||
result = false;
|
||||
}
|
||||
|
||||
/* If there's a variable associated with this lock, initialize it */
|
||||
if (valptr)
|
||||
*valptr = val;
|
||||
|
||||
/* We are done updating shared state of the lock itself. */
|
||||
SpinLockRelease(&lock->mutex);
|
||||
|
||||
@ -616,6 +651,8 @@ LWLockAcquire(LWLock *l, LWLockMode mode)
|
||||
*/
|
||||
while (extraWaits-- > 0)
|
||||
PGSemaphoreUnlock(&proc->sem);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -834,6 +871,227 @@ LWLockAcquireOrWait(LWLock *l, LWLockMode mode)
|
||||
return !mustwait;
|
||||
}
|
||||
|
||||
/*
|
||||
* LWLockWaitForVar - Wait until lock is free, or a variable is updated.
|
||||
*
|
||||
* If the lock is held and *valptr equals oldval, waits until the lock is
|
||||
* either freed, or the lock holder updates *valptr by calling
|
||||
* LWLockUpdateVar. If the lock is free on exit (immediately or after
|
||||
* waiting), returns true. If the lock is still held, but *valptr no longer
|
||||
* matches oldval, returns false and sets *newval to the current value in
|
||||
* *valptr.
|
||||
*
|
||||
* It's possible that the lock holder releases the lock, but another backend
|
||||
* acquires it again before we get a chance to observe that the lock was
|
||||
* momentarily released. We wouldn't need to wait for the new lock holder,
|
||||
* but we cannot distinguish that case, so we will have to wait.
|
||||
*
|
||||
* Note: this function ignores shared lock holders; if the lock is held
|
||||
* in shared mode, returns 'true'.
|
||||
*/
|
||||
bool
|
||||
LWLockWaitForVar(LWLock *l, uint64 *valptr, uint64 oldval, uint64 *newval)
|
||||
{
|
||||
volatile LWLock *lock = l;
|
||||
volatile uint64 *valp = valptr;
|
||||
PGPROC *proc = MyProc;
|
||||
int extraWaits = 0;
|
||||
bool result = false;
|
||||
|
||||
/*
|
||||
* Quick test first to see if it the slot is free right now.
|
||||
*
|
||||
* XXX: the caller uses a spinlock before this, so we don't need a memory
|
||||
* barrier here as far as the current usage is concerned. But that might
|
||||
* not be safe in general.
|
||||
*/
|
||||
if (lock->exclusive == 0)
|
||||
return true;
|
||||
|
||||
/*
|
||||
* Lock out cancel/die interrupts while we sleep on the lock. There is
|
||||
* no cleanup mechanism to remove us from the wait queue if we got
|
||||
* interrupted.
|
||||
*/
|
||||
HOLD_INTERRUPTS();
|
||||
|
||||
/*
|
||||
* Loop here to check the lock's status after each time we are signaled.
|
||||
*/
|
||||
for (;;)
|
||||
{
|
||||
bool mustwait;
|
||||
uint64 value;
|
||||
|
||||
/* Acquire mutex. Time spent holding mutex should be short! */
|
||||
#ifdef LWLOCK_STATS
|
||||
lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
|
||||
#else
|
||||
SpinLockAcquire(&lock->mutex);
|
||||
#endif
|
||||
|
||||
/* Is the lock now free, and if not, does the value match? */
|
||||
if (lock->exclusive == 0)
|
||||
{
|
||||
result = true;
|
||||
mustwait = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
value = *valp;
|
||||
if (value != oldval)
|
||||
{
|
||||
result = false;
|
||||
mustwait = false;
|
||||
*newval = value;
|
||||
}
|
||||
else
|
||||
mustwait = true;
|
||||
}
|
||||
|
||||
if (!mustwait)
|
||||
break; /* the lock was free or value didn't match */
|
||||
|
||||
/*
|
||||
* Add myself to wait queue.
|
||||
*/
|
||||
proc->lwWaiting = true;
|
||||
proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
|
||||
proc->lwWaitLink = NULL;
|
||||
|
||||
/* waiters are added to the front of the queue */
|
||||
proc->lwWaitLink = lock->head;
|
||||
if (lock->head == NULL)
|
||||
lock->tail = proc;
|
||||
lock->head = proc;
|
||||
|
||||
/* Can release the mutex now */
|
||||
SpinLockRelease(&lock->mutex);
|
||||
|
||||
/*
|
||||
* Wait until awakened.
|
||||
*
|
||||
* Since we share the process wait semaphore with the regular lock
|
||||
* manager and ProcWaitForSignal, and we may need to acquire an LWLock
|
||||
* while one of those is pending, it is possible that we get awakened
|
||||
* for a reason other than being signaled by LWLockRelease. If so,
|
||||
* loop back and wait again. Once we've gotten the LWLock,
|
||||
* re-increment the sema by the number of additional signals received,
|
||||
* so that the lock manager or signal manager will see the received
|
||||
* signal when it next waits.
|
||||
*/
|
||||
LOG_LWDEBUG("LWLockWaitForVar", T_NAME(l), T_ID(l), "waiting");
|
||||
|
||||
#ifdef LWLOCK_STATS
|
||||
lwstats->block_count++;
|
||||
#endif
|
||||
|
||||
TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), mode);
|
||||
|
||||
for (;;)
|
||||
{
|
||||
/* "false" means cannot accept cancel/die interrupt here. */
|
||||
PGSemaphoreLock(&proc->sem, false);
|
||||
if (!proc->lwWaiting)
|
||||
break;
|
||||
extraWaits++;
|
||||
}
|
||||
|
||||
TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(l), T_ID(l), mode);
|
||||
|
||||
LOG_LWDEBUG("LWLockWaitForVar", T_NAME(l), T_ID(l), "awakened");
|
||||
|
||||
/* Now loop back and check the status of the lock again. */
|
||||
}
|
||||
|
||||
/* We are done updating shared state of the lock itself. */
|
||||
SpinLockRelease(&lock->mutex);
|
||||
|
||||
TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(l), T_ID(l), mode);
|
||||
|
||||
/*
|
||||
* Fix the process wait semaphore's count for any absorbed wakeups.
|
||||
*/
|
||||
while (extraWaits-- > 0)
|
||||
PGSemaphoreUnlock(&proc->sem);
|
||||
|
||||
/*
|
||||
* Now okay to allow cancel/die interrupts.
|
||||
*/
|
||||
RESUME_INTERRUPTS();
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LWLockUpdateVar - Update a variable and wake up waiters atomically
|
||||
*
|
||||
* Sets *valptr to 'val', and wakes up all processes waiting for us with
|
||||
* LWLockWaitForVar(). Setting the value and waking up the processes happen
|
||||
* atomically so that any process calling LWLockWaitForVar() on the same lock
|
||||
* is guaranteed to see the new value, and act accordingly.
|
||||
*
|
||||
* The caller must be holding the lock in exclusive mode.
|
||||
*/
|
||||
void
|
||||
LWLockUpdateVar(LWLock *l, uint64 *valptr, uint64 val)
|
||||
{
|
||||
volatile LWLock *lock = l;
|
||||
volatile uint64 *valp = valptr;
|
||||
PGPROC *head;
|
||||
PGPROC *proc;
|
||||
PGPROC *next;
|
||||
|
||||
/* Acquire mutex. Time spent holding mutex should be short! */
|
||||
SpinLockAcquire(&lock->mutex);
|
||||
|
||||
/* we should hold the lock */
|
||||
Assert(lock->exclusive == 1);
|
||||
|
||||
/* Update the lock's value */
|
||||
*valp = val;
|
||||
|
||||
/*
|
||||
* See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
|
||||
* up. They are always in the front of the queue.
|
||||
*/
|
||||
head = lock->head;
|
||||
|
||||
if (head != NULL && head->lwWaitMode == LW_WAIT_UNTIL_FREE)
|
||||
{
|
||||
proc = head;
|
||||
next = proc->lwWaitLink;
|
||||
while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
|
||||
{
|
||||
proc = next;
|
||||
next = next->lwWaitLink;
|
||||
}
|
||||
|
||||
/* proc is now the last PGPROC to be released */
|
||||
lock->head = next;
|
||||
proc->lwWaitLink = NULL;
|
||||
}
|
||||
else
|
||||
head = NULL;
|
||||
|
||||
/* We are done updating shared state of the lock itself. */
|
||||
SpinLockRelease(&lock->mutex);
|
||||
|
||||
/*
|
||||
* Awaken any waiters I removed from the queue.
|
||||
*/
|
||||
while (head != NULL)
|
||||
{
|
||||
proc = head;
|
||||
head = proc->lwWaitLink;
|
||||
proc->lwWaitLink = NULL;
|
||||
proc->lwWaiting = false;
|
||||
PGSemaphoreUnlock(&proc->sem);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LWLockRelease - release a previously acquired lock
|
||||
*/
|
||||
|
@ -2120,12 +2120,12 @@ static struct config_int ConfigureNamesInt[] =
|
||||
},
|
||||
|
||||
{
|
||||
{"xloginsert_slots", PGC_POSTMASTER, WAL_SETTINGS,
|
||||
gettext_noop("Sets the number of slots for concurrent xlog insertions."),
|
||||
{"xloginsert_locks", PGC_POSTMASTER, WAL_SETTINGS,
|
||||
gettext_noop("Sets the number of locks used for concurrent xlog insertions."),
|
||||
NULL,
|
||||
GUC_NOT_IN_SAMPLE
|
||||
},
|
||||
&num_xloginsert_slots,
|
||||
&num_xloginsert_locks,
|
||||
8, 1, 1000,
|
||||
NULL, NULL, NULL
|
||||
},
|
||||
|
@ -192,7 +192,7 @@ extern bool EnableHotStandby;
|
||||
extern bool fullPageWrites;
|
||||
extern bool wal_log_hints;
|
||||
extern bool log_checkpoints;
|
||||
extern int num_xloginsert_slots;
|
||||
extern int num_xloginsert_locks;
|
||||
|
||||
/* WAL levels */
|
||||
typedef enum WalLevel
|
||||
|
@ -169,13 +169,17 @@ typedef enum LWLockMode
|
||||
extern bool Trace_lwlocks;
|
||||
#endif
|
||||
|
||||
extern void LWLockAcquire(LWLock *lock, LWLockMode mode);
|
||||
extern bool LWLockAcquire(LWLock *lock, LWLockMode mode);
|
||||
extern bool LWLockConditionalAcquire(LWLock *lock, LWLockMode mode);
|
||||
extern bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode);
|
||||
extern void LWLockRelease(LWLock *lock);
|
||||
extern void LWLockReleaseAll(void);
|
||||
extern bool LWLockHeldByMe(LWLock *lock);
|
||||
|
||||
extern bool LWLockAcquireWithVar(LWLock *lock, uint64 *valptr, uint64 val);
|
||||
extern bool LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval);
|
||||
extern void LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 value);
|
||||
|
||||
extern Size LWLockShmemSize(void);
|
||||
extern void CreateLWLocks(void);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user