diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 13f83dd57d6..1b7c2f23a41 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record) END_CRIT_SECTION(); /* wake up walsenders now that we've released heavily contended locks */ - WalSndWakeupProcessRequests(); + WalSndWakeupProcessRequests(true, !RecoveryInProgress()); /* * If we still haven't flushed to the request point then we have a @@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void) END_CRIT_SECTION(); /* wake up walsenders now that we've released heavily contended locks */ - WalSndWakeupProcessRequests(); + WalSndWakeupProcessRequests(true, !RecoveryInProgress()); /* * Great, done. To take some work off the critical path, try to initialize @@ -5765,7 +5765,7 @@ StartupXLOG(void) * If there were cascading standby servers connected to us, nudge any wal * sender processes to notice that we've been promoted. */ - WalSndWakeup(); + WalSndWakeup(true, true); /* * If this was a promotion, request an (online) checkpoint now. This isn't diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c index a0f5aa24b58..f3fb92c8f96 100644 --- a/src/backend/access/transam/xlogarchive.c +++ b/src/backend/access/transam/xlogarchive.c @@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname) * if we restored something other than a WAL segment, but it does no harm * either. */ - WalSndWakeup(); + WalSndWakeup(true, false); } /* diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index dbe93947627..02d1b2cd6d8 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -1935,6 +1935,31 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl XLogRecoveryCtl->lastReplayedTLI = *replayTLI; SpinLockRelease(&XLogRecoveryCtl->info_lck); + /* ------ + * Wakeup walsenders: + * + * On the standby, the WAL is flushed first (which will only wake up + * physical walsenders) and then applied, which will only wake up logical + * walsenders. + * + * Indeed, logical walsenders on standby can't decode and send data until + * it's been applied. + * + * Physical walsenders don't need to be woken up during replay unless + * cascading replication is allowed and time line change occurred (so that + * they can notice that they are on a new time line). + * + * That's why the wake up conditions are for: + * + * - physical walsenders in case of new time line and cascade + * replication is allowed + * - logical walsenders in case cascade replication is allowed (could not + * be created otherwise) + * ------ + */ + if (AllowCascadeReplication()) + WalSndWakeup(switchedTLI, true); + /* * If rm_redo called XLogRequestWalReceiverReply, then we wake up the * receiver so that it notices the updated lastReplayedEndRecPtr and sends @@ -1958,12 +1983,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl */ RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI); - /* - * Wake up any walsenders to notice that we are on a new timeline. - */ - if (AllowCascadeReplication()) - WalSndWakeup(); - /* Reset the prefetcher. */ XLogPrefetchReconfigure(); } @@ -3050,9 +3069,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, { /* * When we find that WAL ends in an incomplete record, keep track - * of that record. After recovery is done, we'll write a record to - * indicate to downstream WAL readers that that portion is to be - * ignored. + * of that record. After recovery is done, we'll write a record + * to indicate to downstream WAL readers that that portion is to + * be ignored. * * However, when ArchiveRecoveryRequested = true, we're going to * switch to a new timeline at the end of recovery. We will only diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 685af51d5d3..feff7094351 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli) /* Signal the startup process and walsender that new WAL has arrived */ WakeupRecovery(); if (AllowCascadeReplication()) - WalSndWakeup(); + WalSndWakeup(true, false); /* Report XLOG streaming progress in PS display */ if (update_process_title) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e40a9b1ba7b..5423cf0a171 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2603,6 +2603,23 @@ InitWalSenderSlot(void) walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; + + /* + * The kind assignment is done here and not in StartReplication() + * and StartLogicalReplication(). Indeed, the logical walsender + * needs to read WAL records (like snapshot of running + * transactions) during the slot creation. So it needs to be woken + * up based on its kind. + * + * The kind assignment could also be done in StartReplication(), + * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it + * seems better to set it on one place. + */ + if (MyDatabaseId == InvalidOid) + walsnd->kind = REPLICATION_KIND_PHYSICAL; + else + walsnd->kind = REPLICATION_KIND_LOGICAL; + SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -3280,30 +3297,46 @@ WalSndShmemInit(void) } /* - * Wake up all walsenders + * Wake up physical, logical or both kinds of walsenders + * + * The distinction between physical and logical walsenders is done, because: + * - physical walsenders can't send data until it's been flushed + * - logical walsenders on standby can't decode and send data until it's been + * applied + * + * For cascading replication we need to wake up physical walsenders separately + * from logical walsenders (see the comment before calling WalSndWakeup() in + * ApplyWalRecord() for more details). * * This will be called inside critical sections, so throwing an error is not * advisable. */ void -WalSndWakeup(void) +WalSndWakeup(bool physical, bool logical) { int i; for (i = 0; i < max_wal_senders; i++) { Latch *latch; + ReplicationKind kind; WalSnd *walsnd = &WalSndCtl->walsnds[i]; /* * Get latch pointer with spinlock held, for the unlikely case that - * pointer reads aren't atomic (as they're 8 bytes). + * pointer reads aren't atomic (as they're 8 bytes). While at it, also + * get kind. */ SpinLockAcquire(&walsnd->mutex); latch = walsnd->latch; + kind = walsnd->kind; SpinLockRelease(&walsnd->mutex); - if (latch != NULL) + if (latch == NULL) + continue; + + if ((physical && kind == REPLICATION_KIND_PHYSICAL) || + (logical && kind == REPLICATION_KIND_LOGICAL)) SetLatch(latch); } } diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 52bb3e2aae3..9df7e50f943 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit); extern void WalSndSignals(void); extern Size WalSndShmemSize(void); extern void WalSndShmemInit(void); -extern void WalSndWakeup(void); +extern void WalSndWakeup(bool physical, bool logical); extern void WalSndInitStopping(void); extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); @@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void); /* * wakeup walsenders if there is work to be done */ -#define WalSndWakeupProcessRequests() \ - do \ - { \ - if (wake_wal_senders) \ - { \ - wake_wal_senders = false; \ - if (max_wal_senders > 0) \ - WalSndWakeup(); \ - } \ - } while (0) +static inline void +WalSndWakeupProcessRequests(bool physical, bool logical) +{ + if (wake_wal_senders) + { + wake_wal_senders = false; + if (max_wal_senders > 0) + WalSndWakeup(physical, logical); + } +} #endif /* _WALSENDER_H */ diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 5310e054c48..ff25aa70a89 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -15,6 +15,7 @@ #include "access/xlog.h" #include "lib/ilist.h" #include "nodes/nodes.h" +#include "nodes/replnodes.h" #include "replication/syncrep.h" #include "storage/latch.h" #include "storage/shmem.h" @@ -79,6 +80,8 @@ typedef struct WalSnd * Timestamp of the last message received from standby. */ TimestampTz replyTime; + + ReplicationKind kind; } WalSnd; extern PGDLLIMPORT WalSnd *MyWalSnd;