diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index e2d27da38c..241131ce61 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -298,8 +298,8 @@ postgres: user database host pg_stat_replicationpg_stat_replication One row per WAL sender process, showing process ID, user OID, user name, application name, client's address and port number, - time at which the server process began execution, and transaction log - location. + time at which the server process began execution, current WAL sender + state and transaction log location. diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index aa89240e85..718e996e6b 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -501,6 +501,7 @@ CREATE VIEW pg_stat_replication AS S.client_addr, S.client_port, S.backend_start, + W.state, W.sent_location FROM pg_stat_get_activity(NULL) AS S, pg_authid U, pg_stat_get_wal_senders() AS W diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index c09700f798..144b17c66b 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -24,6 +24,7 @@ #include "libpq/pqformat.h" #include "nodes/pg_list.h" #include "replication/basebackup.h" +#include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" #include "utils/builtins.h" @@ -115,6 +116,8 @@ SendBaseBackup(const char *options) ALLOCSET_DEFAULT_MAXSIZE); old_context = MemoryContextSwitchTo(backup_context); + WalSndSetState(WALSNDSTATE_BACKUP); + if (backup_label == NULL) ereport(FATAL, (errcode(ERRCODE_PROTOCOL_VIOLATION), diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 559e7349fc..a0f20ab41f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -179,6 +179,7 @@ WalSndHandshake(void) { int firstchar; + WalSndSetState(WALSNDSTATE_STARTUP); set_ps_display("idle", false); /* Wait for a command to arrive */ @@ -482,6 +483,9 @@ WalSndLoop(void) if (!XLogSend(output_message, &caughtup)) break; } + + /* Update our state to indicate if we're behind or not */ + WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP); } /* @@ -533,6 +537,7 @@ InitWalSnd(void) */ walsnd->pid = MyProcPid; MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr)); + walsnd->state = WALSNDSTATE_STARTUP; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ OwnLatch((Latch *) &walsnd->latch); @@ -960,6 +965,45 @@ WalSndWakeup(void) SetLatch(&WalSndCtl->walsnds[i].latch); } +/* Set state for current walsender (only called in walsender) */ +void +WalSndSetState(WalSndState state) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + Assert(am_walsender); + + if (walsnd->state == state) + return; + + SpinLockAcquire(&walsnd->mutex); + walsnd->state = state; + SpinLockRelease(&walsnd->mutex); +} + +/* + * Return a string constant representing the state. This is used + * in system views, and should *not* be translated. + */ +static const char * +WalSndGetStateString(WalSndState state) +{ + switch (state) + { + case WALSNDSTATE_STARTUP: + return "STARTUP"; + case WALSNDSTATE_BACKUP: + return "BACKUP"; + case WALSNDSTATE_CATCHUP: + return "CATCHUP"; + case WALSNDSTATE_STREAMING: + return "STREAMING"; + } + return "UNKNOWN"; +} + + /* * Returns activity of walsenders, including pids and xlog locations sent to * standby servers. @@ -967,7 +1011,7 @@ WalSndWakeup(void) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 2 +#define PG_STAT_GET_WAL_SENDERS_COLS 3 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -1021,7 +1065,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) memset(nulls, 0, sizeof(nulls)); values[0] = Int32GetDatum(walsnd->pid); - values[1] = CStringGetTextDatum(sent_location); + values[1] = CStringGetTextDatum(WalSndGetStateString(walsnd->state)); + values[2] = CStringGetTextDatum(sent_location); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 7a03b1c117..df3c95b5f9 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201101081 +#define CATALOG_VERSION_NO 201101111 #endif diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 2eadd2ced8..f8b5d4da3d 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -3075,7 +3075,7 @@ DATA(insert OID = 1936 ( pg_stat_get_backend_idset PGNSP PGUID 12 1 100 0 f f DESCR("statistics: currently active backend IDs"); DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,23}" "{i,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ )); DESCR("statistics: information about currently active backends"); -DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25}" "{o,o}" "{procpid,sent_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); +DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25}" "{o,o,o}" "{procpid,state,sent_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); DESCR("statistics: information about currently active replication"); DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ )); DESCR("statistics: current backend PID"); diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index d6767b9dcf..0b4a143f82 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -16,12 +16,22 @@ #include "storage/latch.h" #include "storage/spin.h" + +typedef enum WalSndState +{ + WALSNDSTATE_STARTUP = 0, + WALSNDSTATE_BACKUP, + WALSNDSTATE_CATCHUP, + WALSNDSTATE_STREAMING +} WalSndState; + /* * Each walsender has a WalSnd struct in shared memory. */ typedef struct WalSnd { pid_t pid; /* this walsender's process id, or 0 */ + WalSndState state; /* this walsender's state */ XLogRecPtr sentPtr; /* WAL has been sent up to this point */ slock_t mutex; /* locks shared variables shown above */ @@ -53,6 +63,7 @@ extern void WalSndSignals(void); extern Size WalSndShmemSize(void); extern void WalSndShmemInit(void); extern void WalSndWakeup(void); +extern void WalSndSetState(WalSndState state); extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);