diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 0b25efafe2b..8ea846bfc3b 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -542,28 +542,9 @@ CreateDecodingContext(XLogRecPtr start_lsn, errdetail("This replication slot is being synchronized from the primary server."), errhint("Specify another replication slot.")); - /* - * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid - * "cannot get changes" wording in this errmsg because that'd be - * confusingly ambiguous about no changes being available when called from - * pg_logical_slot_get_changes_guts(). - */ - if (MyReplicationSlot->data.invalidated == RS_INVAL_WAL_REMOVED) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("can no longer get changes from replication slot \"%s\"", - NameStr(MyReplicationSlot->data.name)), - errdetail("This slot has been invalidated because it exceeded the maximum reserved size."))); - - if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("can no longer get changes from replication slot \"%s\"", - NameStr(MyReplicationSlot->data.name)), - errdetail("This slot has been invalidated because it was conflicting with recovery."))); - - Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE); - Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr); + /* slot must be valid to allow decoding */ + Assert(slot->data.invalidated == RS_INVAL_NONE); + Assert(slot->data.restart_lsn != InvalidXLogRecPtr); if (start_lsn == InvalidXLogRecPtr) { diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 0148ec36788..ca53caac2f2 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin else end_of_wal = GetXLogReplayRecPtr(NULL); - ReplicationSlotAcquire(NameStr(*name), true); + ReplicationSlotAcquire(NameStr(*name), true, true); PG_TRY(); { diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index f6945af1d43..be6f87f00b2 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -446,7 +446,7 @@ drop_local_obsolete_slots(List *remote_slot_list) if (synced_slot) { - ReplicationSlotAcquire(NameStr(local_slot->data.name), true); + ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false); ReplicationSlotDropAcquired(); } @@ -665,7 +665,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) * pre-check to ensure that at least one of the slot properties is * changed before acquiring the slot. */ - ReplicationSlotAcquire(remote_slot->name, true); + ReplicationSlotAcquire(remote_slot->name, true, false); Assert(slot == MyReplicationSlot); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index b30e0473e1c..c57a13d8208 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -535,9 +535,13 @@ ReplicationSlotName(int index, Name name) * * An error is raised if nowait is true and the slot is currently in use. If * nowait is false, we sleep until the slot is released by the owning process. + * + * An error is raised if error_if_invalid is true and the slot is found to + * be invalid. It should always be set to true, except when we are temporarily + * acquiring the slot and don't intend to change it. */ void -ReplicationSlotAcquire(const char *name, bool nowait) +ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid) { ReplicationSlot *s; int active_pid; @@ -561,6 +565,19 @@ retry: name))); } + /* Invalid slots can't be modified or used before accessing the WAL. */ + if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE) + { + LWLockRelease(ReplicationSlotControlLock); + + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("can no longer access replication slot \"%s\"", + NameStr(s->data.name)), + errdetail("This replication slot has been invalidated due to \"%s\".", + SlotInvalidationCauses[s->data.invalidated])); + } + /* * This is the slot we want; check if it's active under some other * process. In single user mode, we don't need this check. @@ -785,7 +802,7 @@ ReplicationSlotDrop(const char *name, bool nowait) { Assert(MyReplicationSlot == NULL); - ReplicationSlotAcquire(name, nowait); + ReplicationSlotAcquire(name, nowait, false); /* * Do not allow users to drop the slots which are currently being synced @@ -812,7 +829,7 @@ ReplicationSlotAlter(const char *name, const bool *failover, Assert(MyReplicationSlot == NULL); Assert(failover || two_phase); - ReplicationSlotAcquire(name, false); + ReplicationSlotAcquire(name, false, true); if (SlotIsPhysical(MyReplicationSlot)) ereport(ERROR, @@ -820,13 +837,6 @@ ReplicationSlotAlter(const char *name, const bool *failover, errmsg("cannot use %s with a physical replication slot", "ALTER_REPLICATION_SLOT")); - if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE) - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot alter invalid replication slot \"%s\"", name), - errdetail("This replication slot has been invalidated due to \"%s\".", - SlotInvalidationCauses[MyReplicationSlot->data.invalidated])); - if (RecoveryInProgress()) { /* diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 977146789fe..8be4b8c65b5 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -536,7 +536,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) moveto = Min(moveto, GetXLogReplayRecPtr(NULL)); /* Acquire the slot so we "own" it */ - ReplicationSlotAcquire(NameStr(*slotname), true); + ReplicationSlotAcquire(NameStr(*slotname), true, true); /* A slot whose restart_lsn has never been reserved cannot be advanced */ if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index bac504b554e..446d10c1a7d 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -816,7 +816,7 @@ StartReplication(StartReplicationCmd *cmd) if (cmd->slotname) { - ReplicationSlotAcquire(cmd->slotname, true); + ReplicationSlotAcquire(cmd->slotname, true, true); if (SlotIsLogical(MyReplicationSlot)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -1434,7 +1434,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) Assert(!MyReplicationSlot); - ReplicationSlotAcquire(cmd->slotname, true); + ReplicationSlotAcquire(cmd->slotname, true, true); /* * Force a disconnect, so that the decoding code doesn't need to care diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c index 9a10907d05b..d44f8c262ba 100644 --- a/src/backend/utils/adt/pg_upgrade_support.c +++ b/src/backend/utils/adt/pg_upgrade_support.c @@ -298,7 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS) slot_name = PG_GETARG_NAME(0); /* Acquire the given slot */ - ReplicationSlotAcquire(NameStr(*slot_name), true); + ReplicationSlotAcquire(NameStr(*slot_name), true, true); Assert(SlotIsLogical(MyReplicationSlot)); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index bf62b36ad07..47ebdaecb6a 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -253,7 +253,8 @@ extern void ReplicationSlotDropAcquired(void); extern void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase); -extern void ReplicationSlotAcquire(const char *name, bool nowait); +extern void ReplicationSlotAcquire(const char *name, bool nowait, + bool error_if_invalid); extern void ReplicationSlotRelease(void); extern void ReplicationSlotCleanup(bool synced_only); extern void ReplicationSlotSave(void); diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl index ae2ad5c933a..6468784b83d 100644 --- a/src/test/recovery/t/019_replslot_limit.pl +++ b/src/test/recovery/t/019_replslot_limit.pl @@ -234,7 +234,7 @@ my $failed = 0; for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++) { if ($node_standby->log_contains( - "requested WAL segment [0-9A-F]+ has already been removed", + "This replication slot has been invalidated due to \"wal_removed\".", $logstart)) { $failed = 1; diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl index 7e794c5bea3..505e85d1eb6 100644 --- a/src/test/recovery/t/035_standby_logical_decoding.pl +++ b/src/test/recovery/t/035_standby_logical_decoding.pl @@ -533,7 +533,7 @@ check_slots_conflict_reason('vacuum_full_', 'rows_removed'); qq[ALTER_REPLICATION_SLOT vacuum_full_inactiveslot (failover);], replication => 'database'); ok( $stderr =~ - /ERROR: cannot alter invalid replication slot "vacuum_full_inactiveslot"/ + /ERROR: can no longer access replication slot "vacuum_full_inactiveslot"/ && $stderr =~ /DETAIL: This replication slot has been invalidated due to "rows_removed"./, "invalidated slot cannot be altered"); @@ -551,8 +551,7 @@ $handle = # We are not able to read from the slot as it has been invalidated check_pg_recvlogical_stderr($handle, - "can no longer get changes from replication slot \"vacuum_full_activeslot\"" -); + "can no longer access replication slot \"vacuum_full_activeslot\""); # Turn hot_standby_feedback back on change_hot_standby_feedback_and_wait_for_xmins(1, 1); @@ -632,8 +631,7 @@ $handle = # We are not able to read from the slot as it has been invalidated check_pg_recvlogical_stderr($handle, - "can no longer get changes from replication slot \"row_removal_activeslot\"" -); + "can no longer access replication slot \"row_removal_activeslot\""); ################################################## # Recovery conflict: Same as Scenario 2 but on a shared catalog table @@ -668,7 +666,7 @@ $handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout, # We are not able to read from the slot as it has been invalidated check_pg_recvlogical_stderr($handle, - "can no longer get changes from replication slot \"shared_row_removal_activeslot\"" + "can no longer access replication slot \"shared_row_removal_activeslot\"" ); ################################################## @@ -759,7 +757,7 @@ $handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr); # We are not able to read from the slot as it has been invalidated check_pg_recvlogical_stderr($handle, - "can no longer get changes from replication slot \"pruning_activeslot\""); + "can no longer access replication slot \"pruning_activeslot\""); # Turn hot_standby_feedback back on change_hot_standby_feedback_and_wait_for_xmins(1, 1); @@ -818,8 +816,7 @@ $handle = make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr); # as the slot has been invalidated we should not be able to read check_pg_recvlogical_stderr($handle, - "can no longer get changes from replication slot \"wal_level_activeslot\"" -); + "can no longer access replication slot \"wal_level_activeslot\""); ################################################## # DROP DATABASE should drop its slots, including active slots.