Change the default value of the streaming option to 'parallel'.

Previously the default value of streaming option for a subscription was
'off'. The parallel option indicates that the changes in large
transactions (greater than logical_decoding_work_mem) are to be applied
directly via one of the parallel apply workers, if available.

The parallel mode was introduced in 16, but we refrain from enabling it by
default to avoid seeing any unpleasant behavior in the existing
applications. However we haven't found any such report yet, so this is a
good time to enable it by default.

Reported-by: Vignesh C
Author: Hayato Kuroda, Masahiko Sawada, Peter Smith, Amit Kapila
Discussion: https://postgr.es/m/CALDaNm1=MedhW23NuoePJTmonwsMSp80ddsw+sEJs0GUMC_kqQ@mail.gmail.com
This commit is contained in:
Amit Kapila 2024-10-28 08:42:05 +05:30
parent 6b652e6ce8
commit 1bf1140be8
6 changed files with 39 additions and 30 deletions

View File

@ -271,11 +271,23 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
<listitem> <listitem>
<para> <para>
Specifies whether to enable streaming of in-progress transactions Specifies whether to enable streaming of in-progress transactions
for this subscription. The default value is <literal>off</literal>, for this subscription. The default value is <literal>parallel</literal>,
meaning all transactions are fully decoded on the publisher and only meaning incoming changes are directly applied via one of the parallel
then sent to the subscriber as a whole. apply workers, if available. If no parallel apply worker is free to
handle streaming transactions then the changes are written to
temporary files and applied after the transaction is committed. Note
that if an error happens in a parallel apply worker, the finish LSN
of the remote transaction might not be reported in the server log.
</para> </para>
<caution>
<para>
There is a risk of deadlock when the schemas of the publisher and
subscriber differ, although such cases are rare. The apply worker
is equipped to retry these transactions automatically.
</para>
</caution>
<para> <para>
If set to <literal>on</literal>, the incoming changes are written to If set to <literal>on</literal>, the incoming changes are written to
temporary files and then applied only after the transaction is temporary files and then applied only after the transaction is
@ -283,13 +295,8 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</para> </para>
<para> <para>
If set to <literal>parallel</literal>, incoming changes are directly If set to <literal>off</literal>, all transactions are fully decoded
applied via one of the parallel apply workers, if available. If no on the publisher and only then sent to the subscriber as a whole.
parallel apply worker is free to handle streaming transactions then
the changes are written to temporary files and applied after the
transaction is committed. Note that if an error happens in a
parallel apply worker, the finish LSN of the remote transaction
might not be reported in the server log.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>

View File

@ -151,7 +151,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
if (IsSet(supported_opts, SUBOPT_BINARY)) if (IsSet(supported_opts, SUBOPT_BINARY))
opts->binary = false; opts->binary = false;
if (IsSet(supported_opts, SUBOPT_STREAMING)) if (IsSet(supported_opts, SUBOPT_STREAMING))
opts->streaming = LOGICALREP_STREAM_OFF; opts->streaming = LOGICALREP_STREAM_PARALLEL;
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT)) if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
opts->twophase = false; opts->twophase = false;
if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR)) if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))

View File

@ -5235,6 +5235,8 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBufferStr(query, ", streaming = on"); appendPQExpBufferStr(query, ", streaming = on");
else if (strcmp(subinfo->substream, "p") == 0) else if (strcmp(subinfo->substream, "p") == 0)
appendPQExpBufferStr(query, ", streaming = parallel"); appendPQExpBufferStr(query, ", streaming = parallel");
else
appendPQExpBufferStr(query, ", streaming = off");
if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0) if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
appendPQExpBufferStr(query, ", two_phase = on"); appendPQExpBufferStr(query, ", two_phase = on");

View File

