mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-02-23 19:39:53 +08:00
Handle logical slot conflicts on standby
During WAL replay on the standby, when a conflict with a logical slot is identified, invalidate such slots. There are two sources of conflicts: 1) Using the information added in 6af1793954
, logical slots are invalidated if required rows are removed 2) wal_level on the primary server is reduced to below logical. Uses the infrastructure introduced in the prior commit. FIXME: add commit reference. Change InvalidatePossiblyObsoleteSlot() to use a recovery conflict to interrupt use of a slot, if called in the startup process. The new recovery conflict is added to pg_stat_database_conflicts, as confl_active_logicalslot. See 6af1793954
for an overall design of logical decoding on a standby. Bumps catversion for the addition of the pg_stat_database_conflicts column. Bumps PGSTAT_FILE_FORMAT_ID for the same reason. Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Author: Andres Freund <andres@anarazel.de> Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version) Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Robert Haas <robertmhaas@gmail.com> Reviewed-by: Fabrízio de Royes Mello <fabriziomello@gmail.com> Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org> Discussion: https://postgr.es/m/20230407075009.igg7be27ha2htkbt@awork3.anarazel.de
This commit is contained in:
parent
be87200efd
commit
26669757b6
@ -4742,6 +4742,17 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
|
||||
deadlocks
|
||||
</para></entry>
|
||||
</row>
|
||||
|
||||
<row>
|
||||
<entry role="catalog_table_entry"><para role="column_definition">
|
||||
<structfield>confl_active_logicalslot</structfield> <type>bigint</type>
|
||||
</para>
|
||||
<para>
|
||||
Number of uses of logical slots in this database that have been
|
||||
canceled due to old snapshots or a too low <xref linkend="guc-wal-level"/>
|
||||
on the primary
|
||||
</para></entry>
|
||||
</row>
|
||||
</tbody>
|
||||
</tgroup>
|
||||
</table>
|
||||
|
@ -197,6 +197,7 @@ gistRedoDeleteRecord(XLogReaderState *record)
|
||||
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
|
||||
|
||||
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
|
||||
xldata->isCatalogRel,
|
||||
rlocator);
|
||||
}
|
||||
|
||||
@ -390,6 +391,7 @@ gistRedoPageReuse(XLogReaderState *record)
|
||||
*/
|
||||
if (InHotStandby)
|
||||
ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
|
||||
xlrec->isCatalogRel,
|
||||
xlrec->locator);
|
||||
}
|
||||
|
||||
|
@ -1003,6 +1003,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
|
||||
|
||||
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
|
||||
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
|
||||
xldata->isCatalogRel,
|
||||
rlocator);
|
||||
}
|
||||
|
||||
|
@ -8769,6 +8769,7 @@ heap_xlog_prune(XLogReaderState *record)
|
||||
*/
|
||||
if (InHotStandby)
|
||||
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
|
||||
xlrec->isCatalogRel,
|
||||
rlocator);
|
||||
|
||||
/*
|
||||
@ -8940,6 +8941,7 @@ heap_xlog_visible(XLogReaderState *record)
|
||||
*/
|
||||
if (InHotStandby)
|
||||
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
|
||||
xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
|
||||
rlocator);
|
||||
|
||||
/*
|
||||
@ -9061,6 +9063,7 @@ heap_xlog_freeze_page(XLogReaderState *record)
|
||||
|
||||
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
|
||||
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
|
||||
xlrec->isCatalogRel,
|
||||
rlocator);
|
||||
}
|
||||
|
||||
|
@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record)
|
||||
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
|
||||
|
||||
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
|
||||
xlrec->isCatalogRel,
|
||||
rlocator);
|
||||
}
|
||||
|
||||
@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
|
||||
|
||||
if (InHotStandby)
|
||||
ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
|
||||
xlrec->isCatalogRel,
|
||||
xlrec->locator);
|
||||
}
|
||||
|
||||
|
@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
|
||||
|
||||
XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
|
||||
ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
|
||||
xldata->isCatalogRel,
|
||||
locator);
|
||||
}
|
||||
|
||||
|
@ -7970,6 +7970,21 @@ xlog_redo(XLogReaderState *record)
|
||||
/* Update our copy of the parameters in pg_control */
|
||||
memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
|
||||
|
||||
/*
|
||||
* Invalidate logical slots if we are in hot standby and the primary
|
||||
* does not have a WAL level sufficient for logical decoding. No need
|
||||
* to search for potentially conflicting logical slots if standby is
|
||||
* running with wal_level lower than logical, because in that case, we
|
||||
* would have either disallowed creation of logical slots or
|
||||
* invalidated existing ones.
|
||||
*/
|
||||
if (InRecovery && InHotStandby &&
|
||||
xlrec.wal_level < WAL_LEVEL_LOGICAL &&
|
||||
wal_level >= WAL_LEVEL_LOGICAL)
|
||||
InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL,
|
||||
0, InvalidOid,
|
||||
InvalidTransactionId);
|
||||
|
||||
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
|
||||
ControlFile->MaxConnections = xlrec.MaxConnections;
|
||||
ControlFile->max_worker_processes = xlrec.max_worker_processes;
|
||||
|
@ -1069,7 +1069,8 @@ CREATE VIEW pg_stat_database_conflicts AS
|
||||
pg_stat_get_db_conflict_lock(D.oid) AS confl_lock,
|
||||
pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot,
|
||||
pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin,
|
||||
pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock
|
||||
pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock,
|
||||
pg_stat_get_db_conflict_logicalslot(D.oid) AS confl_active_logicalslot
|
||||
FROM pg_database D;
|
||||
|
||||
CREATE VIEW pg_stat_user_functions AS
|
||||
|
@ -1442,7 +1442,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
|
||||
slotname, restart_lsn,
|
||||
oldestLSN, snapshotConflictHorizon);
|
||||
|
||||
(void) kill(active_pid, SIGTERM);
|
||||
if (MyBackendType == B_STARTUP)
|
||||
(void) SendProcSignal(active_pid,
|
||||
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
|
||||
InvalidBackendId);
|
||||
else
|
||||
(void) kill(active_pid, SIGTERM);
|
||||
|
||||
last_signaled_pid = active_pid;
|
||||
}
|
||||
|
||||
|
@ -673,6 +673,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
|
||||
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
|
||||
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
|
||||
|
||||
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
|
||||
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
|
||||
|
||||
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
|
||||
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
|
||||
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include "access/xlogutils.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
#include "replication/slot.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "storage/proc.h"
|
||||
@ -466,6 +467,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
|
||||
*/
|
||||
void
|
||||
ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
|
||||
bool isCatalogRel,
|
||||
RelFileLocator locator)
|
||||
{
|
||||
VirtualTransactionId *backends;
|
||||
@ -491,6 +493,16 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
|
||||
PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
|
||||
WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
|
||||
true);
|
||||
|
||||
/*
|
||||
* Note that WaitExceedsMaxStandbyDelay() is not taken into account here
|
||||
* (as opposed to ResolveRecoveryConflictWithVirtualXIDs() above). That
|
||||
* seems OK, given that this kind of conflict should not normally be
|
||||
* reached, e.g. due to using a physical replication slot.
|
||||
*/
|
||||
if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
|
||||
InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid,
|
||||
snapshotConflictHorizon);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -499,6 +511,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
|
||||
*/
|
||||
void
|
||||
ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon,
|
||||
bool isCatalogRel,
|
||||
RelFileLocator locator)
|
||||
{
|
||||
/*
|
||||
@ -517,7 +530,9 @@ ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHor
|
||||
TransactionId truncated;
|
||||
|
||||
truncated = XidFromFullTransactionId(snapshotConflictHorizon);
|
||||
ResolveRecoveryConflictWithSnapshot(truncated, locator);
|
||||
ResolveRecoveryConflictWithSnapshot(truncated,
|
||||
isCatalogRel,
|
||||
locator);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1478,6 +1493,9 @@ get_recovery_conflict_desc(ProcSignalReason reason)
|
||||
case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
|
||||
reasonDesc = _("recovery conflict on snapshot");
|
||||
break;
|
||||
case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
|
||||
reasonDesc = _("recovery conflict on replication slot");
|
||||
break;
|
||||
case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
|
||||
reasonDesc = _("recovery conflict on buffer deadlock");
|
||||
break;
|
||||
|
@ -2526,6 +2526,9 @@ errdetail_recovery_conflict(void)
|
||||
case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
|
||||
errdetail("User query might have needed to see row versions that must be removed.");
|
||||
break;
|
||||
case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
|
||||
errdetail("User was using a logical slot that must be invalidated.");
|
||||
break;
|
||||
case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
|
||||
errdetail("User transaction caused buffer deadlock with recovery.");
|
||||
break;
|
||||
@ -3143,6 +3146,12 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
|
||||
InterruptPending = true;
|
||||
break;
|
||||
|
||||
case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
|
||||
RecoveryConflictPending = true;
|
||||
QueryCancelPending = true;
|
||||
InterruptPending = true;
|
||||
break;
|
||||
|
||||
default:
|
||||
elog(FATAL, "unrecognized conflict mode: %d",
|
||||
(int) reason);
|
||||
|
@ -109,6 +109,9 @@ pgstat_report_recovery_conflict(int reason)
|
||||
case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
|
||||
dbentry->conflict_bufferpin++;
|
||||
break;
|
||||
case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
|
||||
dbentry->conflict_logicalslot++;
|
||||
break;
|
||||
case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
|
||||
dbentry->conflict_startup_deadlock++;
|
||||
break;
|
||||
@ -387,6 +390,7 @@ pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
|
||||
PGSTAT_ACCUM_DBCOUNT(conflict_tablespace);
|
||||
PGSTAT_ACCUM_DBCOUNT(conflict_lock);
|
||||
PGSTAT_ACCUM_DBCOUNT(conflict_snapshot);
|
||||
PGSTAT_ACCUM_DBCOUNT(conflict_logicalslot);
|
||||
PGSTAT_ACCUM_DBCOUNT(conflict_bufferpin);
|
||||
PGSTAT_ACCUM_DBCOUNT(conflict_startup_deadlock);
|
||||
|
||||
|
@ -1071,6 +1071,8 @@ PG_STAT_GET_DBENTRY_INT64(xact_commit)
|
||||
/* pg_stat_get_db_xact_rollback */
|
||||
PG_STAT_GET_DBENTRY_INT64(xact_rollback)
|
||||
|
||||
/* pg_stat_get_db_conflict_logicalslot */
|
||||
PG_STAT_GET_DBENTRY_INT64(conflict_logicalslot)
|
||||
|
||||
Datum
|
||||
pg_stat_get_db_stat_reset_time(PG_FUNCTION_ARGS)
|
||||
@ -1104,6 +1106,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
|
||||
result = (int64) (dbentry->conflict_tablespace +
|
||||
dbentry->conflict_lock +
|
||||
dbentry->conflict_snapshot +
|
||||
dbentry->conflict_logicalslot +
|
||||
dbentry->conflict_bufferpin +
|
||||
dbentry->conflict_startup_deadlock);
|
||||
|
||||
|
@ -57,6 +57,6 @@
|
||||
*/
|
||||
|
||||
/* yyyymmddN */
|
||||
#define CATALOG_VERSION_NO 202304073
|
||||
#define CATALOG_VERSION_NO 202304074
|
||||
|
||||
#endif
|
||||
|
@ -5611,6 +5611,11 @@
|
||||
proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's',
|
||||
proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
|
||||
prosrc => 'pg_stat_get_db_conflict_snapshot' },
|
||||
{ oid => '9901',
|
||||
descr => 'statistics: recovery conflicts in database caused by logical replication slot',
|
||||
proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's',
|
||||
proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
|
||||
prosrc => 'pg_stat_get_db_conflict_logicalslot' },
|
||||
{ oid => '3068',
|
||||
descr => 'statistics: recovery conflicts in database caused by shared buffer pin',
|
||||
proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's',
|
||||
|
@ -235,7 +235,7 @@ typedef struct PgStat_TableXactStatus
|
||||
* ------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#define PGSTAT_FILE_FORMAT_ID 0x01A5BCAB
|
||||
#define PGSTAT_FILE_FORMAT_ID 0x01A5BCAC
|
||||
|
||||
typedef struct PgStat_ArchiverStats
|
||||
{
|
||||
@ -332,6 +332,7 @@ typedef struct PgStat_StatDBEntry
|
||||
PgStat_Counter conflict_tablespace;
|
||||
PgStat_Counter conflict_lock;
|
||||
PgStat_Counter conflict_snapshot;
|
||||
PgStat_Counter conflict_logicalslot;
|
||||
PgStat_Counter conflict_bufferpin;
|
||||
PgStat_Counter conflict_startup_deadlock;
|
||||
PgStat_Counter temp_files;
|
||||
|
@ -42,6 +42,7 @@ typedef enum
|
||||
PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
|
||||
PROCSIG_RECOVERY_CONFLICT_LOCK,
|
||||
PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
|
||||
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
|
||||
PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
|
||||
PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
|
||||
|
||||
|
@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void);
|
||||
extern void ShutdownRecoveryTransactionEnvironment(void);
|
||||
|
||||
extern void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
|
||||
bool isCatalogRel,
|
||||
RelFileLocator locator);
|
||||
extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon,
|
||||
bool isCatalogRel,
|
||||
RelFileLocator locator);
|
||||
extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
|
||||
extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
|
||||
|
@ -1870,7 +1870,8 @@ pg_stat_database_conflicts| SELECT oid AS datid,
|
||||
pg_stat_get_db_conflict_lock(oid) AS confl_lock,
|
||||
pg_stat_get_db_conflict_snapshot(oid) AS confl_snapshot,
|
||||
pg_stat_get_db_conflict_bufferpin(oid) AS confl_bufferpin,
|
||||
pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock
|
||||
pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock,
|
||||
pg_stat_get_db_conflict_logicalslot(oid) AS confl_active_logicalslot
|
||||
FROM pg_database d;
|
||||
pg_stat_gssapi| SELECT pid,
|
||||
gss_auth AS gss_authenticated,
|
||||
|
Loading…
Reference in New Issue
Block a user