mirror of
https://git.postgresql.org/git/postgresql.git
synced 2025-01-06 15:24:56 +08:00
0ba5181c00
We were decoding empty transactions via streaming APIs added in commit
45fdc9738b
even when the user used the option 'skip-empty-xacts'. The APIs
make no effort to skip empty xacts under the assumption that we will
never try to stream such transactions. However, that is not true because
we can choose to stream a transaction that has change messages for
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, and we don't send such messages
downstream; rather, they are used only to update the internal state. So, we need
to skip such xacts when the plugin uses the option 'skip-empty-xacts'.
Diagnosed-By: Amit Kapila
Author: Dilip Kumar
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/CAA4eK1+OqgFNZkf7=ETe_y5ntjgDk3T0wcdkd4Sot_u1hySGfw@mail.gmail.com
92 lines
3.0 KiB
Plaintext
92 lines
3.0 KiB
Plaintext
SET synchronous_commit = on;
|
|
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
|
|
?column?
|
|
----------
|
|
init
|
|
(1 row)
|
|
|
|
CREATE TABLE stream_test(data text);
|
|
-- consume DDL
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
|
data
|
|
------
|
|
(0 rows)
|
|
|
|
-- streaming test with sub-transaction
|
|
BEGIN;
|
|
savepoint s1;
|
|
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
|
|
?column?
|
|
----------
|
|
msg5
|
|
(1 row)
|
|
|
|
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
|
|
TRUNCATE table stream_test;
|
|
rollback to s1;
|
|
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
|
|
COMMIT;
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
|
data
|
|
----------------------------------------------------------
|
|
streaming message: transactional: 1 prefix: test, sz: 50
|
|
opening a streamed block for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
closing a streamed block for transaction
|
|
committing streamed transaction
|
|
(24 rows)
|
|
|
|
-- streaming test for toast changes
|
|
ALTER TABLE stream_test ALTER COLUMN data set storage external;
|
|
-- consume DDL
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
|
data
|
|
------
|
|
(0 rows)
|
|
|
|
INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 10) g(i);
|
|
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
|
|
data
|
|
------------------------------------------
|
|
opening a streamed block for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
streaming change for transaction
|
|
closing a streamed block for transaction
|
|
committing streamed transaction
|
|
(13 rows)
|
|
|
|
DROP TABLE stream_test;
|
|
SELECT pg_drop_replication_slot('regression_slot');
|
|
pg_drop_replication_slot
|
|
--------------------------
|
|
|
|
(1 row)
|
|
|