Skip logical decoding of already-aborted transactions.

Previously, transaction aborts were detected concurrently only during
system catalog scans while replaying a transaction in streaming mode.

This commit adds an additional CLOG lookup to check the transaction
status, allowing the logical decoding to skip changes also when it
doesn't touch system catalogs, if the transaction is already
aborted. This optimization enhances logical decoding performance,
especially for large transactions that have already been rolled back,
as it avoids unnecessary disk or network I/O.

To avoid potential slowdowns caused by frequent CLOG lookups for small
transactions (most of which commit), the CLOG lookup is performed only
for large transactions before eviction. The performance benchmark
results showed there is not noticeable performance regression due to
CLOG lookups.

Reviewed-by: Amit Kapila, Peter Smith, Vignesh C, Ajin Cherian
Reviewed-by: Dilip Kumar, Andres Freund
Discussion: https://postgr.es/m/CAD21AoDht9Pz_DFv_R2LqBTBbO4eGrpa9Vojmt5z5sEx3XwD7A@mail.gmail.com
This commit is contained in:
Masahiko Sawada 2025-02-12 16:31:34 -08:00
parent 9e66a2b784
commit 072ee847ad
6 changed files with 246 additions and 46 deletions

View File

@ -138,12 +138,46 @@ SELECT slot_name FROM pg_stat_replication_slots;
(3 rows)
COMMIT;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4_twophase', 'test_decoding', false, true) s4;
?column?
----------
init
(1 row)
-- The INSERT changes are large enough to be spilled but will not be, because
-- the transaction is aborted. The logical decoding skips collecting further
-- changes too. The transaction is prepared to make sure the decoding processes
-- the aborted transaction.
BEGIN;
INSERT INTO stats_test SELECT 'serialize-toobig--1:'||g.i FROM generate_series(1, 5000) g(i);
PREPARE TRANSACTION 'test1_abort';
ROLLBACK PREPARED 'test1_abort';
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4_twophase', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
count
-------
1
(1 row)
-- Verify that the decoding doesn't spill already-aborted transaction's changes.
SELECT pg_stat_force_next_flush();
pg_stat_force_next_flush
--------------------------
(1 row)
SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase';
slot_name | spill_txns | spill_count
---------------------------------+------------+-------------
regression_slot_stats4_twophase | 0 | 0
(1 row)
DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot_stats1'),
pg_drop_replication_slot('regression_slot_stats2'),
pg_drop_replication_slot('regression_slot_stats3');
pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot
--------------------------+--------------------------+--------------------------
| |
pg_drop_replication_slot('regression_slot_stats3'),
pg_drop_replication_slot('regression_slot_stats4_twophase');
pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot
--------------------------+--------------------------+--------------------------+--------------------------
| | |
(1 row)

View File

@ -114,7 +114,12 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
* detect that the subtransaction was aborted, and reset the transaction while having
* the TOAST changes in memory, resulting in deallocating both decoded changes and
* TOAST reconstruction data. Memory usage counters must be updated correctly.
*
* Set debug_logical_replication_streaming to 'immediate' to disable the transaction
* status check happening before streaming the second insertion, so we can detect a
* concurrent abort while streaming.
*/
SET debug_logical_replication_streaming = immediate;
BEGIN;
INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
ALTER TABLE stream_test ADD COLUMN i INT;
@ -128,6 +133,7 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL,
5
(1 row)
RESET debug_logical_replication_streaming;
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot

View File

@ -50,7 +50,25 @@ SELECT slot_name FROM pg_stat_replication_slots;
SELECT slot_name FROM pg_stat_replication_slots;
COMMIT;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4_twophase', 'test_decoding', false, true) s4;
-- The INSERT changes are large enough to be spilled but will not be, because
-- the transaction is aborted. The logical decoding skips collecting further
-- changes too. The transaction is prepared to make sure the decoding processes
-- the aborted transaction.
BEGIN;
INSERT INTO stats_test SELECT 'serialize-toobig--1:'||g.i FROM generate_series(1, 5000) g(i);
PREPARE TRANSACTION 'test1_abort';
ROLLBACK PREPARED 'test1_abort';
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4_twophase', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- Verify that the decoding doesn't spill already-aborted transaction's changes.
SELECT pg_stat_force_next_flush();
SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase';
DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot_stats1'),
pg_drop_replication_slot('regression_slot_stats2'),
pg_drop_replication_slot('regression_slot_stats3');
pg_drop_replication_slot('regression_slot_stats3'),
pg_drop_replication_slot('regression_slot_stats4_twophase');

