mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-02-17 19:30:00 +08:00
Allow pg_create_physical_replication_slot() to reserve WAL.
When creating a physical slot it's often useful to immediately reserve the current WAL position instead of only doing after the first feedback message arrives. That e.g. allows slots to guarantee that all the WAL for a base backup will be available afterwards. Logical slots already have to reserve WAL during creation, so generalize that logic into being usable for both physical and logical slots. Catversion bump because of the new parameter. Author: Gurjeet Singh Reviewed-By: Andres Freund Discussion: CABwTF4Wh_dBCzTU=49pFXR6coR4NW1ynb+vBqT+Po=7fuq5iCw@mail.gmail.com
This commit is contained in:
parent
093d0c83c1
commit
6fcd88511f
@ -17211,7 +17211,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
|
||||
<indexterm>
|
||||
<primary>pg_create_physical_replication_slot</primary>
|
||||
</indexterm>
|
||||
<literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type>)</function></literal>
|
||||
<literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type><optional>, <parameter>immediately_reserve</> <type>boolean</> </optional>)</function></literal>
|
||||
</entry>
|
||||
<entry>
|
||||
(<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>)
|
||||
@ -17221,7 +17221,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
|
||||
<parameter>slot_name</parameter>. Streaming changes from a physical slot
|
||||
is only possible with the streaming-replication protocol - see <xref
|
||||
linkend="protocol-replication">. Corresponds to the replication protocol
|
||||
command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>.
|
||||
command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>. The optional
|
||||
second parameter, when <literal>true</>, specifies that the <acronym>LSN</>
|
||||
for this replication slot be reserved immediately; the <acronym<LSN</>
|
||||
is otherwise reserved on first connection from a streaming replication
|
||||
client.
|
||||
</entry>
|
||||
</row>
|
||||
<row>
|
||||
|
@ -917,6 +917,13 @@ LANGUAGE INTERNAL
|
||||
VOLATILE ROWS 1000 COST 1000
|
||||
AS 'pg_logical_slot_peek_binary_changes';
|
||||
|
||||
CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
|
||||
IN slot_name name, IN immediately_reserve boolean DEFAULT false,
|
||||
OUT slot_name name, OUT xlog_position pg_lsn)
|
||||
RETURNS RECORD
|
||||
LANGUAGE INTERNAL
|
||||
AS 'pg_create_physical_replication_slot';
|
||||
|
||||
CREATE OR REPLACE FUNCTION
|
||||
make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0,
|
||||
days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0,
|
||||
|
@ -250,52 +250,7 @@ CreateInitDecodingContext(char *plugin,
|
||||
StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
|
||||
SpinLockRelease(&slot->mutex);
|
||||
|
||||
/*
|
||||
* The replication slot mechanism is used to prevent removal of required
|
||||
* WAL. As there is no interlock between this and checkpoints required WAL
|
||||
* could be removed before ReplicationSlotsComputeRequiredLSN() has been
|
||||
* called to prevent that. In the very unlikely case that this happens
|
||||
* we'll just retry.
|
||||
*/
|
||||
while (true)
|
||||
{
|
||||
XLogSegNo segno;
|
||||
|
||||
/*
|
||||
* Let's start with enough information if we can, so log a standby
|
||||
* snapshot and start decoding at exactly that position.
|
||||
*/
|
||||
if (!RecoveryInProgress())
|
||||
{
|
||||
XLogRecPtr flushptr;
|
||||
|
||||
/* start at current insert position */
|
||||
slot->data.restart_lsn = GetXLogInsertRecPtr();
|
||||
|
||||
/* make sure we have enough information to start */
|
||||
flushptr = LogStandbySnapshot();
|
||||
|
||||
/* and make sure it's fsynced to disk */
|
||||
XLogFlush(flushptr);
|
||||
}
|
||||
else
|
||||
slot->data.restart_lsn = GetRedoRecPtr();
|
||||
|
||||
/* prevent WAL removal as fast as possible */
|
||||
ReplicationSlotsComputeRequiredLSN();
|
||||
|
||||
/*
|
||||
* If all required WAL is still there, great, otherwise retry. The
|
||||
* slot should prevent further removal of WAL, unless there's a
|
||||
* concurrent ReplicationSlotsComputeRequiredLSN() after we've written
|
||||
* the new restart_lsn above, so normally we should never need to loop
|
||||
* more than twice.
|
||||
*/
|
||||
XLByteToSeg(slot->data.restart_lsn, segno);
|
||||
if (XLogGetLastRemovedSegno() < segno)
|
||||
break;
|
||||
}
|
||||
|
||||
ReplicationSlotReserveWal();
|
||||
|
||||
/* ----
|
||||
* This is a bit tricky: We need to determine a safe xmin horizon to start
|
||||
|
@ -40,6 +40,7 @@
|
||||
#include <sys/stat.h>
|
||||
|
||||
#include "access/transam.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "common/string.h"
|
||||
#include "miscadmin.h"
|
||||
#include "replication/slot.h"
|
||||
@ -781,6 +782,76 @@ CheckSlotRequirements(void)
|
||||
errmsg("replication slots can only be used if wal_level >= archive")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Reserve WAL for the currently active slot.
|
||||
*
|
||||
* Compute and set restart_lsn in a manner that's appropriate for the type of
|
||||
* the slot and concurrency safe.
|
||||
*/
|
||||
void
|
||||
ReplicationSlotReserveWal(void)
|
||||
{
|
||||
ReplicationSlot *slot = MyReplicationSlot;
|
||||
|
||||
Assert(slot != NULL);
|
||||
Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
|
||||
|
||||
/*
|
||||
* The replication slot mechanism is used to prevent removal of required
|
||||
* WAL. As there is no interlock between this routine and checkpoints, WAL
|
||||
* segments could concurrently be removed when a now stale return value of
|
||||
* ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
|
||||
* this happens we'll just retry.
|
||||
*/
|
||||
while (true)
|
||||
{
|
||||
XLogSegNo segno;
|
||||
|
||||
/*
|
||||
* For logical slots log a standby snapshot and start logical decoding
|
||||
* at exactly that position. That allows the slot to start up more
|
||||
* quickly.
|
||||
*
|
||||
* That's not needed (or indeed helpful) for physical slots as they'll
|
||||
* start replay at the last logged checkpoint anyway. Instead return
|
||||
* the location of the last redo LSN. While that slightly increases
|
||||
* the chance that we have to retry, it's where a base backup has to
|
||||
* start replay at.
|
||||
*/
|
||||
if (!RecoveryInProgress() && SlotIsLogical(slot))
|
||||
{
|
||||
XLogRecPtr flushptr;
|
||||
|
||||
/* start at current insert position */
|
||||
slot->data.restart_lsn = GetXLogInsertRecPtr();
|
||||
|
||||
/* make sure we have enough information to start */
|
||||
flushptr = LogStandbySnapshot();
|
||||
|
||||
/* and make sure it's fsynced to disk */
|
||||
XLogFlush(flushptr);
|
||||
}
|
||||
else
|
||||
{
|
||||
slot->data.restart_lsn = GetRedoRecPtr();
|
||||
}
|
||||
|
||||
/* prevent WAL removal as fast as possible */
|
||||
ReplicationSlotsComputeRequiredLSN();
|
||||
|
||||
/*
|
||||
* If all required WAL is still there, great, otherwise retry. The
|
||||
* slot should prevent further removal of WAL, unless there's a
|
||||
* concurrent ReplicationSlotsComputeRequiredLSN() after we've written
|
||||
* the new restart_lsn above, so normally we should never need to loop
|
||||
* more than twice.
|
||||
*/
|
||||
XLByteToSeg(slot->data.restart_lsn, segno);
|
||||
if (XLogGetLastRemovedSegno() < segno)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Flush all replication slots to disk.
|
||||
*
|
||||
|
@ -40,6 +40,7 @@ Datum
|
||||
pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Name name = PG_GETARG_NAME(0);
|
||||
bool immediately_reserve = PG_GETARG_BOOL(1);
|
||||
Datum values[2];
|
||||
bool nulls[2];
|
||||
TupleDesc tupdesc;
|
||||
@ -59,9 +60,25 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
|
||||
ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
|
||||
|
||||
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
|
||||
|
||||
nulls[0] = false;
|
||||
nulls[1] = true;
|
||||
|
||||
if (immediately_reserve)
|
||||
{
|
||||
/* Reserve WAL as the user asked for it */
|
||||
ReplicationSlotReserveWal();
|
||||
|
||||
/* Write this slot to disk */
|
||||
ReplicationSlotMarkDirty();
|
||||
ReplicationSlotSave();
|
||||
|
||||
values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
|
||||
nulls[1] = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
|
||||
nulls[1] = true;
|
||||
}
|
||||
|
||||
tuple = heap_form_tuple(tupdesc, values, nulls);
|
||||
result = HeapTupleGetDatum(tuple);
|
||||
|
@ -53,6 +53,6 @@
|
||||
*/
|
||||
|
||||
/* yyyymmddN */
|
||||
#define CATALOG_VERSION_NO 201508101
|
||||
#define CATALOG_VERSION_NO 201508111
|
||||
|
||||
#endif
|
||||
|
@ -5193,7 +5193,7 @@ DATA(insert OID = 3473 ( spg_range_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0
|
||||
DESCR("SP-GiST support for quad tree over range");
|
||||
|
||||
/* replication slots */
|
||||
DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2249 "19" "{19,19,3220}" "{i,o,o}" "{slot_name,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
|
||||
DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
|
||||
DESCR("create a physical replication slot");
|
||||
DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
|
||||
DESCR("drop a replication slot");
|
||||
|
@ -166,6 +166,7 @@ extern void ReplicationSlotMarkDirty(void);
|
||||
|
||||
/* misc stuff */
|
||||
extern bool ReplicationSlotValidateName(const char *name, int elevel);
|
||||
extern void ReplicationSlotReserveWal(void);
|
||||
extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
|
||||
extern void ReplicationSlotsComputeRequiredLSN(void);
|
||||
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
|
||||
|
Loading…
Reference in New Issue
Block a user