diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 1d601d8144..afcab930f7 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -50,7 +50,8 @@ regresscheck-install-force: | submake-regress submake-test_decoding temp-install $(pg_regress_installcheck) \ $(REGRESSCHECKS) -ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml +ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml \ + oldest_xmin snapshot_transfer isolationcheck: | submake-isolation submake-test_decoding temp-install $(pg_isolation_regress_check) \ diff --git a/contrib/test_decoding/expected/oldest_xmin.out b/contrib/test_decoding/expected/oldest_xmin.out new file mode 100644 index 0000000000..d09342c4be --- /dev/null +++ b/contrib/test_decoding/expected/oldest_xmin.out @@ -0,0 +1,27 @@ +Parsed test spec with 2 sessions + +starting permutation: s0_begin s0_getxid s1_begin s1_insert s0_alter s0_commit s0_checkpoint s0_get_changes s1_commit s0_vacuum s0_get_changes +step s0_begin: BEGIN; +step s0_getxid: SELECT txid_current() IS NULL; +?column? + +f +step s1_begin: BEGIN; +step s1_insert: INSERT INTO harvest VALUES ((1, 2, 3)); +step s0_alter: ALTER TYPE basket DROP ATTRIBUTE mangos; +step s0_commit: COMMIT; +step s0_checkpoint: CHECKPOINT; +step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data + +step s1_commit: COMMIT; +step s0_vacuum: VACUUM FULL; +step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data + +BEGIN +table public.harvest: INSERT: fruits[basket]:'(1,2,3)' +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/expected/snapshot_transfer.out b/contrib/test_decoding/expected/snapshot_transfer.out new file mode 100644 index 0000000000..87bed03f76 --- /dev/null +++ b/contrib/test_decoding/expected/snapshot_transfer.out @@ -0,0 +1,49 @@ +Parsed test spec with 2 sessions + +starting permutation: s0_begin s0_begin_sub0 s0_log_assignment s0_sub_get_base_snap s1_produce_new_snap s0_insert s0_end_sub0 s0_commit s0_get_changes +step s0_begin: BEGIN; +step s0_begin_sub0: SAVEPOINT s0; +step s0_log_assignment: SELECT txid_current() IS NULL; +?column? + +f +step s0_sub_get_base_snap: INSERT INTO dummy VALUES (0); +step s1_produce_new_snap: ALTER TABLE harvest ADD COLUMN mangos int; +step s0_insert: INSERT INTO harvest VALUES (1, 2, 3); +step s0_end_sub0: RELEASE SAVEPOINT s0; +step s0_commit: COMMIT; +step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data + +BEGIN +table public.dummy: INSERT: i[integer]:0 +table public.harvest: INSERT: apples[integer]:1 pears[integer]:2 mangos[integer]:3 +COMMIT +?column? + +stop + +starting permutation: s0_begin s0_begin_sub0 s0_log_assignment s0_begin_sub1 s0_sub_get_base_snap s1_produce_new_snap s0_insert s0_end_sub1 s0_end_sub0 s0_commit s0_get_changes +step s0_begin: BEGIN; +step s0_begin_sub0: SAVEPOINT s0; +step s0_log_assignment: SELECT txid_current() IS NULL; +?column? + +f +step s0_begin_sub1: SAVEPOINT s1; +step s0_sub_get_base_snap: INSERT INTO dummy VALUES (0); +step s1_produce_new_snap: ALTER TABLE harvest ADD COLUMN mangos int; +step s0_insert: INSERT INTO harvest VALUES (1, 2, 3); +step s0_end_sub1: RELEASE SAVEPOINT s1; +step s0_end_sub0: RELEASE SAVEPOINT s0; +step s0_commit: COMMIT; +step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +data + +BEGIN +table public.dummy: INSERT: i[integer]:0 +table public.harvest: INSERT: apples[integer]:1 pears[integer]:2 mangos[integer]:3 +COMMIT +?column? + +stop diff --git a/contrib/test_decoding/specs/oldest_xmin.spec b/contrib/test_decoding/specs/oldest_xmin.spec new file mode 100644 index 0000000000..4f8af70aa2 --- /dev/null +++ b/contrib/test_decoding/specs/oldest_xmin.spec @@ -0,0 +1,37 @@ +# Test advancement of the slot's oldest xmin + +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact + DROP TYPE IF EXISTS basket; + CREATE TYPE basket AS (apples integer, pears integer, mangos integer); + DROP TABLE IF EXISTS harvest; + CREATE TABLE harvest(fruits basket); +} + +teardown +{ + DROP TABLE IF EXISTS harvest; + DROP TYPE IF EXISTS basket; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +step "s0_begin" { BEGIN; } +step "s0_getxid" { SELECT txid_current() IS NULL; } +step "s0_alter" { ALTER TYPE basket DROP ATTRIBUTE mangos; } +step "s0_commit" { COMMIT; } +step "s0_checkpoint" { CHECKPOINT; } +step "s0_vacuum" { VACUUM FULL; } +step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } + +session "s1" +step "s1_begin" { BEGIN; } +step "s1_insert" { INSERT INTO harvest VALUES ((1, 2, 3)); } +step "s1_commit" { COMMIT; } + +# Checkpoint with following get_changes forces to advance xmin. ALTER of a +# composite type is a rare form of DDL which allows T1 to see the tuple which +# will be removed (xmax set) before T1 commits. That is, interlocking doesn't +# forbid modifying catalog after someone read it (and didn't commit yet). +permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_get_changes" "s1_commit" "s0_vacuum" "s0_get_changes" diff --git a/contrib/test_decoding/specs/snapshot_transfer.spec b/contrib/test_decoding/specs/snapshot_transfer.spec new file mode 100644 index 0000000000..47db7fd90a --- /dev/null +++ b/contrib/test_decoding/specs/snapshot_transfer.spec @@ -0,0 +1,42 @@ +# Test snapshot transfer from subxact to top-level and receival of later snaps. + +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact + DROP TABLE IF EXISTS dummy; + CREATE TABLE dummy(i int); + DROP TABLE IF EXISTS harvest; + CREATE TABLE harvest(apples int, pears int); +} + +teardown +{ + DROP TABLE IF EXISTS harvest; + DROP TABLE IF EXISTS dummy; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +step "s0_begin" { BEGIN; } +step "s0_begin_sub0" { SAVEPOINT s0; } +step "s0_log_assignment" { SELECT txid_current() IS NULL; } +step "s0_begin_sub1" { SAVEPOINT s1; } +step "s0_sub_get_base_snap" { INSERT INTO dummy VALUES (0); } +step "s0_insert" { INSERT INTO harvest VALUES (1, 2, 3); } +step "s0_end_sub0" { RELEASE SAVEPOINT s0; } +step "s0_end_sub1" { RELEASE SAVEPOINT s1; } +step "s0_insert2" { INSERT INTO harvest VALUES (1, 2, 3, 4); } +step "s0_commit" { COMMIT; } +step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } + +session "s1" +step "s1_produce_new_snap" { ALTER TABLE harvest ADD COLUMN mangos int; } + +# start top-level without base snap, get base snap in subxact, then create new +# snap and make sure it is queued. +permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub0" "s0_commit" "s0_get_changes" + +# In previous test, we firstly associated subxact with xact and only then got +# base snap; now nest one more subxact to get snap first and only then (at +# commit) associate it with toplevel. +permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_begin_sub1" "s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub1" "s0_end_sub0" "s0_commit" "s0_get_changes" diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c1447a513b..5f4aa07131 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -165,6 +165,8 @@ static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top); +static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, + ReorderBufferTXN *subtxn); static void AssertTXNLsnOrder(ReorderBuffer *rb); @@ -271,6 +273,7 @@ ReorderBufferAllocate(void) buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; dlist_init(&buffer->toplevel_by_lsn); + dlist_init(&buffer->txns_by_base_snapshot_lsn); /* * Ensure there's no stale data from prior uses of this slot, in case some @@ -462,7 +465,6 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool found; Assert(TransactionIdIsValid(xid)); - Assert(!create || lsn != InvalidXLogRecPtr); /* * Check the one-entry lookup cache first @@ -506,6 +508,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, { /* initialize the new entry, if creation was requested */ Assert(ent != NULL); + Assert(lsn != InvalidXLogRecPtr); ent->txn = ReorderBufferGetTXN(rb); ent->txn->xid = xid; @@ -607,43 +610,80 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, } } - +/* + * AssertTXNLsnOrder + * Verify LSN ordering of transaction lists in the reorderbuffer + * + * Other LSN-related invariants are checked too. + * + * No-op if assertions are not in use. + */ static void AssertTXNLsnOrder(ReorderBuffer *rb) { #ifdef USE_ASSERT_CHECKING dlist_iter iter; XLogRecPtr prev_first_lsn = InvalidXLogRecPtr; + XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr; dlist_foreach(iter, &rb->toplevel_by_lsn) { - ReorderBufferTXN *cur_txn; + ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node, + iter.cur); - cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur); + /* start LSN must be set */ Assert(cur_txn->first_lsn != InvalidXLogRecPtr); + /* If there is an end LSN, it must be higher than start LSN */ if (cur_txn->end_lsn != InvalidXLogRecPtr) Assert(cur_txn->first_lsn <= cur_txn->end_lsn); + /* Current initial LSN must be strictly higher than previous */ if (prev_first_lsn != InvalidXLogRecPtr) Assert(prev_first_lsn < cur_txn->first_lsn); + /* known-as-subtxn txns must not be listed */ Assert(!cur_txn->is_known_as_subxact); + prev_first_lsn = cur_txn->first_lsn; } + + dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn) + { + ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, + base_snapshot_node, + iter.cur); + + /* base snapshot (and its LSN) must be set */ + Assert(cur_txn->base_snapshot != NULL); + Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr); + + /* current LSN must be strictly higher than previous */ + if (prev_base_snap_lsn != InvalidXLogRecPtr) + Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn); + + /* known-as-subtxn txns must not be listed */ + Assert(!cur_txn->is_known_as_subxact); + + prev_base_snap_lsn = cur_txn->base_snapshot_lsn; + } #endif } +/* + * ReorderBufferGetOldestTXN + * Return oldest transaction in reorderbuffer + */ ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb) { ReorderBufferTXN *txn; + AssertTXNLsnOrder(rb); + if (dlist_is_empty(&rb->toplevel_by_lsn)) return NULL; - AssertTXNLsnOrder(rb); - txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn); Assert(!txn->is_known_as_subxact); @@ -651,12 +691,44 @@ ReorderBufferGetOldestTXN(ReorderBuffer *rb) return txn; } +/* + * ReorderBufferGetOldestXmin + * Return oldest Xmin in reorderbuffer + * + * Returns oldest possibly running Xid from the point of view of snapshots + * used in the transactions kept by reorderbuffer, or InvalidTransactionId if + * there are none. + * + * Since snapshots are assigned monotonically, this equals the Xmin of the + * base snapshot with minimal base_snapshot_lsn. + */ +TransactionId +ReorderBufferGetOldestXmin(ReorderBuffer *rb) +{ + ReorderBufferTXN *txn; + + AssertTXNLsnOrder(rb); + + if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn)) + return InvalidTransactionId; + + txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node, + &rb->txns_by_base_snapshot_lsn); + return txn->base_snapshot->xmin; +} + void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr) { rb->current_restart_decoding_lsn = ptr; } +/* + * ReorderBufferAssignChild + * + * Make note that we know that subxid is a subtransaction of xid, seen as of + * the given lsn. + */ void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn) @@ -669,32 +741,107 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true); subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false); - if (new_sub) - { - /* - * we assign subtransactions to top level transaction even if we don't - * have data for it yet, assignment records frequently reference xids - * that have not yet produced any records. Knowing those aren't top - * level xids allows us to make processing cheaper in some places. - */ - dlist_push_tail(&txn->subtxns, &subtxn->node); - txn->nsubtxns++; - } - else if (!subtxn->is_known_as_subxact) - { - subtxn->is_known_as_subxact = true; - Assert(subtxn->nsubtxns == 0); + if (new_top && !new_sub) + elog(ERROR, "subtransaction logged without previous top-level txn record"); - /* remove from lsn order list of top-level transactions */ - dlist_delete(&subtxn->node); - - /* add to toplevel transaction */ - dlist_push_tail(&txn->subtxns, &subtxn->node); - txn->nsubtxns++; - } - else if (new_top) + if (!new_sub) { - elog(ERROR, "existing subxact assigned to unknown toplevel xact"); + if (subtxn->is_known_as_subxact) + { + /* already associated, nothing to do */ + return; + } + else + { + /* + * We already saw this transaction, but initially added it to the list + * of top-level txns. Now that we know it's not top-level, remove + * it from there. + */ + dlist_delete(&subtxn->node); + } + } + + subtxn->is_known_as_subxact = true; + subtxn->toplevel_xid = xid; + Assert(subtxn->nsubtxns == 0); + + /* add to subtransaction list */ + dlist_push_tail(&txn->subtxns, &subtxn->node); + txn->nsubtxns++; + + /* Possibly transfer the subtxn's snapshot to its top-level txn. */ + ReorderBufferTransferSnapToParent(txn, subtxn); + + /* Verify LSN-ordering invariant */ + AssertTXNLsnOrder(rb); +} + +/* + * ReorderBufferTransferSnapToParent + * Transfer base snapshot from subtxn to top-level txn, if needed + * + * This is done if the top-level txn doesn't have a base snapshot, or if the + * subtxn's base snapshot has an earlier LSN than the top-level txn's base + * snapshot's LSN. This can happen if there are no changes in the toplevel + * txn but there are some in the subtxn, or the first change in subtxn has + * earlier LSN than first change in the top-level txn and we learned about + * their kinship only now. + * + * The subtransaction's snapshot is cleared regardless of the transfer + * happening, since it's not needed anymore in either case. + * + * We do this as soon as we become aware of their kinship, to avoid queueing + * extra snapshots to txns known-as-subtxns -- only top-level txns will + * receive further snapshots. + */ +static void +ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, + ReorderBufferTXN *subtxn) +{ + Assert(subtxn->toplevel_xid == txn->xid); + + if (subtxn->base_snapshot != NULL) + { + if (txn->base_snapshot == NULL || + subtxn->base_snapshot_lsn < txn->base_snapshot_lsn) + { + /* + * If the toplevel transaction already has a base snapshot but + * it's newer than the subxact's, purge it. + */ + if (txn->base_snapshot != NULL) + { + SnapBuildSnapDecRefcount(txn->base_snapshot); + dlist_delete(&txn->base_snapshot_node); + } + + /* + * The snapshot is now the top transaction's; transfer it, and + * adjust the list position of the top transaction in the list by + * moving it to where the subtransaction is. + */ + txn->base_snapshot = subtxn->base_snapshot; + txn->base_snapshot_lsn = subtxn->base_snapshot_lsn; + dlist_insert_before(&subtxn->base_snapshot_node, + &txn->base_snapshot_node); + + /* + * The subtransaction doesn't have a snapshot anymore (so it + * mustn't be in the list.) + */ + subtxn->base_snapshot = NULL; + subtxn->base_snapshot_lsn = InvalidXLogRecPtr; + dlist_delete(&subtxn->base_snapshot_node); + } + else + { + /* Base snap of toplevel is fine, so subxact's is not needed */ + SnapBuildSnapDecRefcount(subtxn->base_snapshot); + dlist_delete(&subtxn->base_snapshot_node); + subtxn->base_snapshot = NULL; + subtxn->base_snapshot_lsn = InvalidXLogRecPtr; + } } } @@ -707,7 +854,6 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn) { - ReorderBufferTXN *txn; ReorderBufferTXN *subtxn; subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL, @@ -719,42 +865,14 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, if (!subtxn) return; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true); - - if (txn == NULL) - elog(ERROR, "subxact logged without previous toplevel record"); - - /* - * Pass our base snapshot to the parent transaction if it doesn't have - * one, or ours is older. That can happen if there are no changes in the - * toplevel transaction but in one of the child transactions. This allows - * the parent to simply use its base snapshot initially. - */ - if (subtxn->base_snapshot != NULL && - (txn->base_snapshot == NULL || - txn->base_snapshot_lsn > subtxn->base_snapshot_lsn)) - { - txn->base_snapshot = subtxn->base_snapshot; - txn->base_snapshot_lsn = subtxn->base_snapshot_lsn; - subtxn->base_snapshot = NULL; - subtxn->base_snapshot_lsn = InvalidXLogRecPtr; - } - subtxn->final_lsn = commit_lsn; subtxn->end_lsn = end_lsn; - if (!subtxn->is_known_as_subxact) - { - subtxn->is_known_as_subxact = true; - Assert(subtxn->nsubtxns == 0); - - /* remove from lsn order list of top-level transactions */ - dlist_delete(&subtxn->node); - - /* add to subtransaction list */ - dlist_push_tail(&txn->subtxns, &subtxn->node); - txn->nsubtxns++; - } + /* + * Assign this subxact as a child of the toplevel xact (no-op if already + * done.) + */ + ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr); } @@ -1078,11 +1196,13 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferReturnChange(rb, change); } + /* + * Cleanup the base snapshot, if set. + */ if (txn->base_snapshot != NULL) { SnapBuildSnapDecRefcount(txn->base_snapshot); - txn->base_snapshot = NULL; - txn->base_snapshot_lsn = InvalidXLogRecPtr; + dlist_delete(&txn->base_snapshot_node); } /* @@ -1257,17 +1377,17 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) } /* - * Perform the replay of a transaction and it's non-aborted subtransactions. + * Perform the replay of a transaction and its non-aborted subtransactions. * * Subtransactions previously have to be processed by * ReorderBufferCommitChild(), even if previously assigned to the toplevel * transaction with ReorderBufferAssignChild. * - * We currently can only decode a transaction's contents in when their commit - * record is read because that's currently the only place where we know about - * cache invalidations. Thus, once a toplevel commit is read, we iterate over - * the top and subtransactions (using a k-way merge) and replay the changes in - * lsn order. + * We currently can only decode a transaction's contents when its commit + * record is read because that's the only place where we know about cache + * invalidations. Thus, once a toplevel commit is read, we iterate over the top + * and subtransactions (using a k-way merge) and replay the changes in lsn + * order. */ void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, @@ -1295,10 +1415,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, txn->origin_lsn = origin_lsn; /* - * If this transaction didn't have any real changes in our database, it's - * OK not to have a snapshot. Note that ReorderBufferCommitChild will have - * transferred its snapshot to this transaction if it had one and the - * toplevel tx didn't. + * If this transaction has no snapshot, it didn't make any changes to the + * database, so there's nothing to decode. Note that + * ReorderBufferCommitChild will have transferred any snapshots from + * subtransactions if there were any. */ if (txn->base_snapshot == NULL) { @@ -1861,12 +1981,10 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, } /* - * Setup the base snapshot of a transaction. The base snapshot is the snapshot - * that is used to decode all changes until either this transaction modifies - * the catalog or another catalog modifying transaction commits. + * Set up the transaction's base snapshot. * - * Needs to be called before any changes are added with - * ReorderBufferQueueChange(). + * If we know that xid is a subtransaction, set the base snapshot on the + * top-level transaction instead. */ void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, @@ -1875,12 +1993,23 @@ ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, ReorderBufferTXN *txn; bool is_new; + AssertArg(snap != NULL); + + /* + * Fetch the transaction to operate on. If we know it's a subtransaction, + * operate on its top-level transaction instead. + */ txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true); + if (txn->is_known_as_subxact) + txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false, + NULL, InvalidXLogRecPtr, false); Assert(txn->base_snapshot == NULL); - Assert(snap != NULL); txn->base_snapshot = snap; txn->base_snapshot_lsn = lsn; + dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node); + + AssertTXNLsnOrder(rb); } /* @@ -1999,25 +2128,26 @@ ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid) } /* - * Have we already added the first snapshot? + * ReorderBufferXidHasBaseSnapshot + * Have we already set the base snapshot for the given txn/subtxn? */ bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid) { ReorderBufferTXN *txn; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); + txn = ReorderBufferTXNByXid(rb, xid, false, + NULL, InvalidXLogRecPtr, false); /* transaction isn't known yet, ergo no snapshot */ if (txn == NULL) return false; - /* - * TODO: It would be a nice improvement if we would check the toplevel - * transaction in subtransactions, but we'd need to keep track of a bit - * more state. - */ + /* a known subtxn? operate on top-level txn instead */ + if (txn->is_known_as_subxact) + txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false, + NULL, InvalidXLogRecPtr, false); + return txn->base_snapshot != NULL; } diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 2c4a1bab4b..e975faeb8c 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -830,9 +830,9 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) * all. We'll add a snapshot when the first change gets queued. * * NB: This works correctly even for subtransactions because - * ReorderBufferCommitChild() takes care to pass the parent the base - * snapshot, and while iterating the changequeue we'll get the change - * from the subtxn. + * ReorderBufferAssignChild() takes care to transfer the base snapshot + * to the top-level transaction, and while iterating the changequeue + * we'll get the change from the subtxn. */ if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid)) continue; @@ -1074,7 +1074,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, /* refcount of the snapshot builder for the new snapshot */ SnapBuildSnapIncRefcount(builder->snapshot); - /* add a new Snapshot to all currently running transactions */ + /* add a new catalog snapshot to all currently running transactions */ SnapBuildDistributeNewCatalogSnapshot(builder, lsn); } } @@ -1094,6 +1094,7 @@ void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running) { ReorderBufferTXN *txn; + TransactionId xmin; /* * If we're not consistent yet, inspect the record to see whether it @@ -1126,15 +1127,21 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact /* Remove transactions we don't need to keep track off anymore */ SnapBuildPurgeCommittedTxn(builder); - elog(DEBUG3, "xmin: %u, xmax: %u, oldestrunning: %u", - builder->xmin, builder->xmax, - running->oldestRunningXid); - /* - * Increase shared memory limits, so vacuum can work on tuples we - * prevented from being pruned till now. + * Advance the xmin limit for the current replication slot, to allow + * vacuum to clean up the tuples this slot has been protecting. + * + * The reorderbuffer might have an xmin among the currently running + * snapshots; use it if so. If not, we need only consider the snapshots + * we'll produce later, which can't be less than the oldest running xid in + * the record we're reading now. */ - LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid); + xmin = ReorderBufferGetOldestXmin(builder->reorder); + if (xmin == InvalidTransactionId) + xmin = running->oldestRunningXid; + elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u", + builder->xmin, builder->xmax, running->oldestRunningXid, xmin); + LogicalIncreaseXminForSlot(lsn, xmin); /* * Also tell the slot where we can restart decoding from. We don't want to diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 1c7982958e..f8a295bddc 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -160,10 +160,9 @@ typedef struct ReorderBufferTXN /* did the TX have catalog changes */ bool has_catalog_changes; - /* - * Do we know this is a subxact? - */ + /* Do we know this is a subxact? Xid of top-level txn if so */ bool is_known_as_subxact; + TransactionId toplevel_xid; /* * LSN of the first data carrying, WAL record with knowledge about this @@ -209,10 +208,13 @@ typedef struct ReorderBufferTXN TimestampTz commit_time; /* - * Base snapshot or NULL. + * The base snapshot is used to decode all changes until either this + * transaction modifies the catalog, or another catalog-modifying + * transaction commits. */ Snapshot base_snapshot; XLogRecPtr base_snapshot_lsn; + dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */ /* * How many ReorderBufferChange's do we have in this txn. @@ -279,7 +281,7 @@ typedef struct ReorderBufferTXN * Position in one of three lists: * * list of subtransactions if we are *known* to be subxact * * list of toplevel xacts (can be an as-yet unknown subxact) - * * list of preallocated ReorderBufferTXNs + * * list of preallocated ReorderBufferTXNs (if unused) * --- */ dlist_node node; @@ -337,6 +339,15 @@ struct ReorderBuffer */ dlist_head toplevel_by_lsn; + /* + * Transactions and subtransactions that have a base snapshot, ordered by + * LSN of the record which caused us to first obtain the base snapshot. + * This is not the same as toplevel_by_lsn, because we only set the base + * snapshot on the first logical-decoding-relevant record (eg. heap + * writes), whereas the initial LSN could be set by other operations. + */ + dlist_head txns_by_base_snapshot_lsn; + /* * one-entry sized cache for by_txn. Very frequently the same txn gets * looked up over and over again. @@ -422,6 +433,7 @@ bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); +TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);