mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-02-23 19:39:53 +08:00
Don't call palloc() while holding a spinlock, either.
Fix some more violations of the "only straight-line code inside a spinlock" rule. These are hazardous not only because they risk holding the lock for an excessively long time, but because it's possible for palloc to throw elog(ERROR), leaving a stuck spinlock behind.

copy_replication_slot() had two separate places that did pallocs while holding a spinlock. We can make the code simpler and safer by copying the whole ReplicationSlot struct into a local variable while holding the spinlock, and then referencing that copy. (While that's arguably more cycles than we really need to spend holding the lock, the struct isn't all that big, and this way seems far more maintainable than copying fields piecemeal. Anyway this is surely much cheaper than a palloc.) That bug goes back to v12.

InvalidateObsoleteReplicationSlots() not only did a palloc while holding a spinlock, but for extra sloppiness then leaked the memory --- probably for the lifetime of the checkpointer process, though I didn't try to verify that. Fortunately that silliness is new in HEAD.

pg_get_replication_slots() had a cosmetic violation of the rule, in that it only assumed it's safe to call namecpy() while holding a spinlock. Still, that's a hazard waiting to bite somebody, and there were some other cosmetic coding-rule violations in the same function, so clean it up.

I back-patched this as far as v10; the code exists before that but it looks different, and this didn't seem important enough to adapt the patch further back.

