diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 07eebedbac9..5cf28d4df42 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -67,6 +67,21 @@ * allocator, evicting the oldest changes would make it more likely the * memory gets actually freed. * + * We use a max-heap with transaction size as the key to efficiently find + * the largest transaction. While the max-heap is empty, we don't update + * the max-heap when updating the memory counter. In that state, we find + * the largest transaction in O(N) time, where N is the number of + * transactions including top-level transactions and subtransactions. + * + * We build the max-heap just before selecting the largest transaction + * if the number of transactions being decoded has reached the threshold, + * MAX_HEAP_TXN_COUNT_THRESHOLD. After building the max-heap, we also + * update the max-heap when updating the memory counter. The intention is + * to efficiently find the largest transaction in O(1) time, at the cost + * of making each memory counter update O(log N). Once the number + * of transactions drops below the threshold, we reset the max-heap + * (refer to ReorderBufferMaybeResetMaxHeap() for details). + * * We still rely on max_changes_in_memory when loading serialized changes * back into memory. At that point we can't use the memory limit directly * as we load the subxacts independently. One option to deal with this @@ -107,6 +122,22 @@ #include "utils/rel.h" #include "utils/relfilenumbermap.h" +/* + * Threshold of the total number of top-level and sub transactions that + * controls whether we use the max-heap for tracking their sizes. Although + * using the max-heap to select the largest transaction is effective when + * there are many transactions being decoded, maintaining the max-heap while + * updating the memory statistics can be costly.
Therefore, we use + * MaxConnections as the threshold so that we use the max-heap only when + * using subtransactions. + */ +#define MAX_HEAP_TXN_COUNT_THRESHOLD MaxConnections + +/* + * A macro to check if the max-heap is ready to use, i.e. non-empty, in which + * case it must also be updated whenever the memory counter is updated. + */ +#define ReorderBufferMaxHeapIsReady(rb) (!binaryheap_empty((rb)->txn_heap)) /* entry for a hash table we use to map from xid to our transaction state */ typedef struct ReorderBufferTXNByIdEnt @@ -259,6 +290,9 @@ static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno); +static void ReorderBufferBuildMaxHeap(ReorderBuffer *rb); +static void ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb); +static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg); static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, @@ -293,6 +327,7 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t static Size ReorderBufferChangeSize(ReorderBufferChange *change); static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, + ReorderBufferTXN *txn, bool addition, Size sz); /* @@ -355,6 +390,17 @@ ReorderBufferAllocate(void) buffer->outbufsize = 0; buffer->size = 0; + /* + * The binaryheap is indexed for faster manipulations. + * + * We allocate the initial heap size greater than + * MAX_HEAP_TXN_COUNT_THRESHOLD because the txn_heap will not be used + * until the threshold is exceeded.
+ */ + buffer->txn_heap = binaryheap_allocate(MAX_HEAP_TXN_COUNT_THRESHOLD * 2, + ReorderBufferTXNSizeCompare, + true, NULL); + buffer->spillTxns = 0; buffer->spillCount = 0; buffer->spillBytes = 0; @@ -485,7 +531,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, { /* update memory accounting info */ if (upd_mem) - ReorderBufferChangeMemoryUpdate(rb, change, false, + ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, ReorderBufferChangeSize(change)); /* free contained data */ @@ -816,7 +862,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, txn->nentries_mem++; /* update memory accounting information */ - ReorderBufferChangeMemoryUpdate(rb, change, true, + ReorderBufferChangeMemoryUpdate(rb, change, NULL, true, ReorderBufferChangeSize(change)); /* process partial change */ @@ -1527,7 +1573,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Check we're not mixing changes from different transactions. */ Assert(change->txn == txn); - ReorderBufferReturnChange(rb, change, true); + ReorderBufferReturnChange(rb, change, false); } /* @@ -1586,8 +1632,17 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) if (rbtxn_is_serialized(txn)) ReorderBufferRestoreCleanup(rb, txn); + /* Update the memory counter */ + ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size); + /* deallocate */ ReorderBufferReturnTXN(rb, txn); + + /* + * After cleaning up one transaction, the number of transactions might get + * lower than the threshold for the max-heap. 
+ */ + ReorderBufferMaybeResetMaxHeap(rb); } /* @@ -1637,9 +1692,12 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep /* remove the change from it's containing list */ dlist_delete(&change->node); - ReorderBufferReturnChange(rb, change, true); + ReorderBufferReturnChange(rb, change, false); } + /* Update the memory counter */ + ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size); + /* * Mark the transaction as streamed. * @@ -3166,6 +3224,9 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, * decide if we reached the memory limit, the transaction counter allows * us to quickly pick the largest transaction for eviction. * + * At least one of txn and change must be non-NULL. We update the memory + * counter of txn if it's non-NULL, otherwise that of change->txn. + * * When streaming is enabled, we need to update the toplevel transaction * counters instead - we don't really care about subtransactions as we * can't stream them individually anyway, and we only pick toplevel @@ -3174,22 +3235,27 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, + ReorderBufferTXN *txn, bool addition, Size sz) { - ReorderBufferTXN *txn; ReorderBufferTXN *toptxn; - Assert(change->txn); + Assert(txn || change); /* * Ignore tuple CID changes, because those are not evicted when reaching * memory limit. So we just don't count them, because it might easily * trigger a pointless attempt to spill. */ - if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID) + if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID) return; - txn = change->txn; + if (sz == 0) + return; + + if (txn == NULL) + txn = change->txn; + Assert(txn != NULL); /* * Update the total size in top level as well.
This is later used to @@ -3204,6 +3270,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, /* Update the total size in the top transaction. */ toptxn->total_size += sz; + + /* Sync the max-heap; a txn whose size was 0 before is added to it */ + if (ReorderBufferMaxHeapIsReady(rb)) + { + if ((txn->size - sz) == 0) + binaryheap_add(rb->txn_heap, PointerGetDatum(txn)); + else + binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn)); + } } else { @@ -3213,6 +3288,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, /* Update the total size in the top transaction. */ toptxn->total_size -= sz; + + /* Sync the max-heap; a txn whose size dropped to 0 is removed from it */ + if (ReorderBufferMaxHeapIsReady(rb)) + { + if (txn->size == 0) + binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn)); + else + binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn)); + } } Assert(txn->size <= rb->size); @@ -3468,34 +3552,123 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz) } } + +/* binaryheap comparator: returns -1, 0 or 1 comparing the txns' size fields */ +static int +ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg) +{ + ReorderBufferTXN *ta = (ReorderBufferTXN *) DatumGetPointer(a); + ReorderBufferTXN *tb = (ReorderBufferTXN *) DatumGetPointer(b); + + if (ta->size < tb->size) + return -1; + if (ta->size > tb->size) + return 1; + return 0; +} + /* - * Find the largest transaction (toplevel or subxact) to evict (spill to disk). - * - * XXX With many subtransactions this might be quite slow, because we'll have - * to walk through all of them. There are some options how we could improve - * that: (a) maintain some secondary structure with transactions sorted by - * amount of changes, (b) not looking for the entirely largest transaction, - * but e.g. for transaction using at least some fraction of the memory limit, - * and (c) evicting multiple transactions at once, e.g. to free a given portion - * of the memory limit (e.g. 50%). + * Build the max-heap. The heap assembly step is deferred until the end, for + * efficiency.
*/ -static ReorderBufferTXN * -ReorderBufferLargestTXN(ReorderBuffer *rb) +static void +ReorderBufferBuildMaxHeap(ReorderBuffer *rb) { HASH_SEQ_STATUS hash_seq; ReorderBufferTXNByIdEnt *ent; - ReorderBufferTXN *largest = NULL; + + Assert(binaryheap_empty(rb->txn_heap)); hash_seq_init(&hash_seq, rb->by_txn); while ((ent = hash_seq_search(&hash_seq)) != NULL) { ReorderBufferTXN *txn = ent->txn; - /* if the current transaction is larger, remember it */ - if ((!largest) || (txn->size > largest->size)) - largest = txn; + if (txn->size == 0) + continue; + + binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn)); } + binaryheap_build(rb->txn_heap); +} + +/* + * Reset the max-heap if the number of transactions in it drops below 90% of + * the threshold. + */ +static void +ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb) +{ + /* + * If we add and remove transactions right around the threshold, we could + * easily end up "thrashing". To avoid it, we reset the max-heap only once + * its size falls more than 10% below the threshold. + */ + if (ReorderBufferMaxHeapIsReady(rb) && + binaryheap_size(rb->txn_heap) < MAX_HEAP_TXN_COUNT_THRESHOLD * 0.9) + binaryheap_reset(rb->txn_heap); +} + +/* + * Find the largest transaction (toplevel or subxact) to evict (spill to disk) + * by doing a linear search or using the max-heap depending on the number of + * transactions in ReorderBuffer. Refer to the comments atop this file for the + * algorithm details. + */ +static ReorderBufferTXN * +ReorderBufferLargestTXN(ReorderBuffer *rb) +{ + ReorderBufferTXN *largest = NULL; + + if (!ReorderBufferMaxHeapIsReady(rb)) + { + /* + * If the number of transactions is small, we scan all transactions + * being decoded to get the largest transaction. This saves the cost + * of building a max-heap with a small number of transactions.
+ */ + if (hash_get_num_entries(rb->by_txn) < MAX_HEAP_TXN_COUNT_THRESHOLD) + { + HASH_SEQ_STATUS hash_seq; + ReorderBufferTXNByIdEnt *ent; + + hash_seq_init(&hash_seq, rb->by_txn); + while ((ent = hash_seq_search(&hash_seq)) != NULL) + { + ReorderBufferTXN *txn = ent->txn; + + /* if the current transaction is larger, remember it */ + if ((!largest) || (txn->size > largest->size)) + largest = txn; + } + + Assert(largest); + } + else + { + /* + * There is a large number of transactions in ReorderBuffer. We + * build the max-heap to efficiently select the largest + * transaction. + */ + ReorderBufferBuildMaxHeap(rb); + + /* + * The max-heap is ready now. We keep the max-heap at least + * until we free up enough transactions to bring the total memory + * usage below the limit. The largest transaction is selected + * below. + */ + Assert(ReorderBufferMaxHeapIsReady(rb)); + } + } + + /* Get the largest transaction from the max-heap */ + if (ReorderBufferMaxHeapIsReady(rb)) + largest = (ReorderBufferTXN *) + DatumGetPointer(binaryheap_first(rb->txn_heap)); + Assert(largest); Assert(largest->size > 0); Assert(largest->size <= rb->size); @@ -3638,6 +3811,13 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) /* We must be under the memory limit now. */ Assert(rb->size < logical_decoding_work_mem * 1024L); + + /* + * After evicting some transactions, the number of transactions might drop + * below the threshold for the max-heap.
+ */ + ReorderBufferMaybeResetMaxHeap(rb); + } /* @@ -3705,11 +3885,14 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferSerializeChange(rb, txn, fd, change); dlist_delete(&change->node); - ReorderBufferReturnChange(rb, change, true); + ReorderBufferReturnChange(rb, change, false); spilled++; } + /* Update the memory counter */ + ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size); + /* update the statistics iff we have spilled anything */ if (spilled) { @@ -4491,7 +4674,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, * update the accounting too (subtracting the size from the counters). And * we don't want to underflow there. */ - ReorderBufferChangeMemoryUpdate(rb, change, true, + ReorderBufferChangeMemoryUpdate(rb, change, NULL, true, ReorderBufferChangeSize(change)); } @@ -4903,9 +5086,9 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, MemoryContextSwitchTo(oldcontext); /* subtract the old change size */ - ReorderBufferChangeMemoryUpdate(rb, change, false, old_size); + ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size); /* now add the change back, with the correct size */ - ReorderBufferChangeMemoryUpdate(rb, change, true, + ReorderBufferChangeMemoryUpdate(rb, change, NULL, true, ReorderBufferChangeSize(change)); } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 0b2c95f7aa0..a5aec01c2f0 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -10,6 +10,7 @@ #define REORDERBUFFER_H #include "access/htup_details.h" +#include "lib/binaryheap.h" #include "lib/ilist.h" #include "storage/sinval.h" #include "utils/hsearch.h" @@ -631,6 +632,9 @@ struct ReorderBuffer /* memory accounting */ Size size; + /* Max-heap for sizes of all top-level and sub transactions */ + binaryheap *txn_heap; + /* * Statistics about transactions spilled to disk. *