@ -2992,7 +2992,7 @@ my %tests = (
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
WITH (connect = false);', WITH (connect = false);',
regexp => qr/^ regexp => qr/^
\QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1');\E \QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub1', streaming = parallel);\E
/xm, /xm,
like => { %full_runs, section_post_data => 1, }, like => { %full_runs, section_post_data => 1, },
}, },
@ -3001,9 +3001,9 @@ my %tests = (
create_order => 50, create_order => 50,
create_sql => 'CREATE SUBSCRIPTION sub2 create_sql => 'CREATE SUBSCRIPTION sub2
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
WITH (connect = false, origin = none);', WITH (connect = false, origin = none, streaming = off);',
regexp => qr/^ regexp => qr/^
\QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', origin = none);\E \QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', streaming = off, origin = none);\E
/xm, /xm,
like => { %full_runs, section_post_data => 1, }, like => { %full_runs, section_post_data => 1, },
}, },
@ -3012,9 +3012,9 @@ my %tests = (
create_order => 50, create_order => 50,
create_sql => 'CREATE SUBSCRIPTION sub3 create_sql => 'CREATE SUBSCRIPTION sub3
CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1
WITH (connect = false, origin = any);', WITH (connect = false, origin = any, streaming = on);',
regexp => qr/^ regexp => qr/^
\QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3');\E \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', streaming = on);\E
/xm, /xm,
like => { %full_runs, section_post_data => 1, }, like => { %full_runs, section_post_data => 1, },
}, },

View File

@ -119,7 +119,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0 regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
@ -127,7 +127,7 @@ ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row) (1 row)
DROP SUBSCRIPTION regress_testsub3; DROP SUBSCRIPTION regress_testsub3;
@ -148,7 +148,7 @@ ERROR: invalid connection string syntax: missing "=" after "foobar" in connecti
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@ -160,7 +160,7 @@ ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- -----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0 regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub SET (password_required = true); ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@ -179,7 +179,7 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- -----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345 regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345
(1 row) (1 row)
-- ok - with lsn = NONE -- ok - with lsn = NONE
@ -191,7 +191,7 @@ ERROR: invalid WAL location (LSN): 0/0
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- -----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0
(1 row) (1 row)
BEGIN; BEGIN;
@ -226,7 +226,7 @@ HINT: Available values: local, remote_write, remote_apply, on, off.
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- ---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0 regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0
(1 row) (1 row)
-- rename back to keep the rest simple -- rename back to keep the rest simple
@ -258,7 +258,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (binary = false);
@ -267,7 +267,7 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row) (1 row)
DROP SUBSCRIPTION regress_testsub; DROP SUBSCRIPTION regress_testsub;
@ -374,7 +374,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row) (1 row)
-- we can alter streaming when two_phase enabled -- we can alter streaming when two_phase enabled
@ -412,7 +412,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
@ -420,7 +420,7 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
List of subscriptions List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- -----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row) (1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);

View File

@ -88,7 +88,7 @@ $node_B->safe_psql(
CREATE SUBSCRIPTION tap_sub_B CREATE SUBSCRIPTION tap_sub_B
CONNECTION '$node_A_connstr application_name=$appname_B' CONNECTION '$node_A_connstr application_name=$appname_B'
PUBLICATION tap_pub_A PUBLICATION tap_pub_A
WITH (two_phase = on)"); WITH (two_phase = on, streaming = off)");
# node_B (pub) -> node_C (sub) # node_B (pub) -> node_C (sub)
my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
@ -100,7 +100,7 @@ $node_C->safe_psql(
CREATE SUBSCRIPTION tap_sub_C CREATE SUBSCRIPTION tap_sub_C
CONNECTION '$node_B_connstr application_name=$appname_C' CONNECTION '$node_B_connstr application_name=$appname_C'
PUBLICATION tap_pub_B PUBLICATION tap_pub_B
WITH (two_phase = on)"); WITH (two_phase = on, streaming = off)");
# Wait for subscribers to finish initialization # Wait for subscribers to finish initialization
$node_A->wait_for_catchup($appname_B); $node_A->wait_for_catchup($appname_B);