mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-02-23 19:39:53 +08:00
For cascading replication, wake physical and logical walsenders separately
Physical walsenders can't send data until it's been flushed; logical walsenders can't decode and send data until it's been applied. On the standby, the WAL is flushed first, which will only wake up physical walsenders; and then applied, which will only wake up logical walsenders.

Previously, all walsenders were awakened when the WAL was flushed. That was fine for logical walsenders on the primary; but on the standby the flushed WAL would not have been applied yet, so logical walsenders were awakened too early.

Per idea from Jeff Davis and Amit Kapila.

Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Reviewed-By: Jeff Davis <pgsql@j-davis.com>
Reviewed-By: Robert Haas <robertmhaas@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Discussion: https://postgr.es/m/CAA4eK1+zO5LUeisabX10c81LU-fWMKO4M9Wyg1cdkbW7Hqh6vQ@mail.gmail.com
This commit is contained in:
parent
26669757b6
commit
e101dfac3a
@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record)
|
|||||||
END_CRIT_SECTION();
|
END_CRIT_SECTION();
|
||||||
|
|
||||||
/* wake up walsenders now that we've released heavily contended locks */
|
/* wake up walsenders now that we've released heavily contended locks */
|
||||||
WalSndWakeupProcessRequests();
|
WalSndWakeupProcessRequests(true, !RecoveryInProgress());
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If we still haven't flushed to the request point then we have a
|
* If we still haven't flushed to the request point then we have a
|
||||||
@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void)
|
|||||||
END_CRIT_SECTION();
|
END_CRIT_SECTION();
|
||||||
|
|
||||||
/* wake up walsenders now that we've released heavily contended locks */
|
/* wake up walsenders now that we've released heavily contended locks */
|
||||||
WalSndWakeupProcessRequests();
|
WalSndWakeupProcessRequests(true, !RecoveryInProgress());
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Great, done. To take some work off the critical path, try to initialize
|
* Great, done. To take some work off the critical path, try to initialize
|
||||||
@ -5765,7 +5765,7 @@ StartupXLOG(void)
|
|||||||
* If there were cascading standby servers connected to us, nudge any wal
|
* If there were cascading standby servers connected to us, nudge any wal
|
||||||
* sender processes to notice that we've been promoted.
|
* sender processes to notice that we've been promoted.
|
||||||
*/
|
*/
|
||||||
WalSndWakeup();
|
WalSndWakeup(true, true);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If this was a promotion, request an (online) checkpoint now. This isn't
|
* If this was a promotion, request an (online) checkpoint now. This isn't
|
||||||
|
@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
|
|||||||
* if we restored something other than a WAL segment, but it does no harm
|
* if we restored something other than a WAL segment, but it does no harm
|
||||||
* either.
|
* either.
|
||||||
*/
|
*/
|
||||||
WalSndWakeup();
|
WalSndWakeup(true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -1935,6 +1935,31 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
|
|||||||
XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
|
XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
|
||||||
SpinLockRelease(&XLogRecoveryCtl->info_lck);
|
SpinLockRelease(&XLogRecoveryCtl->info_lck);
|
||||||
|
|
||||||
|
/* ------
|
||||||
|
* Wakeup walsenders:
|
||||||
|
*
|
||||||
|
* On the standby, the WAL is flushed first (which will only wake up
|
||||||
|
* physical walsenders) and then applied, which will only wake up logical
|
||||||
|
* walsenders.
|
||||||
|
*
|
||||||
|
* Indeed, logical walsenders on standby can't decode and send data until
|
||||||
|
* it's been applied.
|
||||||
|
*
|
||||||
|
* Physical walsenders don't need to be woken up during replay unless
|
||||||
|
* cascading replication is allowed and time line change occurred (so that
|
||||||
|
* they can notice that they are on a new time line).
|
||||||
|
*
|
||||||
|
* That's why the wake up conditions are for:
|
||||||
|
*
|
||||||
|
* - physical walsenders in case of new time line and cascade
|
||||||
|
* replication is allowed
|
||||||
|
* - logical walsenders in case cascade replication is allowed (could not
|
||||||
|
* be created otherwise)
|
||||||
|
* ------
|
||||||
|
*/
|
||||||
|
if (AllowCascadeReplication())
|
||||||
|
WalSndWakeup(switchedTLI, true);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If rm_redo called XLogRequestWalReceiverReply, then we wake up the
|
* If rm_redo called XLogRequestWalReceiverReply, then we wake up the
|
||||||
* receiver so that it notices the updated lastReplayedEndRecPtr and sends
|
* receiver so that it notices the updated lastReplayedEndRecPtr and sends
|
||||||
@ -1958,12 +1983,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
|
|||||||
*/
|
*/
|
||||||
RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
|
RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
|
||||||
|
|
||||||
/*
|
|
||||||
* Wake up any walsenders to notice that we are on a new timeline.
|
|
||||||
*/
|
|
||||||
if (AllowCascadeReplication())
|
|
||||||
WalSndWakeup();
|
|
||||||
|
|
||||||
/* Reset the prefetcher. */
|
/* Reset the prefetcher. */
|
||||||
XLogPrefetchReconfigure();
|
XLogPrefetchReconfigure();
|
||||||
}
|
}
|
||||||
@ -3050,9 +3069,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
|
|||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* When we find that WAL ends in an incomplete record, keep track
|
* When we find that WAL ends in an incomplete record, keep track
|
||||||
* of that record. After recovery is done, we'll write a record to
|
* of that record. After recovery is done, we'll write a record
|
||||||
* indicate to downstream WAL readers that that portion is to be
|
* to indicate to downstream WAL readers that that portion is to
|
||||||
* ignored.
|
* be ignored.
|
||||||
*
|
*
|
||||||
* However, when ArchiveRecoveryRequested = true, we're going to
|
* However, when ArchiveRecoveryRequested = true, we're going to
|
||||||
* switch to a new timeline at the end of recovery. We will only
|
* switch to a new timeline at the end of recovery. We will only
|
||||||
|
@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
|
|||||||
/* Signal the startup process and walsender that new WAL has arrived */
|
/* Signal the startup process and walsender that new WAL has arrived */
|
||||||
WakeupRecovery();
|
WakeupRecovery();
|
||||||
if (AllowCascadeReplication())
|
if (AllowCascadeReplication())
|
||||||
WalSndWakeup();
|
WalSndWakeup(true, false);
|
||||||
|
|
||||||
/* Report XLOG streaming progress in PS display */
|
/* Report XLOG streaming progress in PS display */
|
||||||
if (update_process_title)
|
if (update_process_title)
|
||||||
|
@ -2603,6 +2603,23 @@ InitWalSenderSlot(void)
|
|||||||
walsnd->sync_standby_priority = 0;
|
walsnd->sync_standby_priority = 0;
|
||||||
walsnd->latch = &MyProc->procLatch;
|
walsnd->latch = &MyProc->procLatch;
|
||||||
walsnd->replyTime = 0;
|
walsnd->replyTime = 0;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The kind assignment is done here and not in StartReplication()
|
||||||
|
* and StartLogicalReplication(). Indeed, the logical walsender
|
||||||
|
* needs to read WAL records (like snapshot of running
|
||||||
|
* transactions) during the slot creation. So it needs to be woken
|
||||||
|
* up based on its kind.
|
||||||
|
*
|
||||||
|
* The kind assignment could also be done in StartReplication(),
|
||||||
|
* StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
|
||||||
|
* seems better to set it in one place.
|
||||||
|
*/
|
||||||
|
if (MyDatabaseId == InvalidOid)
|
||||||
|
walsnd->kind = REPLICATION_KIND_PHYSICAL;
|
||||||
|
else
|
||||||
|
walsnd->kind = REPLICATION_KIND_LOGICAL;
|
||||||
|
|
||||||
SpinLockRelease(&walsnd->mutex);
|
SpinLockRelease(&walsnd->mutex);
|
||||||
/* don't need the lock anymore */
|
/* don't need the lock anymore */
|
||||||
MyWalSnd = (WalSnd *) walsnd;
|
MyWalSnd = (WalSnd *) walsnd;
|
||||||
@ -3280,30 +3297,46 @@ WalSndShmemInit(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Wake up all walsenders
|
* Wake up physical, logical or both kinds of walsenders
|
||||||
|
*
|
||||||
|
* The distinction between physical and logical walsenders is done, because:
|
||||||
|
* - physical walsenders can't send data until it's been flushed
|
||||||
|
* - logical walsenders on standby can't decode and send data until it's been
|
||||||
|
* applied
|
||||||
|
*
|
||||||
|
* For cascading replication we need to wake up physical walsenders separately
|
||||||
|
* from logical walsenders (see the comment before calling WalSndWakeup() in
|
||||||
|
* ApplyWalRecord() for more details).
|
||||||
*
|
*
|
||||||
* This will be called inside critical sections, so throwing an error is not
|
* This will be called inside critical sections, so throwing an error is not
|
||||||
* advisable.
|
* advisable.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
WalSndWakeup(void)
|
WalSndWakeup(bool physical, bool logical)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
for (i = 0; i < max_wal_senders; i++)
|
for (i = 0; i < max_wal_senders; i++)
|
||||||
{
|
{
|
||||||
Latch *latch;
|
Latch *latch;
|
||||||
|
ReplicationKind kind;
|
||||||
WalSnd *walsnd = &WalSndCtl->walsnds[i];
|
WalSnd *walsnd = &WalSndCtl->walsnds[i];
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Get latch pointer with spinlock held, for the unlikely case that
|
* Get latch pointer with spinlock held, for the unlikely case that
|
||||||
* pointer reads aren't atomic (as they're 8 bytes).
|
* pointer reads aren't atomic (as they're 8 bytes). While at it, also
|
||||||
|
* get kind.
|
||||||
*/
|
*/
|
||||||
SpinLockAcquire(&walsnd->mutex);
|
SpinLockAcquire(&walsnd->mutex);
|
||||||
latch = walsnd->latch;
|
latch = walsnd->latch;
|
||||||
|
kind = walsnd->kind;
|
||||||
SpinLockRelease(&walsnd->mutex);
|
SpinLockRelease(&walsnd->mutex);
|
||||||
|
|
||||||
if (latch != NULL)
|
if (latch == NULL)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
|
||||||
|
(logical && kind == REPLICATION_KIND_LOGICAL))
|
||||||
SetLatch(latch);
|
SetLatch(latch);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit);
|
|||||||
extern void WalSndSignals(void);
|
extern void WalSndSignals(void);
|
||||||
extern Size WalSndShmemSize(void);
|
extern Size WalSndShmemSize(void);
|
||||||
extern void WalSndShmemInit(void);
|
extern void WalSndShmemInit(void);
|
||||||
extern void WalSndWakeup(void);
|
extern void WalSndWakeup(bool physical, bool logical);
|
||||||
extern void WalSndInitStopping(void);
|
extern void WalSndInitStopping(void);
|
||||||
extern void WalSndWaitStopping(void);
|
extern void WalSndWaitStopping(void);
|
||||||
extern void HandleWalSndInitStopping(void);
|
extern void HandleWalSndInitStopping(void);
|
||||||
@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void);
|
|||||||
/*
|
/*
|
||||||
* wakeup walsenders if there is work to be done
|
* wakeup walsenders if there is work to be done
|
||||||
*/
|
*/
|
||||||
#define WalSndWakeupProcessRequests() \
|
static inline void
|
||||||
do \
|
WalSndWakeupProcessRequests(bool physical, bool logical)
|
||||||
{ \
|
{
|
||||||
if (wake_wal_senders) \
|
if (wake_wal_senders)
|
||||||
{ \
|
{
|
||||||
wake_wal_senders = false; \
|
wake_wal_senders = false;
|
||||||
if (max_wal_senders > 0) \
|
if (max_wal_senders > 0)
|
||||||
WalSndWakeup(); \
|
WalSndWakeup(physical, logical);
|
||||||
} \
|
}
|
||||||
} while (0)
|
}
|
||||||
|
|
||||||
#endif /* _WALSENDER_H */
|
#endif /* _WALSENDER_H */
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
#include "access/xlog.h"
|
#include "access/xlog.h"
|
||||||
#include "lib/ilist.h"
|
#include "lib/ilist.h"
|
||||||
#include "nodes/nodes.h"
|
#include "nodes/nodes.h"
|
||||||
|
#include "nodes/replnodes.h"
|
||||||
#include "replication/syncrep.h"
|
#include "replication/syncrep.h"
|
||||||
#include "storage/latch.h"
|
#include "storage/latch.h"
|
||||||
#include "storage/shmem.h"
|
#include "storage/shmem.h"
|
||||||
@ -79,6 +80,8 @@ typedef struct WalSnd
|
|||||||
* Timestamp of the last message received from standby.
|
* Timestamp of the last message received from standby.
|
||||||
*/
|
*/
|
||||||
TimestampTz replyTime;
|
TimestampTz replyTime;
|
||||||
|
|
||||||
|
ReplicationKind kind;
|
||||||
} WalSnd;
|
} WalSnd;
|
||||||
|
|
||||||
extern PGDLLIMPORT WalSnd *MyWalSnd;
|
extern PGDLLIMPORT WalSnd *MyWalSnd;
|
||||||
|
Loading…
Reference in New Issue
Block a user