Discussion: https://postgr.es/m/20200602.161518.1399689010416646074.horikyota.ntt@gmail.com
This commit is contained in:
parent
4d685f6d7b
commit
f88bd3139f
@ -1099,7 +1099,7 @@ restart:
|
||||
{
|
||||
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
|
||||
XLogRecPtr restart_lsn = InvalidXLogRecPtr;
|
||||
char *slotname;
|
||||
NameData slotname;
|
||||
|
||||
if (!s->in_use)
|
||||
continue;
|
||||
@ -1112,7 +1112,7 @@ restart:
|
||||
continue;
|
||||
}
|
||||
|
||||
slotname = pstrdup(NameStr(s->data.name));
|
||||
slotname = s->data.name;
|
||||
restart_lsn = s->data.restart_lsn;
|
||||
|
||||
SpinLockRelease(&s->mutex);
|
||||
@ -1120,7 +1120,8 @@ restart:
|
||||
|
||||
for (;;)
|
||||
{
|
||||
int wspid = ReplicationSlotAcquire(slotname, SAB_Inquire);
|
||||
int wspid = ReplicationSlotAcquire(NameStr(slotname),
|
||||
SAB_Inquire);
|
||||
|
||||
/* no walsender? success! */
|
||||
if (wspid == 0)
|
||||
@ -1128,7 +1129,7 @@ restart:
|
||||
|
||||
ereport(LOG,
|
||||
(errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
|
||||
wspid, slotname)));
|
||||
wspid, NameStr(slotname))));
|
||||
(void) kill(wspid, SIGTERM);
|
||||
|
||||
ConditionVariableTimedSleep(&s->active_cv, 10,
|
||||
@ -1138,7 +1139,7 @@ restart:
|
||||
|
||||
ereport(LOG,
|
||||
(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
|
||||
slotname,
|
||||
NameStr(slotname),
|
||||
(uint32) (restart_lsn >> 32),
|
||||
(uint32) restart_lsn)));
|
||||
|
||||
|
@ -278,18 +278,9 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
for (slotno = 0; slotno < max_replication_slots; slotno++)
|
||||
{
|
||||
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
|
||||
ReplicationSlot slot_contents;
|
||||
Datum values[PG_GET_REPLICATION_SLOTS_COLS];
|
||||
bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
|
||||
|
||||
ReplicationSlotPersistency persistency;
|
||||
TransactionId xmin;
|
||||
TransactionId catalog_xmin;
|
||||
XLogRecPtr restart_lsn;
|
||||
XLogRecPtr confirmed_flush_lsn;
|
||||
pid_t active_pid;
|
||||
Oid database;
|
||||
NameData slot_name;
|
||||
NameData plugin;
|
||||
WALAvailability walstate;
|
||||
XLogSegNo last_removed_seg;
|
||||
int i;
|
||||
@ -297,69 +288,61 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
if (!slot->in_use)
|
||||
continue;
|
||||
|
||||
/* Copy slot contents while holding spinlock, then examine at leisure */
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
|
||||
xmin = slot->data.xmin;
|
||||
catalog_xmin = slot->data.catalog_xmin;
|
||||
database = slot->data.database;
|
||||
restart_lsn = slot->data.restart_lsn;
|
||||
confirmed_flush_lsn = slot->data.confirmed_flush;
|
||||
namecpy(&slot_name, &slot->data.name);
|
||||
namecpy(&plugin, &slot->data.plugin);
|
||||
active_pid = slot->active_pid;
|
||||
persistency = slot->data.persistency;
|
||||
|
||||
slot_contents = *slot;
|
||||
SpinLockRelease(&slot->mutex);
|
||||
|
||||
memset(values, 0, sizeof(values));
|
||||
memset(nulls, 0, sizeof(nulls));
|
||||
|
||||
i = 0;
|
||||
values[i++] = NameGetDatum(&slot_name);
|
||||
values[i++] = NameGetDatum(&slot_contents.data.name);
|
||||
|
||||
if (database == InvalidOid)
|
||||
if (slot_contents.data.database == InvalidOid)
|
||||
nulls[i++] = true;
|
||||
else
|
||||
values[i++] = NameGetDatum(&plugin);
|
||||
values[i++] = NameGetDatum(&slot_contents.data.plugin);
|
||||
|
||||
if (database == InvalidOid)
|
||||
if (slot_contents.data.database == InvalidOid)
|
||||
values[i++] = CStringGetTextDatum("physical");
|
||||
else
|
||||
values[i++] = CStringGetTextDatum("logical");
|
||||
|
||||
if (database == InvalidOid)
|
||||
if (slot_contents.data.database == InvalidOid)
|
||||
nulls[i++] = true;
|
||||
else
|
||||
values[i++] = database;
|
||||
values[i++] = ObjectIdGetDatum(slot_contents.data.database);
|
||||
|
||||
values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
|
||||
values[i++] = BoolGetDatum(active_pid != 0);
|
||||
values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
|
||||
values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
|
||||
|
||||
if (active_pid != 0)
|
||||
values[i++] = Int32GetDatum(active_pid);
|
||||
if (slot_contents.active_pid != 0)
|
||||
values[i++] = Int32GetDatum(slot_contents.active_pid);
|
||||
else
|
||||
nulls[i++] = true;
|
||||
|
||||
if (xmin != InvalidTransactionId)
|
||||
values[i++] = TransactionIdGetDatum(xmin);
|
||||
if (slot_contents.data.xmin != InvalidTransactionId)
|
||||
values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
|
||||
else
|
||||
nulls[i++] = true;
|
||||
|
||||
if (catalog_xmin != InvalidTransactionId)
|
||||
values[i++] = TransactionIdGetDatum(catalog_xmin);
|
||||
if (slot_contents.data.catalog_xmin != InvalidTransactionId)
|
||||
values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
|
||||
else
|
||||
nulls[i++] = true;
|
||||
|
||||
if (restart_lsn != InvalidXLogRecPtr)
|
||||
values[i++] = LSNGetDatum(restart_lsn);
|
||||
if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
|
||||
values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
|
||||
else
|
||||
nulls[i++] = true;
|
||||
|
||||
if (confirmed_flush_lsn != InvalidXLogRecPtr)
|
||||
values[i++] = LSNGetDatum(confirmed_flush_lsn);
|
||||
if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
|
||||
values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
|
||||
else
|
||||
nulls[i++] = true;
|
||||
|
||||
walstate = GetWALAvailability(restart_lsn);
|
||||
walstate = GetWALAvailability(slot_contents.data.restart_lsn);
|
||||
|
||||
switch (walstate)
|
||||
{
|
||||
@ -378,6 +361,9 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
case WALAVAIL_REMOVED:
|
||||
values[i++] = CStringGetTextDatum("lost");
|
||||
break;
|
||||
|
||||
default:
|
||||
elog(ERROR, "invalid walstate: %d", (int) walstate);
|
||||
}
|
||||
|
||||
if (max_slot_wal_keep_size_mb >= 0 &&
|
||||
@ -393,8 +379,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
else
|
||||
nulls[i++] = true;
|
||||
|
||||
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
|
||||
|
||||
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
|
||||
}
|
||||
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
|
||||
tuplestore_donestoring(tupstore);
|
||||
@ -653,6 +642,8 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
|
||||
Name src_name = PG_GETARG_NAME(0);
|
||||
Name dst_name = PG_GETARG_NAME(1);
|
||||
ReplicationSlot *src = NULL;
|
||||
ReplicationSlot first_slot_contents;
|
||||
ReplicationSlot second_slot_contents;
|
||||
XLogRecPtr src_restart_lsn;
|
||||
bool src_islogical;
|
||||
bool temporary;
|
||||
@ -692,13 +683,10 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
|
||||
|
||||
if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
|
||||
{
|
||||
/* Copy the slot contents while holding spinlock */
|
||||
SpinLockAcquire(&s->mutex);
|
||||
src_islogical = SlotIsLogical(s);
|
||||
src_restart_lsn = s->data.restart_lsn;
|
||||
temporary = s->data.persistency == RS_TEMPORARY;
|
||||
plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL;
|
||||
first_slot_contents = *s;
|
||||
SpinLockRelease(&s->mutex);
|
||||
|
||||
src = s;
|
||||
break;
|
||||
}
|
||||
@ -711,6 +699,11 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
|
||||
(errcode(ERRCODE_UNDEFINED_OBJECT),
|
||||
errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
|
||||
|
||||
src_islogical = SlotIsLogical(&first_slot_contents);
|
||||
src_restart_lsn = first_slot_contents.data.restart_lsn;
|
||||
temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
|
||||
plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
|
||||
|
||||
/* Check type of replication slot */
|
||||
if (src_islogical != logical_slot)
|
||||
ereport(ERROR,
|
||||
@ -775,18 +768,20 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
|
||||
|
||||
/* Copy data of source slot again */
|
||||
SpinLockAcquire(&src->mutex);
|
||||
copy_effective_xmin = src->effective_xmin;
|
||||
copy_effective_catalog_xmin = src->effective_catalog_xmin;
|
||||
second_slot_contents = *src;
|
||||
SpinLockRelease(&src->mutex);
|
||||
|
||||
copy_xmin = src->data.xmin;
|
||||
copy_catalog_xmin = src->data.catalog_xmin;
|
||||
copy_restart_lsn = src->data.restart_lsn;
|
||||
copy_confirmed_flush = src->data.confirmed_flush;
|
||||
copy_effective_xmin = second_slot_contents.effective_xmin;
|
||||
copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
|
||||
|
||||
copy_xmin = second_slot_contents.data.xmin;
|
||||
copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
|
||||
copy_restart_lsn = second_slot_contents.data.restart_lsn;
|
||||
copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
|
||||
|
||||
/* for existence check */
|
||||
copy_name = pstrdup(NameStr(src->data.name));
|
||||
copy_islogical = SlotIsLogical(src);
|
||||
SpinLockRelease(&src->mutex);
|
||||
copy_name = NameStr(second_slot_contents.data.name);
|
||||
copy_islogical = SlotIsLogical(&second_slot_contents);
|
||||
|
||||
/*
|
||||
* Check if the source slot still exists and is valid. We regard it as
|
||||
|
@ -158,8 +158,8 @@ typedef struct ReplicationSlot
|
||||
XLogRecPtr candidate_restart_lsn;
|
||||
} ReplicationSlot;
|
||||
|
||||
#define SlotIsPhysical(slot) (slot->data.database == InvalidOid)
|
||||
#define SlotIsLogical(slot) (slot->data.database != InvalidOid)
|
||||
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
|
||||
#define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
|
||||
|
||||
/*
|
||||
* Shared memory control area for all of replication slots.
|
||||
|
Loading…
Reference in New Issue
Block a user