Add tests for logical replication spilled stats.

Commit 9868167500 added a mechanism to track statistics corresponding to
the spilling of changes from ReorderBuffer but didn't add any tests.

Author: Amit Kapila and Sawada Masahiko
Discussion: https://postgr.es/m/CA+fd4k5_pPAYRTDrO2PbtTOe0eHQpBvuqmCr8ic39uTNmR49Eg@mail.gmail.com
This commit is contained in:
Amit Kapila 2020-10-13 08:30:35 +05:30
parent 371668a838
commit 8fccf75834
3 changed files with 172 additions and 1 deletions

View File

@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \
spill slot truncate stream
spill slot truncate stream stats
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream

View File

@ -0,0 +1,109 @@
-- predictability
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
?column?
----------
init
(1 row)
CREATE TABLE stats_test(data text);
-- function to wait for counters to advance
CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
DECLARE
start_time timestamptz := clock_timestamp();
updated bool;
BEGIN
-- we don't want to wait forever; loop will exit after 30 seconds
FOR i IN 1 .. 300 LOOP
-- check to see if all updates have been reset/updated
SELECT CASE WHEN check_reset THEN (spill_txns = 0)
ELSE (spill_txns > 0)
END
INTO updated
FROM pg_stat_replication_slots WHERE name='regression_slot';
exit WHEN updated;
-- wait a little
perform pg_sleep_for('100 milliseconds');
-- reset stats snapshot so we can test again
perform pg_stat_clear_snapshot();
END LOOP;
-- report time waited in postmaster log (where it won't change test output)
RAISE LOG 'wait_for_decode_stats delayed % seconds',
extract(epoch from clock_timestamp() - start_time);
END
$$ LANGUAGE plpgsql;
-- spilling the xact
BEGIN;
INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
COMMIT;
SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL,NULL);
count
-------
5006
(1 row)
-- check stats, wait for stats collector to update.
SELECT wait_for_decode_stats(false);
wait_for_decode_stats
-----------------------
(1 row)
SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots;
name | spill_txns | spill_count
-----------------+------------+-------------
regression_slot | 1 | 12
(1 row)
-- reset the slot stats, and wait for stats collector to reset
SELECT pg_stat_reset_replication_slot('regression_slot');
pg_stat_reset_replication_slot
--------------------------------
(1 row)
SELECT wait_for_decode_stats(true);
wait_for_decode_stats
-----------------------
(1 row)
SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots;
name | spill_txns | spill_count
-----------------+------------+-------------
regression_slot | 0 | 0
(1 row)
-- decode and check stats again.
SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL,NULL);
count
-------
5006
(1 row)
SELECT wait_for_decode_stats(false);
wait_for_decode_stats
-----------------------
(1 row)
SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots;
name | spill_txns | spill_count
-----------------+------------+-------------
regression_slot | 1 | 12
(1 row)
DROP FUNCTION wait_for_decode_stats(bool);
DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
(1 row)

View File

@ -0,0 +1,62 @@
-- predictability
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
CREATE TABLE stats_test(data text);
-- function to wait for counters to advance
CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
DECLARE
start_time timestamptz := clock_timestamp();
updated bool;
BEGIN
-- we don't want to wait forever; loop will exit after 30 seconds
FOR i IN 1 .. 300 LOOP
-- check to see if all updates have been reset/updated
SELECT CASE WHEN check_reset THEN (spill_txns = 0)
ELSE (spill_txns > 0)
END
INTO updated
FROM pg_stat_replication_slots WHERE name='regression_slot';
exit WHEN updated;
-- wait a little
perform pg_sleep_for('100 milliseconds');
-- reset stats snapshot so we can test again
perform pg_stat_clear_snapshot();
END LOOP;
-- report time waited in postmaster log (where it won't change test output)
RAISE LOG 'wait_for_decode_stats delayed % seconds',
extract(epoch from clock_timestamp() - start_time);
END
$$ LANGUAGE plpgsql;
-- spilling the xact
BEGIN;
INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
COMMIT;
SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL,NULL);
-- check stats, wait for stats collector to update.
SELECT wait_for_decode_stats(false);
SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots;
-- reset the slot stats, and wait for stats collector to reset
SELECT pg_stat_reset_replication_slot('regression_slot');
SELECT wait_for_decode_stats(true);
SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots;
-- decode and check stats again.
SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL,NULL);
SELECT wait_for_decode_stats(false);
SELECT name, spill_txns, spill_count FROM pg_stat_replication_slots;
DROP FUNCTION wait_for_decode_stats(bool);
DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot');