diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d29c0c5a55..3874939380 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -776,7 +776,7 @@ apply_handle_stream_start(StringInfo s) hash_ctl.entrysize = sizeof(StreamXidHash); hash_ctl.hcxt = ApplyContext; xidhash = hash_create("StreamXidHash", 1024, &hash_ctl, - HASH_ELEM | HASH_CONTEXT); + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); } /* open the spool file for this transaction */ diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm index 014f0fcda6..9667f7667e 100644 --- a/src/test/perl/PostgresNode.pm +++ b/src/test/perl/PostgresNode.pm @@ -1565,6 +1565,93 @@ sub psql =pod +=item $node->background_psql($dbname, \$stdin, \$stdout, $timer, %params) => harness + +Invoke B<psql> on B<$dbname> and return an IPC::Run harness object, which the +caller may use to send input to B<psql>. The process's stdin is sourced from +the $stdin scalar reference, and its stdout and stderr go to the $stdout +scalar reference. This allows the caller to act on other parts of the system +while idling this backend. + +The specified timer object is attached to the harness, as well. It's caller's +responsibility to select the timeout length, and to restart the timer after +each command if the timeout is per-command. + +psql is invoked in tuples-only unaligned mode with reading of B<.psqlrc> +disabled. That may be overridden by passing extra psql parameters. + +Dies on failure to invoke psql, or if psql fails to connect. Errors occurring +later are the caller's problem. psql runs with on_error_stop by default so +that it will stop running sql and return 3 if passed SQL results in an error. + +Be sure to "finish" the harness when done with it. + +=over + +=item on_error_stop => 1 + +By default, the B<psql> method invokes the B<psql> program with ON_ERROR_STOP=1 +set, so SQL execution is stopped at the first error and exit code 3 is +returned. 
Set B<on_error_stop> to 0 to ignore errors instead. + +=item replication => B<value> + +If set, add B<replication=value> to the conninfo string. +Passing the literal value C<database> results in a logical replication +connection. + +=item extra_params => ['--single-transaction'] + +If given, it must be an array reference containing additional parameters to B<psql>. + +=back + +=cut + +sub background_psql +{ + my ($self, $dbname, $stdin, $stdout, $timer, %params) = @_; + + my $replication = $params{replication}; + + my @psql_params = ( + 'psql', + '-XAtq', + '-d', + $self->connstr($dbname) + . (defined $replication ? " replication=$replication" : ""), + '-f', + '-'); + + $params{on_error_stop} = 1 unless defined $params{on_error_stop}; + + push @psql_params, '-v', 'ON_ERROR_STOP=1' if $params{on_error_stop}; + push @psql_params, @{ $params{extra_params} } + if defined $params{extra_params}; + + # Ensure there is no data waiting to be sent: + $$stdin = "" if ref($stdin); + # IPC::Run would otherwise append to existing contents: + $$stdout = "" if ref($stdout); + + my $harness = IPC::Run::start \@psql_params, + '<', $stdin, '>', $stdout, $timer; + + # Request some output, and pump until we see it. This means that psql + # connection failures are caught here, relieving callers of the need to + # handle those. (Right now, we have no particularly good handling for + # errors anyway, but that might be added later.) 
+ my $banner = "background_psql: ready"; + $$stdin = "\\echo $banner\n"; + pump $harness until $$stdout =~ /$banner/ || $timer->is_expired; + + die "psql startup timed out" if $timer->is_expired; + + return $harness; +} + +=pod + +=item $node->interactive_psql($dbname, \$stdin, \$stdout, $timer, %params) => harness + +Invoke B<psql> on B<$dbname> and return an IPC::Run harness object, diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl index fffe001965..1b0e6fb9fb 100644 --- a/src/test/subscription/t/015_stream.pl +++ b/src/test/subscription/t/015_stream.pl @@ -46,15 +46,37 @@ my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|2|2), 'check initial data was copied to subscriber'); -# Insert, update and delete enough rows to exceed the 64kB limit. -$node_publisher->safe_psql('postgres', q{ +# Interleave a pair of transactions, each exceeding the 64kB limit. +my $in = ''; +my $out = ''; + +my $timer = IPC::Run::timeout(180); + +my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, + on_error_stop => 0); + +$in .= q{ BEGIN; INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; DELETE FROM test_tab WHERE mod(a,3) = 0; +}; +$h->pump_nb; + +$node_publisher->safe_psql( + 'postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 9999) s(i); +DELETE FROM test_tab WHERE a > 5000; COMMIT; }); +$in .= q{ +COMMIT; +\q +}; +$h->finish; # errors make the next test fail, so ignore them here + $node_publisher->wait_for_catchup($appname); $result =