mirror of
https://git.postgresql.org/git/postgresql.git
synced 2024-11-21 03:13:05 +08:00
Add a pg_recvlogical wrapper to PostgresNode
Allows testing of logical decoding using SQL interface and/or pg_recvlogical Most logical decoding tests are in contrib/test_decoding. This module is for work that doesn't fit well there, like where server restarts are required. Craig Ringer
This commit is contained in:
parent
d3cc37f1d8
commit
eb2a6131be
@ -1505,6 +1505,84 @@ sub slot
|
|||||||
|
|
||||||
=pod
|
=pod
|
||||||
|
|
||||||
|
=item $node->pg_recvlogical_upto(self, dbname, slot_name, endpos, timeout_secs, ...)
|
||||||
|
|
||||||
|
Invoke pg_recvlogical to read from slot_name on dbname until LSN endpos, which
|
||||||
|
corresponds to pg_recvlogical --endpos. Gives up after timeout (if nonzero).
|
||||||
|
|
||||||
|
Disallows pg_recvlogical from internally retrying on error by passing --no-loop.
|
||||||
|
|
||||||
|
Plugin options are passed as additional keyword arguments.
|
||||||
|
|
||||||
|
If called in scalar context, returns stdout, and die()s on timeout or nonzero return.
|
||||||
|
|
||||||
|
If called in array context, returns a tuple of (retval, stdout, stderr, timeout).
|
||||||
|
timeout is the IPC::Run::Timeout object whose is_expired method can be tested
|
||||||
|
to check for timeout. retval is undef on timeout.
|
||||||
|
|
||||||
|
=cut
|
||||||
|
|
||||||
|
sub pg_recvlogical_upto
|
||||||
|
{
|
||||||
|
my ($self, $dbname, $slot_name, $endpos, $timeout_secs, %plugin_options) = @_;
|
||||||
|
my ($stdout, $stderr);
|
||||||
|
|
||||||
|
my $timeout_exception = 'pg_recvlogical timed out';
|
||||||
|
|
||||||
|
die 'slot name must be specified' unless defined($slot_name);
|
||||||
|
die 'endpos must be specified' unless defined($endpos);
|
||||||
|
|
||||||
|
my @cmd = ('pg_recvlogical', '-S', $slot_name, '--dbname', $self->connstr($dbname));
|
||||||
|
push @cmd, '--endpos', $endpos;
|
||||||
|
push @cmd, '-f', '-', '--no-loop', '--start';
|
||||||
|
|
||||||
|
while (my ($k, $v) = each %plugin_options)
|
||||||
|
{
|
||||||
|
die "= is not permitted to appear in replication option name" if ($k =~ qr/=/);
|
||||||
|
push @cmd, "-o", "$k=$v";
|
||||||
|
}
|
||||||
|
|
||||||
|
my $timeout;
|
||||||
|
$timeout = IPC::Run::timeout($timeout_secs, exception => $timeout_exception ) if $timeout_secs;
|
||||||
|
my $ret = 0;
|
||||||
|
|
||||||
|
do {
|
||||||
|
local $@;
|
||||||
|
eval {
|
||||||
|
IPC::Run::run(\@cmd, ">", \$stdout, "2>", \$stderr, $timeout);
|
||||||
|
$ret = $?;
|
||||||
|
};
|
||||||
|
my $exc_save = $@;
|
||||||
|
if ($exc_save)
|
||||||
|
{
|
||||||
|
# IPC::Run::run threw an exception. re-throw unless it's a
|
||||||
|
# timeout, which we'll handle by testing is_expired
|
||||||
|
die $exc_save
|
||||||
|
if (blessed($exc_save) || $exc_save !~ qr/$timeout_exception/);
|
||||||
|
|
||||||
|
$ret = undef;
|
||||||
|
|
||||||
|
die "Got timeout exception '$exc_save' but timer not expired?!"
|
||||||
|
unless $timeout->is_expired;
|
||||||
|
|
||||||
|
die "$exc_save waiting for endpos $endpos with stdout '$stdout', stderr '$stderr'"
|
||||||
|
unless wantarray;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if (wantarray)
|
||||||
|
{
|
||||||
|
return ($ret, $stdout, $stderr, $timeout);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
die "pg_recvlogical exited with code '$ret', stdout '$stdout' and stderr '$stderr'" if $ret;
|
||||||
|
return $stdout;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
=pod
|
||||||
|
|
||||||
=back
|
=back
|
||||||
|
|
||||||
=cut
|
=cut
|
||||||
|
@ -1,9 +1,13 @@
|
|||||||
# Testing of logical decoding using SQL interface and/or pg_recvlogical
|
# Testing of logical decoding using SQL interface and/or pg_recvlogical
|
||||||
|
#
|
||||||
|
# Most logical decoding tests are in contrib/test_decoding. This module
|
||||||
|
# is for work that doesn't fit well there, like where server restarts
|
||||||
|
# are required.
|
||||||
use strict;
|
use strict;
|
||||||
use warnings;
|
use warnings;
|
||||||
use PostgresNode;
|
use PostgresNode;
|
||||||
use TestLib;
|
use TestLib;
|
||||||
use Test::More tests => 2;
|
use Test::More tests => 5;
|
||||||
|
|
||||||
# Initialize master node
|
# Initialize master node
|
||||||
my $node_master = get_new_node('master');
|
my $node_master = get_new_node('master');
|
||||||
@ -35,5 +39,30 @@ $result = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_chan
|
|||||||
chomp($result);
|
chomp($result);
|
||||||
is($result, '', 'Decoding after fast restart repeats no rows');
|
is($result, '', 'Decoding after fast restart repeats no rows');
|
||||||
|
|
||||||
|
# Insert some rows and verify that we get the same results from pg_recvlogical
|
||||||
|
# and the SQL interface.
|
||||||
|
$node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]);
|
||||||
|
|
||||||
|
my $expected = q{BEGIN
|
||||||
|
table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
|
||||||
|
table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
|
||||||
|
table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
|
||||||
|
table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
|
||||||
|
COMMIT};
|
||||||
|
|
||||||
|
my $stdout_sql = $node_master->safe_psql('postgres', qq[SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]);
|
||||||
|
is($stdout_sql, $expected, 'got expected output from SQL decoding session');
|
||||||
|
|
||||||
|
my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;");
|
||||||
|
diag "waiting to replay $endpos";
|
||||||
|
|
||||||
|
my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
|
||||||
|
chomp($stdout_recv);
|
||||||
|
is($stdout_recv, $expected, 'got same expected output from pg_recvlogical decoding session');
|
||||||
|
|
||||||
|
$stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
|
||||||
|
chomp($stdout_recv);
|
||||||
|
is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');
|
||||||
|
|
||||||
# done with the node
|
# done with the node
|
||||||
$node_master->stop;
|
$node_master->stop;
|
||||||
|
Loading…
Reference in New Issue
Block a user