mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-02-23 19:39:53 +08:00
For cascading replication, wake physical and logical walsenders separately
Physical walsenders can't send data until it's been flushed; logical walsenders can't decode and send data until it's been applied. On the standby, the WAL is flushed first, which will only wake up physical walsenders; and then applied, which will only wake up logical walsenders. Previously, all walsenders were awakened when the WAL was flushed. That was fine for logical walsenders on the primary; but on the standby the flushed WAL would have been not applied yet, so logical walsenders were awakened too early. Per idea from Jeff Davis and Amit Kapila. Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Reviewed-By: Jeff Davis <pgsql@j-davis.com> Reviewed-By: Robert Haas <robertmhaas@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Discussion: https://postgr.es/m/CAA4eK1+zO5LUeisabX10c81LU-fWMKO4M9Wyg1cdkbW7Hqh6vQ@mail.gmail.com
This commit is contained in:
parent
26669757b6
commit
e101dfac3a
@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record)
|
||||
END_CRIT_SECTION();
|
||||
|
||||
/* wake up walsenders now that we've released heavily contended locks */
|
||||
WalSndWakeupProcessRequests();
|
||||
WalSndWakeupProcessRequests(true, !RecoveryInProgress());
|
||||
|
||||
/*
|
||||
* If we still haven't flushed to the request point then we have a
|
||||
@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void)
|
||||
END_CRIT_SECTION();
|
||||
|
||||
/* wake up walsenders now that we've released heavily contended locks */
|
||||
WalSndWakeupProcessRequests();
|
||||
WalSndWakeupProcessRequests(true, !RecoveryInProgress());
|
||||
|
||||
/*
|
||||
* Great, done. To take some work off the critical path, try to initialize
|
||||
@ -5765,7 +5765,7 @@ StartupXLOG(void)
|
||||
* If there were cascading standby servers connected to us, nudge any wal
|
||||
* sender processes to notice that we've been promoted.
|
||||
*/
|
||||
WalSndWakeup();
|
||||
WalSndWakeup(true, true);
|
||||
|
||||
/*
|
||||
* If this was a promotion, request an (online) checkpoint now. This isn't
|
||||
|
@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
|
||||
* if we restored something other than a WAL segment, but it does no harm
|
||||
* either.
|
||||
*/
|
||||
WalSndWakeup();
|
||||
WalSndWakeup(true, false);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -1935,6 +1935,31 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
|
||||
XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
|
||||
SpinLockRelease(&XLogRecoveryCtl->info_lck);
|
||||
|
||||
/* ------
|
||||
* Wakeup walsenders:
|
||||
*
|
||||
* On the standby, the WAL is flushed first (which will only wake up
|
||||
* physical walsenders) and then applied, which will only wake up logical
|
||||
* walsenders.
|
||||
*
|
||||
* Indeed, logical walsenders on standby can't decode and send data until
|
||||
* it's been applied.
|
||||
*
|
||||
* Physical walsenders don't need to be woken up during replay unless
|
||||
* cascading replication is allowed and time line change occurred (so that
|
||||
* they can notice that they are on a new time line).
|
||||
*
|
||||
* That's why the wake up conditions are for:
|
||||
*
|
||||
* - physical walsenders in case of new time line and cascade
|
||||
* replication is allowed
|
||||
* - logical walsenders in case cascade replication is allowed (could not
|
||||
* be created otherwise)
|
||||
* ------
|
||||
*/
|
||||
if (AllowCascadeReplication())
|
||||
WalSndWakeup(switchedTLI, true);
|
||||
|
||||
/*
|
||||
* If rm_redo called XLogRequestWalReceiverReply, then we wake up the
|
||||
* receiver so that it notices the updated lastReplayedEndRecPtr and sends
|
||||
@ -1958,12 +1983,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
|
||||
*/
|
||||
RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
|
||||
|
||||
/*
|
||||
* Wake up any walsenders to notice that we are on a new timeline.
|
||||
*/
|
||||
if (AllowCascadeReplication())
|
||||
WalSndWakeup();
|
||||
|
||||
/* Reset the prefetcher. */
|
||||
XLogPrefetchReconfigure();
|
||||
}
|
||||
@ -3050,9 +3069,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
|
||||
{
|
||||
/*
|
||||
* When we find that WAL ends in an incomplete record, keep track
|
||||
* of that record. After recovery is done, we'll write a record to
|
||||
* indicate to downstream WAL readers that that portion is to be
|
||||
* ignored.
|
||||
* of that record. After recovery is done, we'll write a record
|
||||
* to indicate to downstream WAL readers that that portion is to
|
||||
* be ignored.
|
||||
*
|
||||
* However, when ArchiveRecoveryRequested = true, we're going to
|
||||
* switch to a new timeline at the end of recovery. We will only
|
||||
|
@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
|
||||
/* Signal the startup process and walsender that new WAL has arrived */
|
||||
WakeupRecovery();
|
||||
if (AllowCascadeReplication())
|
||||
WalSndWakeup();
|
||||
WalSndWakeup(true, false);
|
||||
|
||||
/* Report XLOG streaming progress in PS display */
|
||||
if (update_process_title)
|
||||
|
@ -2603,6 +2603,23 @@ InitWalSenderSlot(void)
|
||||
walsnd->sync_standby_priority = 0;
|
||||
walsnd->latch = &MyProc->procLatch;
|
||||
walsnd->replyTime = 0;
|
||||
|
||||
/*
|
||||
* The kind assignment is done here and not in StartReplication()
|
||||
* and StartLogicalReplication(). Indeed, the logical walsender
|
||||
* needs to read WAL records (like snapshot of running
|
||||
* transactions) during the slot creation. So it needs to be woken
|
||||
* up based on its kind.
|
||||
*
|
||||
* The kind assignment could also be done in StartReplication(),
|
||||
* StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
|
||||
* seems better to set it on one place.
|
||||
*/
|
||||
if (MyDatabaseId == InvalidOid)
|
||||
walsnd->kind = REPLICATION_KIND_PHYSICAL;
|
||||
else
|
||||
walsnd->kind = REPLICATION_KIND_LOGICAL;
|
||||
|
||||
SpinLockRelease(&walsnd->mutex);
|
||||
/* don't need the lock anymore */
|
||||
MyWalSnd = (WalSnd *) walsnd;
|
||||
@ -3280,30 +3297,46 @@ WalSndShmemInit(void)
|
||||
}
|
||||
|
||||
/*
|
||||
* Wake up all walsenders
|
||||
* Wake up physical, logical or both kinds of walsenders
|
||||
*
|
||||
* The distinction between physical and logical walsenders is done, because:
|
||||
* - physical walsenders can't send data until it's been flushed
|
||||
* - logical walsenders on standby can't decode and send data until it's been
|
||||
* applied
|
||||
*
|
||||
* For cascading replication we need to wake up physical walsenders separately
|
||||
* from logical walsenders (see the comment before calling WalSndWakeup() in
|
||||
* ApplyWalRecord() for more details).
|
||||
*
|
||||
* This will be called inside critical sections, so throwing an error is not
|
||||
* advisable.
|
||||
*/
|
||||
void
|
||||
WalSndWakeup(void)
|
||||
WalSndWakeup(bool physical, bool logical)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i = 0; i < max_wal_senders; i++)
|
||||
{
|
||||
Latch *latch;
|
||||
ReplicationKind kind;
|
||||
WalSnd *walsnd = &WalSndCtl->walsnds[i];
|
||||
|
||||
/*
|
||||
* Get latch pointer with spinlock held, for the unlikely case that
|
||||
* pointer reads aren't atomic (as they're 8 bytes).
|
||||
* pointer reads aren't atomic (as they're 8 bytes). While at it, also
|
||||
* get kind.
|
||||
*/
|
||||
SpinLockAcquire(&walsnd->mutex);
|
||||
latch = walsnd->latch;
|
||||
kind = walsnd->kind;
|
||||
SpinLockRelease(&walsnd->mutex);
|
||||
|
||||
if (latch != NULL)
|
||||
if (latch == NULL)
|
||||
continue;
|
||||
|
||||
if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
|
||||
(logical && kind == REPLICATION_KIND_LOGICAL))
|
||||
SetLatch(latch);
|
||||
}
|
||||
}
|
||||
|
@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit);
|
||||
extern void WalSndSignals(void);
|
||||
extern Size WalSndShmemSize(void);
|
||||
extern void WalSndShmemInit(void);
|
||||
extern void WalSndWakeup(void);
|
||||
extern void WalSndWakeup(bool physical, bool logical);
|
||||
extern void WalSndInitStopping(void);
|
||||
extern void WalSndWaitStopping(void);
|
||||
extern void HandleWalSndInitStopping(void);
|
||||
@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void);
|
||||
/*
|
||||
* wakeup walsenders if there is work to be done
|
||||
*/
|
||||
#define WalSndWakeupProcessRequests() \
|
||||
do \
|
||||
{ \
|
||||
if (wake_wal_senders) \
|
||||
{ \
|
||||
wake_wal_senders = false; \
|
||||
if (max_wal_senders > 0) \
|
||||
WalSndWakeup(); \
|
||||
} \
|
||||
} while (0)
|
||||
static inline void
|
||||
WalSndWakeupProcessRequests(bool physical, bool logical)
|
||||
{
|
||||
if (wake_wal_senders)
|
||||
{
|
||||
wake_wal_senders = false;
|
||||
if (max_wal_senders > 0)
|
||||
WalSndWakeup(physical, logical);
|
||||
}
|
||||
}
|
||||
|
||||
#endif /* _WALSENDER_H */
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include "access/xlog.h"
|
||||
#include "lib/ilist.h"
|
||||
#include "nodes/nodes.h"
|
||||
#include "nodes/replnodes.h"
|
||||
#include "replication/syncrep.h"
|
||||
#include "storage/latch.h"
|
||||
#include "storage/shmem.h"
|
||||
@ -79,6 +80,8 @@ typedef struct WalSnd
|
||||
* Timestamp of the last message received from standby.
|
||||
*/
|
||||
TimestampTz replyTime;
|
||||
|
||||
ReplicationKind kind;
|
||||
} WalSnd;
|
||||
|
||||
extern PGDLLIMPORT WalSnd *MyWalSnd;
|
||||
|
Loading…
Reference in New Issue
Block a user