View File

@ -49,7 +49,12 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
* detect that the subtransaction was aborted, and reset the transaction while having
* the TOAST changes in memory, resulting in deallocating both decoded changes and
* TOAST reconstruction data. Memory usage counters must be updated correctly.
*
* Set debug_logical_replication_streaming to 'immediate' to disable the transaction
* status check happening before streaming the second insertion, so we can detect a
* concurrent abort while streaming.
*/
SET debug_logical_replication_streaming = immediate;
BEGIN;
INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
ALTER TABLE stream_test ADD COLUMN i INT;
@ -58,6 +63,7 @@ INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000')
ROLLBACK TO s1;
COMMIT;
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
RESET debug_logical_replication_streaming;
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');

View File

@ -106,6 +106,7 @@
#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
@ -260,6 +261,8 @@ static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
bool txn_prepared);
static void ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn);
static bool ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno);
@ -793,11 +796,11 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
/*
* While streaming the previous changes we have detected that the
* transaction is aborted. So there is no point in collecting further
* changes for it.
* If we have detected that the transaction is aborted while streaming the
* previous changes or by checking its CLOG, there is no point in
* collecting further changes for it.
*/
if (txn->concurrent_abort)
if (rbtxn_is_aborted(txn))
{
/*
* We don't need to update memory accounting for this change as we
@ -1620,8 +1623,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/*
* Discard changes from a transaction (and subtransactions), either after
* streaming or decoding them at PREPARE. Keep the remaining info -
* transactions, tuplecids, invalidations and snapshots.
* streaming, decoding them at PREPARE, or detecting the transaction abort.
* Keep the remaining info - transactions, tuplecids, invalidations and
* snapshots.
*
* We additionally remove tuplecids after decoding the transaction at prepare
* time as we only need to perform invalidation at rollback or commit prepared.
@ -1650,6 +1654,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
Assert(rbtxn_is_known_subxact(subtxn));
Assert(subtxn->nsubtxns == 0);
ReorderBufferMaybeMarkTXNStreamed(rb, subtxn);
ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
}
@ -1680,24 +1685,6 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
/* Update the memory counter */
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
/*
* Mark the transaction as streamed.
*
* The top-level transaction, is marked as streamed always, even if it
* does not contain any changes (that is, when all the changes are in
* subtransactions).
*
* For subtransactions, we only mark them as streamed when there are
* changes in them.
*
* We do it this way because of aborts - we don't want to send aborts for
* XIDs the downstream is not aware of. And of course, it always knows
* about the toplevel xact (we send the XID in all messages), but we never
* stream XIDs of empty subxacts.
*/
if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
txn->txn_flags |= RBTXN_IS_STREAMED;
if (txn_prepared)
{
/*
@ -1752,6 +1739,76 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
txn->nentries = 0;
}
/*
* Check the transaction status by CLOG lookup and discard all changes if
* the transaction is aborted. The transaction status is cached in
* txn->txn_flags so we can skip future changes and avoid CLOG lookups on the
* next call.
*
* Return true if the transaction is aborted, otherwise return false.
*
* When the 'debug_logical_replication_streaming' is set to "immediate", we
* don't check the transaction status, meaning the caller will always process
* this transaction.
*/
static bool
ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
/* Quick return for regression tests */
if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
return false;
/*
* Quick return if the transaction status is already known.
*/
if (rbtxn_is_committed(txn))
return false;
if (rbtxn_is_aborted(txn))
{
/* Already-aborted transactions should not have any changes */
Assert(txn->size == 0);
return true;
}
/* Otherwise, check the transaction status using CLOG lookup */
if (TransactionIdIsInProgress(txn->xid))
return false;
if (TransactionIdDidCommit(txn->xid))
{
/*
* Remember the transaction is committed so that we can skip CLOG
* check next time, avoiding the pressure on CLOG lookup.
*/
Assert(!rbtxn_is_aborted(txn));
txn->txn_flags |= RBTXN_IS_COMMITTED;
return false;
}
/*
* The transaction aborted. We discard both the changes collected so far
* and the toast reconstruction data. The full cleanup will happen as part
* of decoding ABORT record of this transaction.
*/
ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
ReorderBufferToastReset(rb, txn);
/* All changes should be discarded */
Assert(txn->size == 0);
/*
* Mark the transaction as aborted so we can ignore future changes of this
* transaction.
*/
Assert(!rbtxn_is_committed(txn));
txn->txn_flags |= RBTXN_IS_ABORTED;
return true;
}
/*
* Build a hash with a (relfilelocator, ctid) -> (cmin, cmax) mapping for use by
* HeapTupleSatisfiesHistoricMVCC.
@ -1917,7 +1974,9 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
* Note, we send stream prepare even if a concurrent abort is
* detected. See DecodePrepare for more information.
*/
Assert(!rbtxn_sent_prepare(txn));
rb->stream_prepare(rb, txn, txn->final_lsn);
txn->txn_flags |= RBTXN_SENT_PREPARE;
/*
* This is a PREPARED transaction, part of a two-phase commit. The
@ -2052,6 +2111,30 @@ ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn,
txn, command_id);
}
/*
* Mark the given transaction as streamed if it's a top-level transaction
* or has changes.
*/
static void
ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
/*
* The top-level transaction, is marked as streamed always, even if it
* does not contain any changes (that is, when all the changes are in
* subtransactions).
*
* For subtransactions, we only mark them as streamed when there are
* changes in them.
*
* We do it this way because of aborts - we don't want to send aborts for
* XIDs the downstream is not aware of. And of course, it always knows
* about the top-level xact (we send the XID in all messages), but we
* never stream XIDs of empty subxacts.
*/
if (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))
txn->txn_flags |= RBTXN_IS_STREAMED;
}
/*
* Helper function for ReorderBufferProcessTXN to handle the concurrent
* abort of the streaming transaction. This resets the TXN such that it
@ -2543,7 +2626,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
* regular ones).
*/
if (rbtxn_prepared(txn))
{
Assert(!rbtxn_sent_prepare(txn));
rb->prepare(rb, txn, commit_lsn);
txn->txn_flags |= RBTXN_SENT_PREPARE;
}
else
rb->commit(rb, txn, commit_lsn);
}
@ -2595,6 +2682,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
*/
if (streaming || rbtxn_prepared(txn))
{
if (streaming)
ReorderBufferMaybeMarkTXNStreamed(rb, txn);
ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
/* Reset the CheckXidAlive */
CheckXidAlive = InvalidTransactionId;
@ -2648,7 +2738,14 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
FlushErrorState();
FreeErrorData(errdata);
errdata = NULL;
curtxn->concurrent_abort = true;
/* Remember the transaction is aborted. */
Assert(!rbtxn_is_committed(curtxn));
curtxn->txn_flags |= RBTXN_IS_ABORTED;
/* Mark the transaction is streamed if appropriate */
if (stream_started)
ReorderBufferMaybeMarkTXNStreamed(rb, txn);
/* Reset the TXN so that it is allowed to stream remaining data. */
ReorderBufferResetTXN(rb, txn, snapshot_now,
@ -2828,15 +2925,15 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
/*
* We send the prepare for the concurrently aborted xacts so that later
* when rollback prepared is decoded and sent, the downstream should be
* able to rollback such a xact. See comments atop DecodePrepare.
*
* Note, for the concurrent_abort + streaming case a stream_prepare was
* already sent within the ReorderBufferReplay call above.
* Send a prepare if not already done so. This might occur if we have
* detected a concurrent abort while replaying the non-streaming
* transaction.
*/
if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
if (!rbtxn_sent_prepare(txn))
{
rb->prepare(rb, txn, txn->final_lsn);
txn->txn_flags |= RBTXN_SENT_PREPARE;
}
}
/*
@ -3566,7 +3663,8 @@ ReorderBufferLargestTXN(ReorderBuffer *rb)
}
/*
* Find the largest streamable toplevel transaction to evict (by streaming).
* Find the largest streamable (and non-aborted) toplevel transaction to evict
* (by streaming).
*
* This can be seen as an optimized version of ReorderBufferLargestTXN, which
* should give us the same transaction (because we don't update memory account
@ -3608,9 +3706,15 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
/* base_snapshot must be set */
Assert(txn->base_snapshot != NULL);
/* Don't consider these kinds of transactions for eviction. */
if (rbtxn_has_partial_change(txn) ||
!rbtxn_has_streamable_change(txn) ||
rbtxn_is_aborted(txn))
continue;
/* Find the largest of the eviction candidates. */
if ((largest == NULL || txn->total_size > largest_size) &&
(txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
rbtxn_has_streamable_change(txn))
(txn->total_size > 0))
{
largest = txn;
largest_size = txn->total_size;
@ -3661,8 +3765,8 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
rb->size > 0))
{
/*
* Pick the largest transaction and evict it from memory by streaming,
* if possible. Otherwise, spill to disk.
* Pick the largest non-aborted transaction and evict it from memory
* by streaming, if possible. Otherwise, spill to disk.
*/
if (ReorderBufferCanStartStreaming(rb) &&
(txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
@ -3672,6 +3776,10 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
Assert(txn->total_size > 0);
Assert(rb->size >= txn->total_size);
/* skip the transaction if aborted */
if (ReorderBufferCheckAndTruncateAbortedTXN(rb, txn))
continue;
ReorderBufferStreamTXN(rb, txn);
}
else
@ -3687,6 +3795,10 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
Assert(txn->size > 0);
Assert(rb->size >= txn->size);
/* skip the transaction if aborted */
if (ReorderBufferCheckAndTruncateAbortedTXN(rb, txn))
continue;
ReorderBufferSerializeTXN(rb, txn);
}

View File

@ -173,6 +173,9 @@ typedef struct ReorderBufferChange
#define RBTXN_PREPARE 0x0040
#define RBTXN_SKIPPED_PREPARE 0x0080
#define RBTXN_HAS_STREAMABLE_CHANGE 0x0100
#define RBTXN_SENT_PREPARE 0x0200
#define RBTXN_IS_COMMITTED 0x0400
#define RBTXN_IS_ABORTED 0x0800
/* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \
@ -224,12 +227,36 @@ typedef struct ReorderBufferChange
((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
)
/* Has this transaction been prepared? */
/*
* Is this a prepared transaction?
*
* Being true means that this transaction should be prepared instead of
* committed. To check whether a prepare or a stream_prepare has already
* been sent for this transaction, we need to use rbtxn_sent_prepare().
*/
#define rbtxn_prepared(txn) \
( \
((txn)->txn_flags & RBTXN_PREPARE) != 0 \
)
/* Has a prepare or stream_prepare already been sent? */
#define rbtxn_sent_prepare(txn) \
( \
((txn)->txn_flags & RBTXN_SENT_PREPARE) != 0 \
)
/* Is this transaction committed? */
#define rbtxn_is_committed(txn) \
( \
((txn)->txn_flags & RBTXN_IS_COMMITTED) != 0 \
)
/* Is this transaction aborted? */
#define rbtxn_is_aborted(txn) \
( \
((txn)->txn_flags & RBTXN_IS_ABORTED) != 0 \
)
/* prepare for this transaction skipped? */
#define rbtxn_skip_prepared(txn) \
( \
@ -419,9 +446,6 @@ typedef struct ReorderBufferTXN
/* Size of top-transaction including sub-transactions. */
Size total_size;
/* If we have detected concurrent abort then ignore future changes. */
bool concurrent_abort;
/*
* Private data pointer of the output plugin.
*/