mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-01-24 18:55:04 +08:00
19890a064e
Commit 0aa8a01d04
extended the output plugin API to allow decoding of
prepared xacts and allowed the user to enable/disable the two-phase option
via pg_logical_slot_get_changes(). This can lead to a problem: the first
time changes are fetched via pg_logical_slot_get_changes() without the
two_phase option enabled, the PREPARE is not returned even though it comes
after the consistent snapshot. If a later call then enables the two_phase
option, it can skip the PREPARE because by that time the start decoding
point has already moved past it. So the user will only get the COMMIT
PREPARED.
Allow this option to be enabled/disabled at slot creation time, with a
default of false. This breaks existing slots, which is acceptable in a major
release.
Author: Ajin Cherian
Reviewed-by: Amit Kapila and Vignesh C
Discussion: https://postgr.es/m/d0f60d60-133d-bf8d-bd70-47784d8fabf3@enterprisedb.com
126 lines
4.9 KiB
Plaintext
126 lines
4.9 KiB
Plaintext
-- Test streaming of two-phase commits
|
|
SET synchronous_commit = on;
|
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
|
|
?column?
|
|
----------
|
|
init
|
|
(1 row)
|
|
|
|
CREATE TABLE stream_test(data text);
|
|
-- consume DDL
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
|
data
|
|
------
|
|
(0 rows)
|
|
|
|
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
|
|
BEGIN;
|
|
SAVEPOINT s1;
|
|
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
|
|
?column?
|
|
----------
|
|
msg5
|
|
(1 row)
|
|
|
|
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
|
|
TRUNCATE table stream_test;
|
|
ROLLBACK TO s1;
|
|
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
|
|
PREPARE TRANSACTION 'test1';
|
|
-- should show the inserts after a ROLLBACK
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
|
data
|
|
----------------------------------------------------------
|
|
streaming message: transactional: 1 prefix: test, sz: 50
|
|
opening a streamed block for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
closing a streamed block for transaction
|
|
preparing streamed transaction 'test1'
|
|
(24 rows)
|
|
|
|
COMMIT PREPARED 'test1';
|
|
-- should show only the COMMIT PREPARED; the changes were already streamed at PREPARE time
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
|
data
|
|
-------------------------
|
|
COMMIT PREPARED 'test1'
|
|
(1 row)
|
|
|
|
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
|
|
-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
|
|
BEGIN;
|
|
SAVEPOINT s1;
|
|
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
|
|
?column?
|
|
----------
|
|
msg5
|
|
(1 row)
|
|
|
|
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
|
|
TRUNCATE table stream_test;
|
|
ROLLBACK to s1;
|
|
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
|
|
PREPARE TRANSACTION 'test1_nodecode';
|
|
-- should NOT show inserts after a ROLLBACK
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
|
data
|
|
----------------------------------------------------------
|
|
streaming message: transactional: 1 prefix: test, sz: 50
|
|
(1 row)
|
|
|
|
COMMIT PREPARED 'test1_nodecode';
|
|
-- should show the inserts followed by a plain COMMIT, not a COMMIT PREPARED
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
|
data
|
|
-------------------------------------------------------------
|
|
BEGIN
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
|
|
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
|
|
COMMIT
|
|
(22 rows)
|
|
|
|
DROP TABLE stream_test;
|
|
SELECT pg_drop_replication_slot('regression_slot');
|
|
pg_drop_replication_slot
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|