diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index d6332e58cf..71c40cc592 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1463,6 +1463,54 @@ The commands accepted in walsender mode are:
CopyData message):
+
+
+
+
+ Primary keepalive message (B)
+
+
+
+
+
+
+ Byte1('k')
+
+
+
+ Identifies the message as a sender keepalive.
+
+
+
+
+
+ Byte8
+
+
+
+ The current end of WAL on the server, given in
+ XLogRecPtr format.
+
+
+
+
+
+ Byte8
+
+
+
+ The server's system clock at the time of transmission,
+ given in TimestampTz format.
+
+
+
+
+
+
+
+
+
+
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 41800a4604..d98a763fda 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -452,6 +452,9 @@ typedef struct XLogCtlData
XLogRecPtr recoveryLastRecPtr;
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
TimestampTz recoveryLastXTime;
+ /* timestamp of when we started replaying the current chunk of WAL data,
+ * only relevant for replication or archive recovery */
+ TimestampTz currentChunkStartTime;
/* end of the last record restored from the archive */
XLogRecPtr restoreLastRecPtr;
/* Are we requested to pause recovery? */
@@ -606,6 +609,7 @@ static void exitArchiveRecovery(TimeLineID endTLI,
static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
static void recoveryPausesHere(void);
static void SetLatestXTime(TimestampTz xtime);
+static void SetCurrentChunkStartTime(TimestampTz xtime);
static void CheckRequiredParameterValues(void);
static void XLogReportParameters(void);
static void LocalSetXLogInsertAllowed(void);
@@ -5847,6 +5851,41 @@ GetLatestXTime(void)
return xtime;
}
+/*
+ * Save timestamp of the next chunk of WAL records to apply.
+ *
+ * We keep this in XLogCtl, not a simple static variable, so that it can be
+ * seen by all backends.
+ */
+static void
+SetCurrentChunkStartTime(TimestampTz xtime)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->currentChunkStartTime = xtime;
+ SpinLockRelease(&xlogctl->info_lck);
+}
+
+/*
+ * Fetch timestamp of latest processed commit/abort record.
+ * Startup process maintains an accurate local copy in XLogReceiptTime
+ */
+TimestampTz
+GetCurrentChunkReplayStartTime(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+ TimestampTz xtime;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ xtime = xlogctl->currentChunkStartTime;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ return xtime;
+}
+
/*
* Returns time of receipt of current chunk of XLOG data, as well as
* whether it was received from streaming replication or from archives.
@@ -6390,6 +6429,7 @@ StartupXLOG(void)
xlogctl->replayEndRecPtr = ReadRecPtr;
xlogctl->recoveryLastRecPtr = ReadRecPtr;
xlogctl->recoveryLastXTime = 0;
+ xlogctl->currentChunkStartTime = 0;
xlogctl->recoveryPause = false;
SpinLockRelease(&xlogctl->info_lck);
@@ -9696,7 +9736,10 @@ retry:
{
havedata = true;
if (!XLByteLT(*RecPtr, latestChunkStart))
+ {
XLogReceiptTime = GetCurrentTimestamp();
+ SetCurrentChunkStartTime(XLogReceiptTime);
+ }
}
else
havedata = false;
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 1f12dcb62a..8106d6b3a4 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -124,6 +124,7 @@ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvSendReply(void);
static void XLogWalRcvSendHSFeedback(void);
+static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -218,6 +219,10 @@ WalReceiverMain(void)
/* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
startpoint = walrcv->receiveStart;
+
+ /* Initialise to a sanish value */
+ walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = GetCurrentTimestamp();
+
SpinLockRelease(&walrcv->mutex);
/* Arrange to clean up at walreceiver exit */
@@ -433,12 +438,28 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
errmsg_internal("invalid WAL message received from primary")));
/* memcpy is required here for alignment reasons */
memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
+
+ ProcessWalSndrMessage(msghdr.walEnd, msghdr.sendTime);
+
buf += sizeof(WalDataMessageHeader);
len -= sizeof(WalDataMessageHeader);
-
XLogWalRcvWrite(buf, len, msghdr.dataStart);
break;
}
+ case 'k': /* Keepalive */
+ {
+ PrimaryKeepaliveMessage keepalive;
+
+ if (len != sizeof(PrimaryKeepaliveMessage))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("invalid keepalive message received from primary")));
+ /* memcpy is required here for alignment reasons */
+ memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage));
+
+ ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime);
+ break;
+ }
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -711,3 +732,27 @@ XLogWalRcvSendHSFeedback(void)
memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
}
+
+/*
+ * Keep track of important messages from primary.
+ */
+static void
+ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+
+ TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
+
+ /* Update shared-memory status */
+ SpinLockAcquire(&walrcv->mutex);
+ walrcv->lastMsgSendTime = sendTime;
+ walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
+ SpinLockRelease(&walrcv->mutex);
+
+ elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d transfer latency %d",
+ timestamptz_to_str(sendTime),
+ timestamptz_to_str(lastMsgReceiptTime),
+ GetReplicationApplyDelay(),
+ GetReplicationTransferLatency());
+}
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 5bce1c34a1..054355b2c5 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -28,6 +28,7 @@
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
+#include "utils/timestamp.h"
WalRcvData *WalRcv = NULL;
@@ -238,3 +239,65 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
return recptr;
}
+
+/*
+ * Returns the replication apply delay in ms
+ */
+int
+GetReplicationApplyDelay(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+
+ XLogRecPtr receivePtr;
+ XLogRecPtr replayPtr;
+
+ long secs;
+ int usecs;
+
+ SpinLockAcquire(&walrcv->mutex);
+ receivePtr = walrcv->receivedUpto;
+ SpinLockRelease(&walrcv->mutex);
+
+ replayPtr = GetXLogReplayRecPtr(NULL);
+
+ if (XLByteLE(receivePtr, replayPtr))
+ return 0;
+
+ TimestampDifference(GetCurrentChunkReplayStartTime(),
+ GetCurrentTimestamp(),
+ &secs, &usecs);
+
+ return (((int) secs * 1000) + (usecs / 1000));
+}
+
+/*
+ * Returns the network latency in ms, note that this includes any
+ * difference in clock settings between the servers, as well as timezone.
+ */
+int
+GetReplicationTransferLatency(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+
+ TimestampTz lastMsgSendTime;
+ TimestampTz lastMsgReceiptTime;
+
+ long secs = 0;
+ int usecs = 0;
+ int ms;
+
+ SpinLockAcquire(&walrcv->mutex);
+ lastMsgSendTime = walrcv->lastMsgSendTime;
+ lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
+ SpinLockRelease(&walrcv->mutex);
+
+ TimestampDifference(lastMsgSendTime,
+ lastMsgReceiptTime,
+ &secs, &usecs);
+
+ ms = ((int) secs * 1000) + (usecs / 1000);
+
+ return ms;
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ea86520417..ed7298b6ee 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -131,6 +131,7 @@ static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
+static void WalSndKeepalive(char *msgbuf);
/* Main entry point for walsender process */
@@ -823,30 +824,24 @@ WalSndLoop(void)
*/
if (caughtup || pq_is_send_pending())
{
- TimestampTz finish_time = 0;
- long sleeptime = -1;
+ TimestampTz timeout = 0;
+ long sleeptime = 10000; /* 10 s */
int wakeEvents;
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
- WL_SOCKET_READABLE;
+ WL_SOCKET_READABLE | WL_TIMEOUT;
+
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
+ else
+ WalSndKeepalive(output_message);
/* Determine time until replication timeout */
if (replication_timeout > 0)
{
- long secs;
- int usecs;
-
- finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+ timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
replication_timeout);
- TimestampDifference(GetCurrentTimestamp(),
- finish_time, &secs, &usecs);
- sleeptime = secs * 1000 + usecs / 1000;
- /* Avoid Assert in WaitLatchOrSocket if timeout is past */
- if (sleeptime < 0)
- sleeptime = 0;
- wakeEvents |= WL_TIMEOUT;
+ sleeptime = 1 + (replication_timeout / 10);
}
/* Sleep until something happens or replication timeout */
@@ -859,7 +854,7 @@ WalSndLoop(void)
* timeout ... he's supposed to reply *before* that.
*/
if (replication_timeout > 0 &&
- GetCurrentTimestamp() >= finish_time)
+ GetCurrentTimestamp() >= timeout)
{
/*
* Since typically expiration of replication timeout means
@@ -1627,6 +1622,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
return (Datum) 0;
}
+static void
+WalSndKeepalive(char *msgbuf)
+{
+ PrimaryKeepaliveMessage keepalive_message;
+
+ /* Construct a new message */
+ keepalive_message.walEnd = sentPtr;
+ keepalive_message.sendTime = GetCurrentTimestamp();
+
+ elog(DEBUG2, "sending replication keepalive");
+
+ /* Prepend with the message type and send it. */
+ msgbuf[0] = 'k';
+ memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
+ pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1);
+}
+
/*
* This isn't currently used for anything. Monitoring tools might be
* interested in the future, and we'll need something like this in the
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 86ab3276ca..4b1f8b8c2f 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -293,6 +293,7 @@ extern XLogRecPtr GetXLogWriteRecPtr(void);
extern bool RecoveryIsPaused(void);
extern void SetRecoveryPause(bool recoveryPause);
extern TimestampTz GetLatestXTime(void);
+extern TimestampTz GetCurrentChunkReplayStartTime(void);
extern void UpdateControlFile(void);
extern uint64 GetSystemIdentifier(void);
diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h
index 656c8fc17f..053376d377 100644
--- a/src/include/replication/walprotocol.h
+++ b/src/include/replication/walprotocol.h
@@ -16,6 +16,20 @@
#include "datatype/timestamp.h"
+/*
+ * All messages from WalSender must contain these fields to allow us to
+ * correctly calculate the replication delay.
+ */
+typedef struct
+{
+ /* Current end of WAL on the sender */
+ XLogRecPtr walEnd;
+
+ /* Sender's system clock at the time of transmission */
+ TimestampTz sendTime;
+} WalSndrMessage;
+
+
/*
* Header for a WAL data message (message type 'w'). This is wrapped within
* a CopyData message at the FE/BE protocol level.
@@ -39,6 +53,14 @@ typedef struct
TimestampTz sendTime;
} WalDataMessageHeader;
+/*
+ * Keepalive message from primary (message type 'k'). (lowercase k)
+ * This is wrapped within a CopyData message at the FE/BE protocol level.
+ *
+ * Note that the data length is not specified here.
+ */
+typedef WalSndrMessage PrimaryKeepaliveMessage;
+
/*
* Reply message from standby (message type 'r'). This is wrapped within
* a CopyData message at the FE/BE protocol level.
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 77f5252091..926730c9f8 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -78,6 +78,12 @@ typedef struct
*/
XLogRecPtr latestChunkStart;
+ /*
+ * Time of send and receive of any message received.
+ */
+ TimestampTz lastMsgSendTime;
+ TimestampTz lastMsgReceiptTime;
+
/*
* connection string; is used for walreceiver to connect with the primary.
*/
@@ -112,5 +118,7 @@ extern void ShutdownWalRcv(void);
extern bool WalRcvInProgress(void);
extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
+extern int GetReplicationApplyDelay(void);
+extern int GetReplicationTransferLatency(void);
#endif /* _WALRECEIVER_H */