mirror of
https://git.postgresql.org/git/postgresql.git
synced 2024-12-21 08:29:39 +08:00
Fix WAL replay in presence of an incomplete record
Physical replication always ships WAL segment files to replicas once
they are complete. This is a problem if one WAL record is split across
a segment boundary and the primary server crashes before writing down
the segment with the next portion of the WAL record: WAL writing after
crash recovery would happily resume at the point where the broken record
started, overwriting that record ... but any standby or backup may have
already received a copy of that segment, and they are not rewinding.
This causes standbys to stop following the primary after the latter
crashes:
LOG: invalid contrecord length 7262 at A8/D9FFFBC8
because the standby is still trying to read the continuation record
(contrecord) for the original long WAL record, but it is not there and
it will never be. A workaround is to stop the replica, delete the WAL
file, and restart it -- at which point a fresh copy is brought over from
the primary. But that's pretty labor intensive, and I bet many users
would just give up and re-clone the standby instead.
A fix for this problem was already attempted in commit 515e3d84a0
, but
it only addressed the case for the scenario of WAL archiving, so
streaming replication would still be a problem (as well as other things
such as taking a filesystem-level backup while the server is down after
having crashed), and it had performance scalability problems too; so it
had to be reverted.
This commit fixes the problem using an approach suggested by Andres
Freund, whereby the initial portion(s) of the split-up WAL record are
kept, and a special type of WAL record is written where the contrecord
was lost, so that WAL replay in the replica knows to skip the broken
parts. With this approach, we can continue to stream/archive segment
files as soon as they are complete, and replay of the broken records
will proceed across the crash point without a hitch.
Because a new type of WAL record is added, users should be careful to
upgrade standbys first, primaries later. Otherwise they risk the standby
being unable to start if the primary happens to write such a record.
A new TAP test that exercises this is added, but the portability of it
is yet to be seen.
This has been wrong since the introduction of physical replication, so
backpatch all the way back. In stable branches, keep the new
XLogReaderState members at the end of the struct, to avoid an ABI
break.
Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Reviewed-by: Nathan Bossart <bossartn@amazon.com>
Discussion: https://postgr.es/m/202108232252.dh7uxf6oxwcy@alvherre.pgsql
This commit is contained in:
parent
2acb7cc6b5
commit
ff9f111bce
@ -139,6 +139,15 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
|
||||
xlrec.ThisTimeLineID, xlrec.PrevTimeLineID,
|
||||
timestamptz_to_str(xlrec.end_time));
|
||||
}
|
||||
else if (info == XLOG_OVERWRITE_CONTRECORD)
|
||||
{
|
||||
xl_overwrite_contrecord xlrec;
|
||||
|
||||
memcpy(&xlrec, rec, sizeof(xl_overwrite_contrecord));
|
||||
appendStringInfo(buf, "lsn %X/%X; time %s",
|
||||
LSN_FORMAT_ARGS(xlrec.overwritten_lsn),
|
||||
timestamptz_to_str(xlrec.overwrite_time));
|
||||
}
|
||||
}
|
||||
|
||||
const char *
|
||||
@ -178,6 +187,9 @@ xlog_identify(uint8 info)
|
||||
case XLOG_END_OF_RECOVERY:
|
||||
id = "END_OF_RECOVERY";
|
||||
break;
|
||||
case XLOG_OVERWRITE_CONTRECORD:
|
||||
id = "OVERWRITE_CONTRECORD";
|
||||
break;
|
||||
case XLOG_FPI:
|
||||
id = "FPI";
|
||||
break;
|
||||
|
@ -199,6 +199,15 @@ static XLogRecPtr LastRec;
|
||||
static XLogRecPtr flushedUpto = 0;
|
||||
static TimeLineID receiveTLI = 0;
|
||||
|
||||
/*
|
||||
* abortedRecPtr is the start pointer of a broken record at end of WAL when
|
||||
* recovery completes; missingContrecPtr is the location of the first
|
||||
* contrecord that went missing. See CreateOverwriteContrecordRecord for
|
||||
* details.
|
||||
*/
|
||||
static XLogRecPtr abortedRecPtr;
|
||||
static XLogRecPtr missingContrecPtr;
|
||||
|
||||
/*
|
||||
* During recovery, lastFullPageWrites keeps track of full_page_writes that
|
||||
* the replayed WAL records indicate. It's initialized with full_page_writes
|
||||
@ -892,8 +901,11 @@ static void CheckRequiredParameterValues(void);
|
||||
static void XLogReportParameters(void);
|
||||
static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
|
||||
TimeLineID prevTLI);
|
||||
static void VerifyOverwriteContrecord(xl_overwrite_contrecord *xlrec,
|
||||
XLogReaderState *state);
|
||||
static void LocalSetXLogInsertAllowed(void);
|
||||
static void CreateEndOfRecoveryRecord(void);
|
||||
static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn);
|
||||
static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
|
||||
static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
|
||||
static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
|
||||
@ -2246,6 +2258,18 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
|
||||
if (!Insert->forcePageWrites)
|
||||
NewPage->xlp_info |= XLP_BKP_REMOVABLE;
|
||||
|
||||
/*
|
||||
* If a record was found to be broken at the end of recovery, and
|
||||
* we're going to write on the page where its first contrecord was
|
||||
* lost, set the XLP_FIRST_IS_OVERWRITE_CONTRECORD flag on the page
|
||||
* header. See CreateOverwriteContrecordRecord().
|
||||
*/
|
||||
if (missingContrecPtr == NewPageBeginPtr)
|
||||
{
|
||||
NewPage->xlp_info |= XLP_FIRST_IS_OVERWRITE_CONTRECORD;
|
||||
missingContrecPtr = InvalidXLogRecPtr;
|
||||
}
|
||||
|
||||
/*
|
||||
* If first page of an XLOG segment file, make it a long header.
|
||||
*/
|
||||
@ -4394,6 +4418,19 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
|
||||
EndRecPtr = xlogreader->EndRecPtr;
|
||||
if (record == NULL)
|
||||
{
|
||||
/*
|
||||
* When not in standby mode we find that WAL ends in an incomplete
|
||||
* record, keep track of that record. After recovery is done,
|
||||
* we'll write a record to indicate downstream WAL readers that
|
||||
* that portion is to be ignored.
|
||||
*/
|
||||
if (!StandbyMode &&
|
||||
!XLogRecPtrIsInvalid(xlogreader->abortedRecPtr))
|
||||
{
|
||||
abortedRecPtr = xlogreader->abortedRecPtr;
|
||||
missingContrecPtr = xlogreader->missingContrecPtr;
|
||||
}
|
||||
|
||||
if (readFile >= 0)
|
||||
{
|
||||
close(readFile);
|
||||
@ -7069,6 +7106,12 @@ StartupXLOG(void)
|
||||
InRecovery = true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Start recovery assuming that the final record isn't lost.
|
||||
*/
|
||||
abortedRecPtr = InvalidXLogRecPtr;
|
||||
missingContrecPtr = InvalidXLogRecPtr;
|
||||
|
||||
/* REDO */
|
||||
if (InRecovery)
|
||||
{
|
||||
@ -7655,8 +7698,9 @@ StartupXLOG(void)
|
||||
|
||||
/*
|
||||
* Kill WAL receiver, if it's still running, before we continue to write
|
||||
* the startup checkpoint record. It will trump over the checkpoint and
|
||||
* subsequent records if it's still alive when we start writing WAL.
|
||||
* the startup checkpoint and aborted-contrecord records. It will trump
|
||||
* over these records and subsequent ones if it's still alive when we
|
||||
* start writing WAL.
|
||||
*/
|
||||
XLogShutdownWalRcv();
|
||||
|
||||
@ -7689,8 +7733,12 @@ StartupXLOG(void)
|
||||
StandbyMode = false;
|
||||
|
||||
/*
|
||||
* Re-fetch the last valid or last applied record, so we can identify the
|
||||
* exact endpoint of what we consider the valid portion of WAL.
|
||||
* Determine where to start writing WAL next.
|
||||
*
|
||||
* When recovery ended in an incomplete record, write a WAL record about
|
||||
* that and continue after it. In all other cases, re-fetch the last
|
||||
* valid or last applied record, so we can identify the exact endpoint of
|
||||
* what we consider the valid portion of WAL.
|
||||
*/
|
||||
XLogBeginRead(xlogreader, LastRec);
|
||||
record = ReadRecord(xlogreader, PANIC, false);
|
||||
@ -7821,6 +7869,18 @@ StartupXLOG(void)
|
||||
XLogCtl->ThisTimeLineID = ThisTimeLineID;
|
||||
XLogCtl->PrevTimeLineID = PrevTimeLineID;
|
||||
|
||||
/*
|
||||
* Actually, if WAL ended in an incomplete record, skip the parts that
|
||||
* made it through and start writing after the portion that persisted.
|
||||
* (It's critical to first write an OVERWRITE_CONTRECORD message, which
|
||||
* we'll do as soon as we're open for writing new WAL.)
|
||||
*/
|
||||
if (!XLogRecPtrIsInvalid(missingContrecPtr))
|
||||
{
|
||||
Assert(!XLogRecPtrIsInvalid(abortedRecPtr));
|
||||
EndOfLog = missingContrecPtr;
|
||||
}
|
||||
|
||||
/*
|
||||
* Prepare to write WAL starting at EndOfLog location, and init xlog
|
||||
* buffer cache using the block containing the last record from the
|
||||
@ -7873,13 +7933,23 @@ StartupXLOG(void)
|
||||
XLogCtl->LogwrtRqst.Write = EndOfLog;
|
||||
XLogCtl->LogwrtRqst.Flush = EndOfLog;
|
||||
|
||||
LocalSetXLogInsertAllowed();
|
||||
|
||||
/* If necessary, write overwrite-contrecord before doing anything else */
|
||||
if (!XLogRecPtrIsInvalid(abortedRecPtr))
|
||||
{
|
||||
Assert(!XLogRecPtrIsInvalid(missingContrecPtr));
|
||||
CreateOverwriteContrecordRecord(abortedRecPtr);
|
||||
abortedRecPtr = InvalidXLogRecPtr;
|
||||
missingContrecPtr = InvalidXLogRecPtr;
|
||||
}
|
||||
|
||||
/*
|
||||
* Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
|
||||
* record before resource manager writes cleanup WAL records or checkpoint
|
||||
* record is written.
|
||||
*/
|
||||
Insert->fullPageWrites = lastFullPageWrites;
|
||||
LocalSetXLogInsertAllowed();
|
||||
UpdateFullPageWrites();
|
||||
LocalXLogInsertAllowed = -1;
|
||||
|
||||
@ -9365,6 +9435,53 @@ CreateEndOfRecoveryRecord(void)
|
||||
LocalXLogInsertAllowed = -1; /* return to "check" state */
|
||||
}
|
||||
|
||||
/*
|
||||
* Write an OVERWRITE_CONTRECORD message.
|
||||
*
|
||||
* When on WAL replay we expect a continuation record at the start of a page
|
||||
* that is not there, recovery ends and WAL writing resumes at that point.
|
||||
* But it's wrong to resume writing new WAL back at the start of the record
|
||||
* that was broken, because downstream consumers of that WAL (physical
|
||||
* replicas) are not prepared to "rewind". So the first action after
|
||||
* finishing replay of all valid WAL must be to write a record of this type
|
||||
* at the point where the contrecord was missing; to support xlogreader
|
||||
* detecting the special case, XLP_FIRST_IS_OVERWRITE_CONTRECORD is also added
|
||||
* to the page header where the record occurs. xlogreader has an ad-hoc
|
||||
* mechanism to report metadata about the broken record, which is what we
|
||||
* use here.
|
||||
*
|
||||
* At replay time, XLP_FIRST_IS_OVERWRITE_CONTRECORD instructs xlogreader to
|
||||
* skip the record it was reading, and pass back the LSN of the skipped
|
||||
* record, so that its caller can verify (on "replay" of that record) that the
|
||||
* XLOG_OVERWRITE_CONTRECORD matches what was effectively overwritten.
|
||||
*/
|
||||
static XLogRecPtr
|
||||
CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn)
|
||||
{
|
||||
xl_overwrite_contrecord xlrec;
|
||||
XLogRecPtr recptr;
|
||||
|
||||
/* sanity check */
|
||||
if (!RecoveryInProgress())
|
||||
elog(ERROR, "can only be used at end of recovery");
|
||||
|
||||
xlrec.overwritten_lsn = aborted_lsn;
|
||||
xlrec.overwrite_time = GetCurrentTimestamp();
|
||||
|
||||
START_CRIT_SECTION();
|
||||
|
||||
XLogBeginInsert();
|
||||
XLogRegisterData((char *) &xlrec, sizeof(xl_overwrite_contrecord));
|
||||
|
||||
recptr = XLogInsert(RM_XLOG_ID, XLOG_OVERWRITE_CONTRECORD);
|
||||
|
||||
XLogFlush(recptr);
|
||||
|
||||
END_CRIT_SECTION();
|
||||
|
||||
return recptr;
|
||||
}
|
||||
|
||||
/*
|
||||
* Flush all data in shared memory to disk, and fsync
|
||||
*
|
||||
@ -10295,6 +10412,13 @@ xlog_redo(XLogReaderState *record)
|
||||
|
||||
RecoveryRestartPoint(&checkPoint);
|
||||
}
|
||||
else if (info == XLOG_OVERWRITE_CONTRECORD)
|
||||
{
|
||||
xl_overwrite_contrecord xlrec;
|
||||
|
||||
memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_overwrite_contrecord));
|
||||
VerifyOverwriteContrecord(&xlrec, record);
|
||||
}
|
||||
else if (info == XLOG_END_OF_RECOVERY)
|
||||
{
|
||||
xl_end_of_recovery xlrec;
|
||||
@ -10462,6 +10586,26 @@ xlog_redo(XLogReaderState *record)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Verify the payload of a XLOG_OVERWRITE_CONTRECORD record.
|
||||
*/
|
||||
static void
|
||||
VerifyOverwriteContrecord(xl_overwrite_contrecord *xlrec, XLogReaderState *state)
|
||||
{
|
||||
if (xlrec->overwritten_lsn != state->overwrittenRecPtr)
|
||||
elog(FATAL, "mismatching overwritten LSN %X/%X -> %X/%X",
|
||||
LSN_FORMAT_ARGS(xlrec->overwritten_lsn),
|
||||
LSN_FORMAT_ARGS(state->overwrittenRecPtr));
|
||||
|
||||
ereport(LOG,
|
||||
(errmsg("sucessfully skipped missing contrecord at %X/%X, overwritten at %s",
|
||||
LSN_FORMAT_ARGS(xlrec->overwritten_lsn),
|
||||
timestamptz_to_str(xlrec->overwrite_time))));
|
||||
|
||||
/* Verifying the record should only happen once */
|
||||
state->overwrittenRecPtr = InvalidXLogRecPtr;
|
||||
}
|
||||
|
||||
#ifdef WAL_DEBUG
|
||||
|
||||
static void
|
||||
|
@ -278,6 +278,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
|
||||
total_len;
|
||||
uint32 targetRecOff;
|
||||
uint32 pageHeaderSize;
|
||||
bool assembled;
|
||||
bool gotheader;
|
||||
int readOff;
|
||||
|
||||
@ -293,6 +294,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
|
||||
state->errormsg_buf[0] = '\0';
|
||||
|
||||
ResetDecoder(state);
|
||||
state->abortedRecPtr = InvalidXLogRecPtr;
|
||||
state->missingContrecPtr = InvalidXLogRecPtr;
|
||||
|
||||
RecPtr = state->EndRecPtr;
|
||||
|
||||
@ -319,7 +322,9 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
|
||||
randAccess = true;
|
||||
}
|
||||
|
||||
restart:
|
||||
state->currRecPtr = RecPtr;
|
||||
assembled = false;
|
||||
|
||||
targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
|
||||
targetRecOff = RecPtr % XLOG_BLCKSZ;
|
||||
@ -415,6 +420,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
|
||||
char *buffer;
|
||||
uint32 gotlen;
|
||||
|
||||
assembled = true;
|
||||
|
||||
/*
|
||||
* Enlarge readRecordBuf as needed.
|
||||
*/
|
||||
@ -448,8 +455,25 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
|
||||
|
||||
Assert(SizeOfXLogShortPHD <= readOff);
|
||||
|
||||
/* Check that the continuation on next page looks valid */
|
||||
pageHeader = (XLogPageHeader) state->readBuf;
|
||||
|
||||
/*
|
||||
* If we were expecting a continuation record and got an
|
||||
* "overwrite contrecord" flag, that means the continuation record
|
||||
* was overwritten with a different record. Restart the read by
|
||||
* assuming the address to read is the location where we found
|
||||
* this flag; but keep track of the LSN of the record we were
|
||||
* reading, for later verification.
|
||||
*/
|
||||
if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
|
||||
{
|
||||
state->overwrittenRecPtr = state->currRecPtr;
|
||||
ResetDecoder(state);
|
||||
RecPtr = targetPagePtr;
|
||||
goto restart;
|
||||
}
|
||||
|
||||
/* Check that the continuation on next page looks valid */
|
||||
if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
|
||||
{
|
||||
report_invalid_record(state,
|
||||
@ -551,6 +575,20 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
|
||||
return NULL;
|
||||
|
||||
err:
|
||||
if (assembled)
|
||||
{
|
||||
/*
|
||||
* We get here when a record that spans multiple pages needs to be
|
||||
* assembled, but something went wrong -- perhaps a contrecord piece
|
||||
* was lost. If caller is WAL replay, it will know where the aborted
|
||||
* record was and where to direct followup WAL to be written, marking
|
||||
* the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
|
||||
* in turn signal downstream WAL consumers that the broken WAL record
|
||||
* is to be ignored.
|
||||
*/
|
||||
state->abortedRecPtr = RecPtr;
|
||||
state->missingContrecPtr = targetPagePtr;
|
||||
}
|
||||
|
||||
/*
|
||||
* Invalidate the read state. We might read from a different source after
|
||||
|
@ -76,8 +76,10 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
|
||||
#define XLP_LONG_HEADER 0x0002
|
||||
/* This flag indicates backup blocks starting in this page are optional */
|
||||
#define XLP_BKP_REMOVABLE 0x0004
|
||||
/* Replaces a missing contrecord; see CreateOverwriteContrecordRecord */
|
||||
#define XLP_FIRST_IS_OVERWRITE_CONTRECORD 0x0008
|
||||
/* All defined flag bits in xlp_info (used for validity checking of header) */
|
||||
#define XLP_ALL_FLAGS 0x0007
|
||||
#define XLP_ALL_FLAGS 0x000F
|
||||
|
||||
#define XLogPageHeaderSize(hdr) \
|
||||
(((hdr)->xlp_info & XLP_LONG_HEADER) ? SizeOfXLogLongPHD : SizeOfXLogShortPHD)
|
||||
@ -249,6 +251,13 @@ typedef struct xl_restore_point
|
||||
char rp_name[MAXFNAMELEN];
|
||||
} xl_restore_point;
|
||||
|
||||
/* Overwrite of prior contrecord */
|
||||
typedef struct xl_overwrite_contrecord
|
||||
{
|
||||
XLogRecPtr overwritten_lsn;
|
||||
TimestampTz overwrite_time;
|
||||
} xl_overwrite_contrecord;
|
||||
|
||||
/* End of recovery mark, when we don't do an END_OF_RECOVERY checkpoint */
|
||||
typedef struct xl_end_of_recovery
|
||||
{
|
||||
|
@ -175,6 +175,16 @@ struct XLogReaderState
|
||||
XLogRecPtr ReadRecPtr; /* start of last record read */
|
||||
XLogRecPtr EndRecPtr; /* end+1 of last record read */
|
||||
|
||||
/*
|
||||
* Set at the end of recovery: the start point of a partial record at the
|
||||
* end of WAL (InvalidXLogRecPtr if there wasn't one), and the start
|
||||
* location of its first contrecord that went missing.
|
||||
*/
|
||||
XLogRecPtr abortedRecPtr;
|
||||
XLogRecPtr missingContrecPtr;
|
||||
/* Set when XLP_FIRST_IS_OVERWRITE_CONTRECORD is found */
|
||||
XLogRecPtr overwrittenRecPtr;
|
||||
|
||||
|
||||
/* ----------------------------------------
|
||||
* Decoded representation of current record
|
||||
|
@ -76,6 +76,8 @@ typedef struct CheckPoint
|
||||
#define XLOG_END_OF_RECOVERY 0x90
|
||||
#define XLOG_FPI_FOR_HINT 0xA0
|
||||
#define XLOG_FPI 0xB0
|
||||
/* 0xC0 is used in Postgres 9.5-11 */
|
||||
#define XLOG_OVERWRITE_CONTRECORD 0xD0
|
||||
|
||||
|
||||
/*
|
||||
|
207
src/test/recovery/t/026_overwrite_contrecord.pl
Normal file
207
src/test/recovery/t/026_overwrite_contrecord.pl
Normal file
@ -0,0 +1,207 @@
|
||||
# Copyright (c) 2021, PostgreSQL Global Development Group
|
||||
|
||||
# Tests for already-propagated WAL segments ending in incomplete WAL records.
|
||||
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use FindBin;
|
||||
use PostgresNode;
|
||||
use TestLib;
|
||||
use Test::More;
|
||||
|
||||
plan tests => 5;
|
||||
|
||||
# Test: Create a physical replica that's missing the last WAL file,
|
||||
# then restart the primary to create a divergent WAL file and observe
|
||||
# that the replica replays the "overwrite contrecord" from that new
|
||||
# file.
|
||||
|
||||
my $node = PostgresNode->new('primary');
|
||||
$node->init(allows_streaming => 1);
|
||||
$node->append_conf('postgresql.conf', 'wal_keep_size=1GB');
|
||||
$node->start;
|
||||
|
||||
$node->safe_psql('postgres', 'create table filler (a int)');
|
||||
# First, measure how many bytes does the insertion of 1000 rows produce
|
||||
my $start_lsn =
|
||||
$node->safe_psql('postgres', q{select pg_current_wal_insert_lsn() - '0/0'});
|
||||
$node->safe_psql('postgres',
|
||||
'insert into filler select * from generate_series(1, 1000)');
|
||||
my $end_lsn =
|
||||
$node->safe_psql('postgres', q{select pg_current_wal_insert_lsn() - '0/0'});
|
||||
my $rows_walsize = $end_lsn - $start_lsn;
|
||||
|
||||
# Now consume all remaining room in the current WAL segment, leaving
|
||||
# space enough only for the start of a largish record.
|
||||
$node->safe_psql(
|
||||
'postgres', qq{
|
||||
WITH setting AS (
|
||||
SELECT setting::int AS wal_segsize
|
||||
FROM pg_settings WHERE name = 'wal_segment_size'
|
||||
)
|
||||
INSERT INTO filler
|
||||
SELECT g FROM setting,
|
||||
generate_series(1, 1000 * (wal_segsize - ((pg_current_wal_insert_lsn() - '0/0') % wal_segsize)) / $rows_walsize) g
|
||||
});
|
||||
|
||||
my $initfile = $node->safe_psql('postgres',
|
||||
'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
|
||||
$node->safe_psql('postgres',
|
||||
qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
|
||||
);
|
||||
#$node->safe_psql('postgres', qq{create table foo ()});
|
||||
my $endfile = $node->safe_psql('postgres',
|
||||
'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
|
||||
ok($initfile != $endfile, "$initfile differs from $endfile");
|
||||
|
||||
# Now stop abruptly, to avoid a stop checkpoint. We can remove the tail file
|
||||
# afterwards, and on startup the large message should be overwritten with new
|
||||
# contents
|
||||
$node->stop('immediate');
|
||||
|
||||
unlink $node->basedir . "/pgdata/pg_wal/$endfile"
|
||||
or die "could not unlink " . $node->basedir . "/pgdata/pg_wal/$endfile: $!";
|
||||
|
||||
# OK, create a standby at this spot.
|
||||
$node->backup_fs_cold('backup');
|
||||
my $node_standby = PostgresNode->new('standby');
|
||||
$node_standby->init_from_backup($node, 'backup', has_streaming => 1);
|
||||
|
||||
$node_standby->start;
|
||||
$node->start;
|
||||
|
||||
$node->safe_psql('postgres',
|
||||
qq{create table foo (a text); insert into foo values ('hello')});
|
||||
$node->safe_psql('postgres',
|
||||
qq{SELECT pg_logical_emit_message(true, 'test 026', 'AABBCC')});
|
||||
|
||||
my $until_lsn = $node->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
|
||||
my $caughtup_query =
|
||||
"SELECT '$until_lsn'::pg_lsn <= pg_last_wal_replay_lsn()";
|
||||
$node_standby->poll_query_until('postgres', $caughtup_query)
|
||||
or die "Timed out while waiting for standby to catch up";
|
||||
|
||||
ok($node_standby->safe_psql('postgres', 'select * from foo') eq 'hello',
|
||||
'standby replays past overwritten contrecord');
|
||||
|
||||
# Verify message appears in standby's log
|
||||
my $log = slurp_file($node_standby->logfile);
|
||||
like(
|
||||
$log,
|
||||
qr[sucessfully skipped missing contrecord at],
|
||||
"found log line in standby");
|
||||
|
||||
$node->stop;
|
||||
$node_standby->stop;
|
||||
|
||||
|
||||
# Second test: a standby that receives WAL via archive/restore commands.
|
||||
$node = PostgresNode->new('primary2');
|
||||
$node->init(
|
||||
has_archiving => 1,
|
||||
extra => ['--wal-segsize=1']);
|
||||
|
||||
# Note: consistent use of forward slashes here avoids any escaping problems
|
||||
# that arise from use of backslashes. That means we need to double-quote all
|
||||
# the paths in the archive_command
|
||||
my $perlbin = TestLib::perl2host($^X);
|
||||
$perlbin =~ s!\\!/!g if $TestLib::windows_os;
|
||||
my $archivedir = $node->archive_dir;
|
||||
$archivedir =~ s!\\!/!g if $TestLib::windows_os;
|
||||
$node->append_conf(
|
||||
'postgresql.conf',
|
||||
qq{
|
||||
archive_command = '"$perlbin" "$FindBin::RealBin/idiosyncratic_copy" "%p" "$archivedir/%f"'
|
||||
wal_level = replica
|
||||
max_wal_senders = 2
|
||||
wal_keep_size = 1GB
|
||||
});
|
||||
# Make sure that Msys perl doesn't complain about difficulty in setting locale
|
||||
# when called from the archive_command.
|
||||
local $ENV{PERL_BADLANG} = 0;
|
||||
$node->start;
|
||||
$node->backup('backup');
|
||||
|
||||
$node_standby = PostgresNode->new('standby2');
|
||||
$node_standby->init_from_backup($node, 'backup', has_restoring => 1);
|
||||
|
||||
$node_standby->start;
|
||||
|
||||
$node->safe_psql('postgres', 'create table filler (a int)');
|
||||
# First, measure how many bytes does the insertion of 1000 rows produce
|
||||
$start_lsn =
|
||||
$node->safe_psql('postgres', q{select pg_current_wal_insert_lsn() - '0/0'});
|
||||
$node->safe_psql('postgres',
|
||||
'insert into filler select * from generate_series(1, 1000)');
|
||||
$end_lsn =
|
||||
$node->safe_psql('postgres', q{select pg_current_wal_insert_lsn() - '0/0'});
|
||||
$rows_walsize = $end_lsn - $start_lsn;
|
||||
|
||||
# Now consume all remaining room in the current WAL segment, leaving
|
||||
# space enough only for the start of a largish record.
|
||||
$node->safe_psql(
|
||||
'postgres', qq{
|
||||
WITH setting AS (
|
||||
SELECT setting::int AS wal_segsize
|
||||
FROM pg_settings WHERE name = 'wal_segment_size'
|
||||
)
|
||||
INSERT INTO filler
|
||||
SELECT g FROM setting,
|
||||
generate_series(1, 1000 * (wal_segsize - ((pg_current_wal_insert_lsn() - '0/0') % wal_segsize)) / $rows_walsize) g
|
||||
});
|
||||
|
||||
# Now block idiosyncratic_copy from creating the next WAL in the replica
|
||||
my $archivedgood = $node->safe_psql('postgres',
|
||||
q{SELECT pg_walfile_name(pg_current_wal_insert_lsn())});
|
||||
my $archivedfail = $node->safe_psql(
|
||||
'postgres',
|
||||
q{SELECT pg_walfile_name(pg_current_wal_insert_lsn() + setting::integer)
|
||||
from pg_settings where name = 'wal_segment_size'});
|
||||
open my $filefail, ">", "$archivedir/$archivedfail.fail"
|
||||
or die "can't open $archivedir/$archivedfail.fail: $!";
|
||||
|
||||
my $currlsn =
|
||||
$node->safe_psql('postgres', 'select pg_current_wal_insert_lsn() - 1000');
|
||||
|
||||
# Now produce a large WAL record in a transaction that we leave open
|
||||
my ($in, $out);
|
||||
my $timer = IPC::Run::timeout(180);
|
||||
my $h =
|
||||
$node->background_psql('postgres', \$in, \$out, $timer, on_error_stop => 0);
|
||||
|
||||
$in .= qq{BEGIN;
|
||||
SELECT pg_logical_emit_message(true, 'test 026', repeat('somenoise', 8192));
|
||||
};
|
||||
$h->pump_nb;
|
||||
$node->poll_query_until(
|
||||
'postgres',
|
||||
"SELECT last_archived_wal >= '$archivedgood' FROM pg_stat_archiver"),
|
||||
or die "Timed out while waiting for standby to catch up";
|
||||
|
||||
# Now crash the node with the transaction open
|
||||
$node->stop('immediate');
|
||||
#$h->finish();
|
||||
$node->start;
|
||||
$node->safe_psql('postgres', 'create table witness (a int);');
|
||||
$node->safe_psql('postgres', 'insert into witness values (42)');
|
||||
unlink "$archivedir/$archivedfail.fail"
|
||||
or die "can't unlink $archivedir/$archivedfail.fail: $!";
|
||||
$node->safe_psql('postgres', 'select pg_switch_wal()');
|
||||
|
||||
$until_lsn = $node->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
|
||||
$caughtup_query = "SELECT '$until_lsn'::pg_lsn <= pg_last_wal_replay_lsn()";
|
||||
$node_standby->poll_query_until('postgres', $caughtup_query)
|
||||
or die "Timed out while waiting for standby to catch up";
|
||||
|
||||
my $answer = $node_standby->safe_psql('postgres', 'select * from witness');
|
||||
is($answer, '42', 'witness tuple appears');
|
||||
|
||||
# Verify message appears in standby's log
|
||||
$log = slurp_file($node_standby->logfile);
|
||||
like(
|
||||
$log,
|
||||
qr[sucessfully skipped missing contrecord at],
|
||||
"found log line in standby");
|
||||
$node->stop;
|
||||
$node_standby->stop;
|
20
src/test/recovery/t/idiosyncratic_copy
Executable file
20
src/test/recovery/t/idiosyncratic_copy
Executable file
@ -0,0 +1,20 @@
|
||||
#!/usr/bin/perl
|
||||
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
use File::Copy;
|
||||
|
||||
die "wrong number of arguments" if @ARGV != 2;
|
||||
my ($source, $target) = @ARGV;
|
||||
if ($^O eq 'msys')
|
||||
{
|
||||
# make a windows path look like an msys path if necessary
|
||||
$source =~ s!^([A-Za-z]):!'/' . lc($1)!e;
|
||||
$source =~ s!\\!/!g;
|
||||
}
|
||||
|
||||
die "$0: failed copy of $target" if -f "$target.fail";
|
||||
|
||||
copy($source, $target) or die "couldn't copy $source to $target: $!";
|
||||
print STDERR "$0: archived $source to $target successfully\n";
|
@ -3711,6 +3711,7 @@ xl_logical_message
|
||||
xl_multi_insert_tuple
|
||||
xl_multixact_create
|
||||
xl_multixact_truncate
|
||||
xl_overwrite_contrecord
|
||||
xl_parameter_change
|
||||
xl_relmap_update
|
||||
xl_replorigin_drop
|
||||
|
Loading…
Reference in New Issue
Block a user