From 9290ad198b15d6b986b855d2a58d087a54777e87 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Sat, 16 Nov 2019 18:24:00 +0530 Subject: [PATCH] Track statistics for spilling of changes from ReorderBuffer. This adds the statistics about transactions spilled to disk from ReorderBuffer. Users can query the pg_stat_replication view to check these stats. Author: Tomas Vondra, with bug-fixes and minor changes by Dilip Kumar Reviewed-by: Amit Kapila Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com --- doc/src/sgml/monitoring.sgml | 20 +++++++++ src/backend/catalog/system_views.sql | 5 ++- .../replication/logical/reorderbuffer.c | 12 ++++++ src/backend/replication/walsender.c | 42 ++++++++++++++++++- src/include/catalog/catversion.h | 2 +- src/include/catalog/pg_proc.dat | 6 +-- src/include/replication/reorderbuffer.h | 11 +++++ src/include/replication/walsender_private.h | 5 +++ src/test/regress/expected/rules.out | 7 +++- 9 files changed, 101 insertions(+), 9 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 901fee97ffb..a3c5f86b7e8 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1972,6 +1972,26 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i timestamp with time zone Send time of last reply message received from standby server + + spill_bytes + bigint + Amount of decoded transaction data spilled to disk. + + + spill_txns + bigint + Number of transactions spilled to disk after the memory used by + logical decoding exceeds logical_decoding_work_mem. The + counter gets incremented both for toplevel transactions and + subtransactions. + + + spill_count + bigint + Number of times transactions were spilled to disk. Transactions + may get spilled repeatedly, and this counter gets incremented on every + such invocation. + diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 4456fefb38d..f7800f01a6b 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -776,7 +776,10 @@ CREATE VIEW pg_stat_replication AS W.replay_lag, W.sync_priority, W.sync_state, - W.reply_time + W.reply_time, + W.spill_txns, + W.spill_count, + W.spill_bytes FROM pg_stat_get_activity(NULL) AS S JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index d82a5f18b0a..53affeb8772 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -308,6 +308,10 @@ ReorderBufferAllocate(void) buffer->outbufsize = 0; buffer->size = 0; + buffer->spillCount = 0; + buffer->spillTxns = 0; + buffer->spillBytes = 0; + buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; dlist_init(&buffer->toplevel_by_lsn); @@ -2415,6 +2419,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) int fd = -1; XLogSegNo curOpenSegNo = 0; Size spilled = 0; + Size size = txn->size; elog(DEBUG2, "spill %u changes in XID %u to disk", (uint32) txn->nentries_mem, txn->xid); @@ -2473,6 +2478,13 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) spilled++; } + /* update the statistics */ + rb->spillCount += 1; + rb->spillBytes += size; + + /* Don't consider already serialized transaction. */ + rb->spillTxns += txn->serialized ? 0 : 1; + Assert(spilled == txn->nentries_mem); Assert(dlist_is_empty(&txn->changes)); txn->nentries_mem = 0; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 7f5671504f7..fa75872877e 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -248,6 +248,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); +static void UpdateSpillStats(LogicalDecodingContext *ctx); static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count); @@ -1261,7 +1262,8 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, /* * LogicalDecodingContext 'update_progress' callback. * - * Write the current position to the lag tracker (see XLogSendPhysical). + * Write the current position to the lag tracker (see XLogSendPhysical), + * and update the spill statistics. */ static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid) @@ -1280,6 +1282,11 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId LagTrackerWrite(lsn, now); sendTime = now; + + /* + * Update statistics about transactions that spilled to disk. + */ + UpdateSpillStats(ctx); } /* @@ -2318,6 +2325,9 @@ InitWalSenderSlot(void) walsnd->state = WALSNDSTATE_STARTUP; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; + walsnd->spillTxns = 0; + walsnd->spillCount = 0; + walsnd->spillBytes = 0; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -3219,7 +3229,7 @@ offset_to_interval(TimeOffset offset) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 12 +#define PG_STAT_GET_WAL_SENDERS_COLS 15 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -3274,6 +3284,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int pid; WalSndState state; TimestampTz replyTime; + int64 spillTxns; + int64 spillCount; + int64 spillBytes; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -3294,6 +3307,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) applyLag = walsnd->applyLag; priority = walsnd->sync_standby_priority; replyTime = walsnd->replyTime; + spillTxns = walsnd->spillTxns; + spillCount = walsnd->spillCount; + spillBytes = walsnd->spillBytes; SpinLockRelease(&walsnd->mutex); memset(nulls, 0, sizeof(nulls)); @@ -3375,6 +3391,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) nulls[11] = true; else values[11] = TimestampTzGetDatum(replyTime); + + /* spill to disk */ + values[12] = Int64GetDatum(spillTxns); + values[13] = Int64GetDatum(spillCount); + values[14] = Int64GetDatum(spillBytes); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); @@ -3611,3 +3632,20 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) Assert(time != 0); return now - time; } + +static void +UpdateSpillStats(LogicalDecodingContext *ctx) +{ + ReorderBuffer *rb = ctx->reorder; + + SpinLockAcquire(&MyWalSnd->mutex); + + MyWalSnd->spillTxns = rb->spillTxns; + MyWalSnd->spillCount = rb->spillCount; + MyWalSnd->spillBytes = rb->spillBytes; + + elog(DEBUG2, "UpdateSpillStats: updating stats %p %ld %ld %ld", + rb, rb->spillTxns, rb->spillCount, rb->spillBytes); + + SpinLockRelease(&MyWalSnd->mutex); +} diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index e3996590201..555384cd2bf 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201911121 +#define CATALOG_VERSION_NO 201911211 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 58ea5b982b3..fa0a2a10021 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5166,9 +5166,9 @@ proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => '', - proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}', + proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8,int8,int8}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,spill_txns,spill_count,spill_bytes}', prosrc => 'pg_stat_get_wal_senders' }, { oid => '3317', descr => 'statistics: information about WAL receiver', proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's', diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 7c94d920fe9..0867ee9e636 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -402,6 +402,17 @@ struct ReorderBuffer /* memory accounting */ Size size; + + /* + * Statistics about transactions spilled to disk. + * + * A single transaction may be spilled repeatedly, which is why we keep + * two different counters. For spilling, the transaction counter includes + * both toplevel transactions and subtransactions. + */ + int64 spillCount; /* spill-to-disk invocation counter */ + int64 spillTxns; /* number of transactions spilled to disk */ + int64 spillBytes; /* amount of data spilled to disk */ }; diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 0dd6d1cf808..a6b32051ac4 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -80,6 +80,11 @@ typedef struct WalSnd * Timestamp of the last message received from standby. */ TimestampTz replyTime; + + /* Statistics for transactions spilled to disk. */ + int64 spillTxns; + int64 spillCount; + int64 spillBytes; } WalSnd; extern WalSnd *MyWalSnd; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index abe3a43cd27..c9cc5694048 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1952,9 +1952,12 @@ pg_stat_replication| SELECT s.pid, w.replay_lag, w.sync_priority, w.sync_state, - w.reply_time + w.reply_time, + w.spill_txns, + w.spill_count, + w.spill_bytes FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc) - JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) + JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, spill_txns, spill_count, spill_bytes) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_ssl| SELECT s.pid, s.ssl,