diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index c7ce6037064..a4ba1a509ae 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -8,7 +8,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ - twophase_snapshot slot_creation_error catalog_change_snapshot + twophase_snapshot slot_creation_error catalog_change_snapshot \ + skip_snapshot_restore REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/skip_snapshot_restore.out b/contrib/test_decoding/expected/skip_snapshot_restore.out new file mode 100644 index 00000000000..c64dbd9c4e0 --- /dev/null +++ b/contrib/test_decoding/expected/skip_snapshot_restore.out @@ -0,0 +1,45 @@ +Parsed test spec with 3 sessions + +starting permutation: s0_init s0_begin s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert2 s0_commit s1_get_changes_slot0 s1_get_changes_slot1 +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_insert1: INSERT INTO tbl VALUES (1); +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); +step s2_checkpoint: CHECKPOINT; +step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_insert2: INSERT INTO tbl VALUES (2); +step s0_commit: COMMIT; +step s1_init: <... completed> +?column? +-------- +init +(1 row) + +step s1_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +----------------------------------------- +BEGIN +table public.tbl: INSERT: val1[integer]:1 +table public.tbl: INSERT: val1[integer]:2 +COMMIT +(4 rows) + +step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build index f1548c0fafc..f643dc81a2c 100644 --- a/contrib/test_decoding/meson.build +++ b/contrib/test_decoding/meson.build @@ -62,6 +62,7 @@ tests += { 'concurrent_stream', 'twophase_snapshot', 'slot_creation_error', + 'skip_snapshot_restore', ], 'regress_args': [ '--temp-config', files('logical.conf'), diff --git a/contrib/test_decoding/specs/skip_snapshot_restore.spec b/contrib/test_decoding/specs/skip_snapshot_restore.spec new file mode 100644 index 00000000000..3f1fb6f02c7 --- /dev/null +++ b/contrib/test_decoding/specs/skip_snapshot_restore.spec @@ -0,0 +1,46 @@ +# Test that a slot creation skips to restore serialized snapshot to reach +# the consistent state. + +setup +{ + DROP TABLE IF EXISTS tbl; + CREATE TABLE tbl (val1 integer); +} + +teardown +{ + DROP TABLE tbl; + SELECT 'stop' FROM pg_drop_replication_slot('slot0'); + SELECT 'stop' FROM pg_drop_replication_slot('slot1'); +} + +session "s0" +setup { SET synchronous_commit = on; } +step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); } +step "s0_begin" { BEGIN; } +step "s0_insert1" { INSERT INTO tbl VALUES (1); } +step "s0_insert2" { INSERT INTO tbl VALUES (2); } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit = on; } +step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); } +step "s1_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } +step "s1_get_changes_slot1" { SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + +session "s2" +setup { SET synchronous_commit = on ;} +step "s2_checkpoint" { CHECKPOINT; } +step "s2_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + + +# While 'slot1' creation by "s1_init" waits for s0-transaction to commit, the +# RUNNING_XACTS record is written by "s2_checkpoint" and "s2_get_changes_slot1" +# serializes consistent snapshots to the disk at LSNs where are before +# s0-transaction's commit. After s0-transaction commits, "s1_init" resumes but +# must not restore any serialized snapshots and will reach the consistent state +# when decoding a RUNNING_XACT record generated after s0-transaction's commit. +# We check if the get_changes on 'slot1' will not return any s0-transaction's +# changes as its confirmed_flush_lsn will be after the s0-transaction's commit +# record. +permutation "s0_init" "s0_begin" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert2" "s0_commit" "s1_get_changes_slot0" "s1_get_changes_slot1" diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 99f31849bb1..f8ef5d56d26 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -152,6 +152,7 @@ StartupDecodingContext(List *output_plugin_options, TransactionId xmin_horizon, bool need_full_snapshot, bool fast_forward, + bool in_create, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, @@ -212,7 +213,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn, - need_full_snapshot, slot->data.two_phase_at); + need_full_snapshot, in_create, slot->data.two_phase_at); ctx->reorder->private_data = ctx; @@ -438,7 +439,7 @@ CreateInitDecodingContext(const char *plugin, ReplicationSlotSave(); ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, - need_full_snapshot, false, + need_full_snapshot, false, true, xl_routine, prepare_write, do_write, update_progress); @@ -592,7 +593,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, false, - fast_forward, xl_routine, prepare_write, + fast_forward, false, xl_routine, prepare_write, do_write, update_progress); /* call output plugin initialization callback */ diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index e37e22f4417..ae676145e60 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -189,6 +189,14 @@ struct SnapBuild /* Indicates if we are building full snapshot or just catalog one. */ bool building_full_snapshot; + /* + * Indicates if we are using the snapshot builder for the creation of a + * logical replication slot. If it's true, the start point for decoding + * changes is not determined yet. So we skip snapshot restores to properly + * find the start point. See SnapBuildFindSnapshot() for details. + */ + bool in_slot_creation; + /* * Snapshot that's valid to see the catalog state seen at this moment. */ @@ -317,6 +325,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot, + bool in_slot_creation, XLogRecPtr two_phase_at) { MemoryContext context; @@ -347,6 +356,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, builder->initial_xmin_horizon = xmin_horizon; builder->start_decoding_at = start_lsn; + builder->in_slot_creation = in_slot_creation; builder->building_full_snapshot = need_full_snapshot; builder->two_phase_at = two_phase_at; @@ -1327,10 +1337,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * state while waiting on c)'s sub-states. * * b) This (in a previous run) or another decoding slot serialized a - * snapshot to disk that we can use. Can't use this method for the - * initial snapshot when slot is being created and needs full snapshot - * for export or direct use, as that snapshot will only contain catalog - * modifying transactions. + * snapshot to disk that we can use. Can't use this method while finding + * the start point for decoding changes as the restart LSN would be an + * arbitrary LSN but we need to find the start point to extract changes + * where we won't see the data for partial transactions. Also, we cannot + * use this method when a slot needs a full snapshot for export or direct + * use, as that snapshot will only contain catalog modifying transactions. * * c) First incrementally build a snapshot for catalog tuples * (BUILDING_SNAPSHOT), that requires all, already in-progress, @@ -1395,8 +1407,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn return false; } - /* b) valid on disk state and not building full snapshot */ + + /* + * b) valid on disk state and while neither building full snapshot nor + * creating a slot. + */ else if (!builder->building_full_snapshot && + !builder->in_slot_creation && SnapBuildRestore(builder, lsn)) { /* there won't be any state to cleanup */ @@ -1580,7 +1597,7 @@ typedef struct SnapBuildOnDisk offsetof(SnapBuildOnDisk, version) #define SNAPBUILD_MAGIC 0x51A1E001 -#define SNAPBUILD_VERSION 5 +#define SNAPBUILD_VERSION 6 /* * Store/Load a snapshot from disk, depending on the snapshot builder's state. diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index a3360a1c5ea..caa5113ff81 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -62,6 +62,7 @@ extern void CheckPointSnapBuild(void); extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot, + bool in_slot_creation, XLogRecPtr two_phase_at); extern void FreeSnapshotBuilder(SnapBuild *builder);