Don't trust unvalidated xl_tot_len.

xl_tot_len comes first in a WAL record.  Usually we don't trust it to be
the true length until we've validated the record header.  If the record
header was split across two pages, previously we wouldn't do the
validation until after we'd already tried to allocate enough memory to
hold the record, which was bad because it might actually be garbage
bytes from a recycled WAL file, so we could try to allocate a lot of
memory.  Release 15 made it worse.
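
To illustrate the hazard (a minimal Perl sketch, not part of the patch;
the real reader is the C code in xlogreader.c): xl_tot_len is simply the
first 4 bytes at the record's start, so on a recycled page those bytes
can be anything.

    use strict;
    use warnings;

    # Suppose these are the first 4 bytes found at a record boundary on
    # a recycled page; on a little-endian build they decode to
    # 0x40001000, slightly over 1GB.
    my $raw = "\x00\x10\x00\x40";

    # xl_tot_len is the leading uint32 of XLogRecord ("I" in pack notation).
    my ($xl_tot_len) = unpack('I', $raw);

    # A reader that trusts this value before validating the rest of the
    # header would size its record buffer from garbage.
    printf "would allocate %u bytes\n", $xl_tot_len;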

Since 70b4f82a4b, we'd at least generate an end-of-WAL condition if the
garbage 4-byte value happened to be > 1GB, but we'd still try to
allocate up to 1GB of memory bogusly otherwise.  That was an
improvement, but unfortunately release 15 tries to allocate another
object before that, so you could get a FATAL error and recovery could
fail.

We can fix both variants of the problem more fundamentally using
pre-existing page-level validation, if we just re-order some logic.

The new order of operations in the split-header case defers all memory
allocation based on xl_tot_len until we've read the following page.  At
that point we know that its first few bytes are not recycled data, by
checking its xlp_pageaddr, and that its xlp_rem_len agrees with
xl_tot_len on the preceding page.  That is strong evidence that
xl_tot_len was truly the start of a record that was logged.
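
In outline, the cross-check amounts to the following (a hedged Perl
sketch mirroring the struct field names; contrecord_is_plausible is a
name invented here for illustration, and the real C code also handles
records spanning more than two pages):

    # Decide whether an xl_tot_len whose record header split across a
    # page boundary can be trusted, given the next page's header fields.
    sub contrecord_is_plausible
    {
        my ($rec_lsn, $xl_tot_len, $next_page, $block_size,
            $xlp_first_is_contrecord) = @_;

        # Bytes of the record that fit on the page where it starts.
        my $bytes_on_first_page = $block_size - ($rec_lsn % $block_size);
        my $next_page_lsn = ($rec_lsn | ($block_size - 1)) + 1;

        # The next page must really live at this address, so its first
        # bytes cannot be leftovers from a recycled WAL file ...
        return 0 if $next_page->{xlp_pageaddr} != $next_page_lsn;

        # ... it must advertise a continuation record ...
        return 0 unless $next_page->{xlp_info} & $xlp_first_is_contrecord;

        # ... and its xlp_rem_len must agree with the xl_tot_len read
        # from the preceding page.
        return 0
          if $next_page->{xlp_rem_len} != $xl_tot_len - $bytes_on_first_page;

        return 1;
    }

Only once a check like this has passed does the reader allocate a
record buffer sized from xl_tot_len.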

This problem was most likely to occur on a standby, because
walreceiver.c recycles WAL files without zeroing out trailing regions of
each page.  We could fix that too, but it wouldn't protect us from rare
crash scenarios where the trailing zeroes don't make it to disk.

With reliable xl_tot_len validation in place, the ancient policy of
considering malloc failure to indicate corruption at end-of-WAL seems
quite surprising, but changing that is left for later work.

Also included is a new TAP test to exercise various cases of end-of-WAL
detection by writing contrived data into the WAL from Perl.

Back-patch to 12.  We decided not to put this change into the final
release of 11.

Author: Thomas Munro <thomas.munro@gmail.com>
Author: Michael Paquier <michael@paquier.xyz>
Reported-by: Alexander Lakhin <exclusion@gmail.com>
Reviewed-by: Noah Misch <noah@leadboat.com> (the idea, not the code)
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Reviewed-by: Sergei Kornilov <sk@zsrv.org>
Reviewed-by: Alexander Lakhin <exclusion@gmail.com>
Discussion: https://postgr.es/m/17928-aa92416a70ff44a2%40postgresql.org
commit bae868caf2 (parent 755eb44d3c)
Thomas Munro, 2023-09-23 10:26:24 +12:00
4 changed files with 569 additions and 56 deletions

src/backend/access/transam/xlogreader.c

@@ -192,6 +192,9 @@ XLogReaderFree(XLogReaderState *state)
* XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
* with. (That is enough for all "normal" records, but very large commit or
* abort records might need more space.)
*
* Note: This routine should *never* be called for xl_tot_len until the header
* of the record has been fully validated.
*/
static bool
allocate_recordbuf(XLogReaderState *state, uint32 reclength)
@@ -201,25 +204,6 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
#ifndef FRONTEND
/*
* Note that in very unlucky circumstances, the random data read from a
* recycled segment can cause this routine to be called with a size
* causing a hard failure at allocation. For a standby, this would cause
* the instance to stop suddenly with a hard failure, preventing it from
* retrying to fetch WAL from one of its sources, which could allow it to move
* on with replay without a manual restart. If the data comes from a past
* recycled segment and is still valid, then the allocation may succeed
* but record checks are going to fail so this would be short-lived. If
* the allocation fails because of a memory shortage, then this is not a
* hard failure either per the guarantee given by MCXT_ALLOC_NO_OOM.
*/
if (!AllocSizeIsValid(newSize))
return false;
#endif
if (state->readRecordBuf)
pfree(state->readRecordBuf);
state->readRecordBuf =
@@ -669,41 +653,26 @@ restart:
}
else
{
/* XXX: more validation should be done here */
if (total_len < SizeOfXLogRecord)
{
report_invalid_record(state,
"invalid record length at %X/%X: expected at least %u, got %u",
LSN_FORMAT_ARGS(RecPtr),
(uint32) SizeOfXLogRecord, total_len);
goto err;
}
/* We'll validate the header once we have the next page. */
gotheader = false;
}
/*
* Find space to decode this record. Don't allow oversized allocation if
* the caller requested nonblocking. Otherwise, we *have* to try to
* decode the record now because the caller has nothing else to do, so
* allow an oversized record to be palloc'd if that turns out to be
* necessary.
* Try to find space to decode this record, if we can do so without
* calling palloc. If we can't, we'll try again below after we've
* validated that total_len isn't garbage bytes from a recycled WAL page.
*/
decoded = XLogReadRecordAlloc(state,
total_len,
!nonblocking /* allow_oversized */ );
if (decoded == NULL)
false /* allow_oversized */ );
if (decoded == NULL && nonblocking)
{
/*
* There is no space in the decode buffer. The caller should help
* with that problem by consuming some records.
* There is no space in the circular decode buffer, and the caller is
* only reading ahead. The caller should consume existing records to
* make space.
*/
if (nonblocking)
return XLREAD_WOULDBLOCK;
/* We failed to allocate memory for an oversized record. */
report_invalid_record(state,
"out of memory while trying to decode a record of length %u", total_len);
goto err;
}
len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
@@ -718,16 +687,11 @@ restart:
assembled = true;
/*
* Enlarge readRecordBuf as needed.
* We always have space for a couple of pages, enough to validate a
* boundary-spanning record header.
*/
if (total_len > state->readRecordBufSize &&
!allocate_recordbuf(state, total_len))
{
/* We treat this as a "bogus data" condition */
report_invalid_record(state, "record length %u at %X/%X too long",
total_len, LSN_FORMAT_ARGS(RecPtr));
goto err;
}
Assert(state->readRecordBufSize >= XLOG_BLCKSZ * 2);
Assert(state->readRecordBufSize >= len);
/* Copy the first fragment of the record from the first page. */
memcpy(state->readRecordBuf,
@@ -824,8 +788,35 @@ restart:
goto err;
gotheader = true;
}
} while (gotlen < total_len);
/*
* We might need a bigger buffer. We have validated the record
* header, in the case that it split over a page boundary. We've
* also cross-checked total_len against xlp_rem_len on the second
* page, and verified xlp_pageaddr on both.
*/
if (total_len > state->readRecordBufSize)
{
char save_copy[XLOG_BLCKSZ * 2];
/*
* Save and restore the data we already had. It can't be more
* than two pages.
*/
Assert(gotlen <= lengthof(save_copy));
Assert(gotlen <= state->readRecordBufSize);
memcpy(save_copy, state->readRecordBuf, gotlen);
if (!allocate_recordbuf(state, total_len))
{
/* We treat this as a "bogus data" condition */
report_invalid_record(state, "record length %u at %X/%X too long",
total_len, LSN_FORMAT_ARGS(RecPtr));
goto err;
}
memcpy(state->readRecordBuf, save_copy, gotlen);
buffer = state->readRecordBuf + gotlen;
}
} while (gotlen < total_len);
Assert(gotheader);
record = (XLogRecord *) state->readRecordBuf;
@@ -867,6 +858,28 @@ restart:
state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize);
}
/*
* If we got here without a DecodedXLogRecord, it means we needed to
* validate total_len before trusting it, but by now we've done that.
*/
if (decoded == NULL)
{
Assert(!nonblocking);
decoded = XLogReadRecordAlloc(state,
total_len,
true /* allow_oversized */ );
if (decoded == NULL)
{
/*
* We failed to allocate memory for an oversized record. As
* above, we currently treat this as a "bogus data" condition.
*/
report_invalid_record(state,
"out of memory while trying to decode a record of length %u", total_len);
goto err;
}
}
if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg))
{
/* Record the location of the next record. */
@@ -895,8 +908,6 @@
state->decode_queue_head = decoded;
return XLREAD_SUCCESS;
}
else
return XLREAD_FAIL;
err:
if (assembled)

src/test/perl/PostgreSQL/Test/Utils.pm

@@ -71,6 +71,7 @@ our @EXPORT = qw(
chmod_recursive
check_pg_config
dir_symlink
scan_server_header
system_or_bail
system_log
run_log
@@ -702,6 +703,46 @@ sub chmod_recursive
=pod
=item scan_server_header(header_path, regexp)
Returns an array that stores all the matches of the given regular expression
within the PostgreSQL installation's C<header_path>. This can be used to
retrieve specific value patterns from the installation's header files.
=cut
sub scan_server_header
{
my ($header_path, $regexp) = @_;
my ($stdout, $stderr);
my $result = IPC::Run::run [ 'pg_config', '--includedir-server' ], '>',
\$stdout, '2>', \$stderr
or die "could not execute pg_config";
chomp($stdout);
$stdout =~ s/\r$//;
open my $header_h, '<', "$stdout/$header_path" or die "$!";
my @match = undef;
while (<$header_h>)
{
my $line = $_;
if (@match = $line =~ /^$regexp/)
{
last;
}
}
close $header_h;
die "could not find match in header $header_path\n"
unless @match;
return @match;
}
=pod
=item check_pg_config(regexp)
Return the number of matches of the given regular expression

src/test/recovery/meson.build

@@ -44,6 +44,7 @@ tests += {
't/036_truncated_dropped.pl',
't/037_invalid_database.pl',
't/038_save_logical_slots_shutdown.pl',
't/039_end_of_wal.pl',
],
},
}

src/test/recovery/t/039_end_of_wal.pl

@@ -0,0 +1,460 @@
# Copyright (c) 2023, PostgreSQL Global Development Group
#
# Test detecting end-of-WAL conditions. This test suite generates
# fake defective page and record headers to trigger various failure
# scenarios.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
use Fcntl qw(SEEK_SET);
use integer; # causes / operator to use integer math
# Size of a WAL record header.
my $RECORD_HEADER_SIZE = 24;
# Fields retrieved from the server's header files.
my @scan_result = scan_server_header('access/xlog_internal.h',
'#define\s+XLOG_PAGE_MAGIC\s+(\w+)');
my $XLP_PAGE_MAGIC = hex($scan_result[0]);
@scan_result = scan_server_header('access/xlog_internal.h',
'#define\s+XLP_FIRST_IS_CONTRECORD\s+(\w+)');
my $XLP_FIRST_IS_CONTRECORD = hex($scan_result[0]);
# Values queried from the server
my $WAL_SEGMENT_SIZE;
my $WAL_BLOCK_SIZE;
my $TLI;
# Build path of a WAL segment.
sub wal_segment_path
{
my $node = shift;
my $tli = shift;
my $segment = shift;
my $wal_path =
sprintf("%s/pg_wal/%08X%08X%08X", $node->data_dir, $tli, 0, $segment);
return $wal_path;
}
# Calculate, from an LSN (in bytes), its segment number and offset.
sub lsn_to_segment_and_offset
{
my $lsn = shift;
return ($lsn / $WAL_SEGMENT_SIZE, $lsn % $WAL_SEGMENT_SIZE);
}
# Write some arbitrary data into the WAL at the given LSN.
# This should be called while the cluster is not running.
sub write_wal
{
my $node = shift;
my $tli = shift;
my $lsn = shift;
my $data = shift;
my ($segment, $offset) = lsn_to_segment_and_offset($lsn);
my $path = wal_segment_path($node, $tli, $segment);
open my $fh, "+<:raw", $path or die;
seek($fh, $offset, SEEK_SET) or die;
print $fh $data;
close $fh;
}
# Emit a WAL record of arbitrary size. Returns the end LSN of the
# record inserted, in bytes.
sub emit_message
{
my $node = shift;
my $size = shift;
return int(
$node->safe_psql(
'postgres',
"SELECT pg_logical_emit_message(true, '', repeat('a', $size)) - '0/0'"
));
}
# Get the current insert LSN of a node, in bytes.
sub get_insert_lsn
{
my $node = shift;
return int(
$node->safe_psql(
'postgres', "SELECT pg_current_wal_insert_lsn() - '0/0'"));
}
# Get GUC value, converted to an int.
sub get_int_setting
{
my $node = shift;
my $name = shift;
return int(
$node->safe_psql(
'postgres',
"SELECT setting FROM pg_settings WHERE name = '$name'"));
}
sub start_of_page
{
my $lsn = shift;
return $lsn & ~($WAL_BLOCK_SIZE - 1);
}
sub start_of_next_page
{
my $lsn = shift;
return start_of_page($lsn) + $WAL_BLOCK_SIZE;
}
# Build a fake WAL record header based on the data given by the caller.
# This needs to follow the format of the C structure XLogRecord. To
# be inserted with write_wal().
sub build_record_header
{
my $xl_tot_len = shift;
my $xl_xid = shift || 0;
my $xl_prev = shift || 0;
my $xl_info = shift || 0;
my $xl_rmid = shift || 0;
my $xl_crc = shift || 0;
# This needs to follow the structure XLogRecord:
# I for xl_tot_len
# I for xl_xid
# Q for xl_prev
# C for xl_info
# C for xl_rmid
# BB for two bytes of padding
# I for xl_crc
return pack("IIQCCBBI",
$xl_tot_len, $xl_xid, $xl_prev, $xl_info, $xl_rmid, 0, 0, $xl_crc);
}
# Build a fake WAL page header, based on the data given by the caller
# This needs to follow the format of the C structure XLogPageHeaderData.
# To be inserted with write_wal().
sub build_page_header
{
my $xlp_magic = shift;
my $xlp_info = shift || 0;
my $xlp_tli = shift || 0;
my $xlp_pageaddr = shift || 0;
my $xlp_rem_len = shift || 0;
# This needs to follow the structure XLogPageHeaderData:
# S for xlp_magic
# S for xlp_info
# I for xlp_tli
# Q for xlp_pageaddr
# I for xlp_rem_len
return pack("SSIQI",
$xlp_magic, $xlp_info, $xlp_tli, $xlp_pageaddr, $xlp_rem_len);
}
# Make sure we are far enough away from the end of a page that a couple
# of small records would still fit before the page boundary.  This
# inserts records of a fixed size until the insert position is clear of
# the trailing quarter of the current WAL page.
sub advance_out_of_record_splitting_zone
{
my $node = shift;
my $page_threshold = $WAL_BLOCK_SIZE / 4;
my $end_lsn = get_insert_lsn($node);
my $page_offset = $end_lsn % $WAL_BLOCK_SIZE;
while ($page_offset >= $WAL_BLOCK_SIZE - $page_threshold)
{
emit_message($node, $page_threshold);
$end_lsn = get_insert_lsn($node);
$page_offset = $end_lsn % $WAL_BLOCK_SIZE;
}
return $end_lsn;
}
# Advance so close to the end of a page that an XLogRecordHeader would not
# fit on it.
sub advance_to_record_splitting_zone
{
my $node = shift;
my $end_lsn = get_insert_lsn($node);
my $page_offset = $end_lsn % $WAL_BLOCK_SIZE;
# Get fairly close to the end of a page in big steps
while ($page_offset <= $WAL_BLOCK_SIZE - 512)
{
emit_message($node, $WAL_BLOCK_SIZE - $page_offset - 256);
$end_lsn = get_insert_lsn($node);
$page_offset = $end_lsn % $WAL_BLOCK_SIZE;
}
# Calibrate our message size so that we can creep closer to the end of
# the page, 8 bytes at a time.
my $message_size = $WAL_BLOCK_SIZE - 80;
while ($page_offset <= $WAL_BLOCK_SIZE - $RECORD_HEADER_SIZE)
{
emit_message($node, $message_size);
$end_lsn = get_insert_lsn($node);
my $old_offset = $page_offset;
$page_offset = $end_lsn % $WAL_BLOCK_SIZE;
# Adjust the message size until each message advances the offset by 8
# bytes, fine-grained enough to split a record header.
my $delta = $page_offset - $old_offset;
if ($delta > 8)
{
$message_size -= 8;
}
elsif ($delta <= 0)
{
$message_size += 8;
}
}
return $end_lsn;
}
# Set up a new node.  The configuration chosen here minimizes the number
# of arbitrary records that could get generated in a cluster.  Enlarging
# checkpoint_timeout avoids noise from checkpoint activity, and setting
# wal_level to "minimal" avoids random standby snapshot records.
# Autovacuum is disabled because it could trigger randomly, generating
# WAL activity of its own.
my $node = PostgreSQL::Test::Cluster->new("node");
$node->init;
$node->append_conf(
'postgresql.conf',
q[wal_level = minimal
autovacuum = off
checkpoint_timeout = '30min'
]);
$node->start;
$node->safe_psql('postgres', "CREATE TABLE t AS SELECT 42");
$WAL_SEGMENT_SIZE = get_int_setting($node, 'wal_segment_size');
$WAL_BLOCK_SIZE = get_int_setting($node, 'wal_block_size');
$TLI = $node->safe_psql('postgres',
"SELECT timeline_id FROM pg_control_checkpoint();");
my $end_lsn;
my $prev_lsn;
###########################################################################
note "Single-page end-of-WAL detection";
###########################################################################
# xl_tot_len is 0 (a common case, we hit trailing zeroes).
emit_message($node, 0);
$end_lsn = advance_out_of_record_splitting_zone($node);
$node->stop('immediate');
my $log_size = -s $node->logfile;
$node->start;
ok( $node->log_contains(
"invalid record length at .*: expected at least 24, got 0", $log_size
),
"xl_tot_len zero");
# xl_tot_len is < 24 (presumably recycled garbage).
emit_message($node, 0);
$end_lsn = advance_out_of_record_splitting_zone($node);
$node->stop('immediate');
write_wal($node, $TLI, $end_lsn, build_record_header(23));
$log_size = -s $node->logfile;
$node->start;
ok( $node->log_contains(
"invalid record length at .*: expected at least 24, got 23",
$log_size),
"xl_tot_len short");
# Need more pages, but xl_prev check fails first.
emit_message($node, 0);
$end_lsn = advance_out_of_record_splitting_zone($node);
$node->stop('immediate');
write_wal($node, $TLI, $end_lsn,
build_record_header(2 * 1024 * 1024 * 1024, 0, 0xdeadbeef));
$log_size = -s $node->logfile;
$node->start;
ok( $node->log_contains(
"record with incorrect prev-link 0/DEADBEEF at .*", $log_size),
"xl_prev bad");
# xl_crc check fails.
emit_message($node, 0);
advance_out_of_record_splitting_zone($node);
$end_lsn = emit_message($node, 10);
$node->stop('immediate');
# Corrupt a byte in that record, breaking its CRC.
write_wal($node, $TLI, $end_lsn - 8, '!');
$log_size = -s $node->logfile;
$node->start;
ok( $node->log_contains(
"incorrect resource manager data checksum in record at .*", $log_size
),
"xl_crc bad");
###########################################################################
note "Multi-page end-of-WAL detection, header is not split";
###########################################################################
# This series of tests requires a valid xl_prev set in the record header
# written to WAL.
# Good xl_prev, we hit zero page next (zero magic).
emit_message($node, 0);
$prev_lsn = advance_out_of_record_splitting_zone($node);
$end_lsn = emit_message($node, 0);
$node->stop('immediate');
write_wal($node, $TLI, $end_lsn,
build_record_header(2 * 1024 * 1024 * 1024, 0, $prev_lsn));
$log_size = -s $node->logfile;
$node->start;
ok($node->log_contains("invalid magic number 0000 .* LSN .*", $log_size),
"xlp_magic zero");
# Good xl_prev, we hit garbage page next (bad magic).
emit_message($node, 0);
$prev_lsn = advance_out_of_record_splitting_zone($node);
$end_lsn = emit_message($node, 0);
$node->stop('immediate');
write_wal($node, $TLI, $end_lsn,
build_record_header(2 * 1024 * 1024 * 1024, 0, $prev_lsn));
write_wal(
$node, $TLI,
start_of_next_page($end_lsn),
build_page_header(0xcafe, 0, 1, 0));
$log_size = -s $node->logfile;
$node->start;
ok($node->log_contains("invalid magic number CAFE .* LSN .*", $log_size),
"xlp_magic bad");
# Good xl_prev, we hit typical recycled page (good xlp_magic, bad
# xlp_pageaddr).
emit_message($node, 0);
$prev_lsn = advance_out_of_record_splitting_zone($node);
$end_lsn = emit_message($node, 0);
$node->stop('immediate');
write_wal($node, $TLI, $end_lsn,
build_record_header(2 * 1024 * 1024 * 1024, 0, $prev_lsn));
write_wal(
$node, $TLI,
start_of_next_page($end_lsn),
build_page_header($XLP_PAGE_MAGIC, 0, 1, 0xbaaaaaad));
$log_size = -s $node->logfile;
$node->start;
ok( $node->log_contains(
"unexpected pageaddr 0/BAAAAAAD in .*, LSN .*,", $log_size),
"xlp_pageaddr bad");
# Good xl_prev, xlp_magic, xlp_pageaddr, but bogus xlp_info.
emit_message($node, 0);
$prev_lsn = advance_out_of_record_splitting_zone($node);
$end_lsn = emit_message($node, 0);
$node->stop('immediate');
write_wal($node, $TLI, $end_lsn,
build_record_header(2 * 1024 * 1024 * 1024, 42, $prev_lsn));
write_wal(
$node, $TLI,
start_of_next_page($end_lsn),
build_page_header(
$XLP_PAGE_MAGIC, 0x1234, 1, start_of_next_page($end_lsn)));
$log_size = -s $node->logfile;
$node->start;
ok($node->log_contains("invalid info bits 1234 in .*, LSN .*,", $log_size),
"xlp_info bad");
# Good xl_prev, xlp_magic, xlp_pageaddr, but xlp_info doesn't mention
# continuation record.
emit_message($node, 0);
$prev_lsn = advance_out_of_record_splitting_zone($node);
$end_lsn = emit_message($node, 0);
$node->stop('immediate');
write_wal($node, $TLI, $end_lsn,
build_record_header(2 * 1024 * 1024 * 1024, 42, $prev_lsn));
write_wal(
$node, $TLI,
start_of_next_page($end_lsn),
build_page_header($XLP_PAGE_MAGIC, 0, 1, start_of_next_page($end_lsn)));
$log_size = -s $node->logfile;
$node->start;
ok($node->log_contains("there is no contrecord flag at .*", $log_size),
"xlp_info lacks XLP_FIRST_IS_CONTRECORD");
# Good xl_prev, xlp_magic, xlp_pageaddr, xlp_info but xlp_rem_len doesn't add
# up.
emit_message($node, 0);
$prev_lsn = advance_out_of_record_splitting_zone($node);
$end_lsn = emit_message($node, 0);
$node->stop('immediate');
write_wal($node, $TLI, $end_lsn,
build_record_header(2 * 1024 * 1024 * 1024, 42, $prev_lsn));
write_wal(
$node, $TLI,
start_of_next_page($end_lsn),
build_page_header(
$XLP_PAGE_MAGIC, $XLP_FIRST_IS_CONTRECORD,
1, start_of_next_page($end_lsn),
123456));
$log_size = -s $node->logfile;
$node->start;
ok( $node->log_contains(
"invalid contrecord length 123456 .* at .*", $log_size),
"xlp_rem_len bad");
###########################################################################
note "Multi-page, but header is split, so page checks are done first";
###########################################################################
# xl_prev is bad and xl_tot_len is too big, but we'll check xlp_magic first.
emit_message($node, 0);
$end_lsn = advance_to_record_splitting_zone($node);
$node->stop('immediate');
write_wal($node, $TLI, $end_lsn,
build_record_header(2 * 1024 * 1024 * 1024, 0, 0xdeadbeef));
$log_size = -s $node->logfile;
$node->start;
ok($node->log_contains("invalid magic number 0000 .* LSN .*", $log_size),
"xlp_magic zero (split record header)");
# And we'll also check xlp_pageaddr before any header checks.
emit_message($node, 0);
$end_lsn = advance_to_record_splitting_zone($node);
$node->stop('immediate');
write_wal($node, $TLI, $end_lsn,
build_record_header(2 * 1024 * 1024 * 1024, 0, 0xdeadbeef));
write_wal(
$node, $TLI,
start_of_next_page($end_lsn),
build_page_header(
$XLP_PAGE_MAGIC, $XLP_FIRST_IS_CONTRECORD, 1, 0xbaaaaaad));
$log_size = -s $node->logfile;
$node->start;
ok( $node->log_contains(
"unexpected pageaddr 0/BAAAAAAD in .*, LSN .*,", $log_size),
"xlp_pageaddr bad (split record header)");
# We'll also discover that xlp_rem_len doesn't add up before any
# header checks.
emit_message($node, 0);
$end_lsn = advance_to_record_splitting_zone($node);
$node->stop('immediate');
write_wal($node, $TLI, $end_lsn,
build_record_header(2 * 1024 * 1024 * 1024, 0, 0xdeadbeef));
write_wal(
$node, $TLI,
start_of_next_page($end_lsn),
build_page_header(
$XLP_PAGE_MAGIC, $XLP_FIRST_IS_CONTRECORD,
1, start_of_next_page($end_lsn),
123456));
$log_size = -s $node->logfile;
$node->start;
ok( $node->log_contains(
"invalid contrecord length 123456 .* at .*", $log_size),
"xlp_rem_len bad (split record header)");
done_testing();