diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index c5e28ce5cc..9a31e0b879 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -17,6 +17,8 @@ ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
# typical installcheck users do not have (e.g. buildfarm clients).
NO_INSTALLCHECK = 1
+TAP_TESTS = 1
+
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index bca36fa903..bc8e601eab 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -8,7 +8,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
CREATE TABLE stats_test(data text);
-- function to wait for counters to advance
-CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
+CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
DECLARE
start_time timestamptz := clock_timestamp();
updated bool;
@@ -16,12 +16,25 @@ BEGIN
-- we don't want to wait forever; loop will exit after 30 seconds
FOR i IN 1 .. 300 LOOP
- -- check to see if all updates have been reset/updated
- SELECT CASE WHEN check_reset THEN (spill_txns = 0)
- ELSE (spill_txns > 0)
- END
- INTO updated
- FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+ IF check_spill_txns THEN
+
+ -- check to see if all updates have been reset/updated
+ SELECT CASE WHEN check_reset THEN (spill_txns = 0)
+ ELSE (spill_txns > 0)
+ END
+ INTO updated
+ FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+ ELSE
+
+ -- check to see if all updates have been reset/updated
+ SELECT CASE WHEN check_reset THEN (total_txns = 0)
+ ELSE (total_txns > 0)
+ END
+ INTO updated
+ FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+ END IF;
exit WHEN updated;
@@ -51,16 +64,16 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
-- Check stats, wait for the stats collector to update. We can't test the
-- exact stats count as that can vary if any background transaction (say by
-- autovacuum) happens in parallel to the main transaction.
-SELECT wait_for_decode_stats(false);
+SELECT wait_for_decode_stats(false, true);
wait_for_decode_stats
-----------------------
(1 row)
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
- slot_name | spill_txns | spill_count
------------------+------------+-------------
- regression_slot | t | t
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+ slot_name | spill_txns | spill_count | total_txns | total_bytes
+-----------------+------------+-------------+------------+-------------
+ regression_slot | t | t | t | t
(1 row)
-- reset the slot stats, and wait for stats collector to reset
@@ -70,16 +83,16 @@ SELECT pg_stat_reset_replication_slot('regression_slot');
(1 row)
-SELECT wait_for_decode_stats(true);
+SELECT wait_for_decode_stats(true, true);
wait_for_decode_stats
-----------------------
(1 row)
-SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots;
- slot_name | spill_txns | spill_count
------------------+------------+-------------
- regression_slot | 0 | 0
+SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
+ slot_name | spill_txns | spill_count | total_txns | total_bytes
+-----------------+------------+-------------+------------+-------------
+ regression_slot | 0 | 0 | 0 | 0
(1 row)
-- decode and check stats again.
@@ -89,16 +102,36 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
5002
(1 row)
-SELECT wait_for_decode_stats(false);
+SELECT wait_for_decode_stats(false, true);
wait_for_decode_stats
-----------------------
(1 row)
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
- slot_name | spill_txns | spill_count
------------------+------------+-------------
- regression_slot | t | t
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+ slot_name | spill_txns | spill_count | total_txns | total_bytes
+-----------------+------------+-------------+------------+-------------
+ regression_slot | t | t | t | t
+(1 row)
+
+SELECT pg_stat_reset_replication_slot('regression_slot');
+ pg_stat_reset_replication_slot
+--------------------------------
+
+(1 row)
+
+-- non-spilled xact
+INSERT INTO stats_test values(generate_series(1, 10));
+SELECT wait_for_decode_stats(false, false);
+ wait_for_decode_stats
+-----------------------
+
+(1 row)
+
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+ slot_name | spill_txns | spill_count | total_txns | total_bytes
+-----------------+------------+-------------+------------+-------------
+ regression_slot | f | f | t | t
(1 row)
-- Ensure stats can be repeatedly accessed using the same stats snapshot. See
@@ -117,7 +150,7 @@ SELECT slot_name FROM pg_stat_replication_slots;
(1 row)
COMMIT;
-DROP FUNCTION wait_for_decode_stats(bool);
+DROP FUNCTION wait_for_decode_stats(bool, bool);
DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index 51294e48e8..8c34aeced1 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -6,7 +6,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
CREATE TABLE stats_test(data text);
-- function to wait for counters to advance
-CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
+CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
DECLARE
start_time timestamptz := clock_timestamp();
updated bool;
@@ -14,12 +14,25 @@ BEGIN
-- we don't want to wait forever; loop will exit after 30 seconds
FOR i IN 1 .. 300 LOOP
- -- check to see if all updates have been reset/updated
- SELECT CASE WHEN check_reset THEN (spill_txns = 0)
- ELSE (spill_txns > 0)
- END
- INTO updated
- FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+ IF check_spill_txns THEN
+
+ -- check to see if all updates have been reset/updated
+ SELECT CASE WHEN check_reset THEN (spill_txns = 0)
+ ELSE (spill_txns > 0)
+ END
+ INTO updated
+ FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+ ELSE
+
+ -- check to see if all updates have been reset/updated
+ SELECT CASE WHEN check_reset THEN (total_txns = 0)
+ ELSE (total_txns > 0)
+ END
+ INTO updated
+ FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+ END IF;
exit WHEN updated;
@@ -46,18 +59,25 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
-- Check stats, wait for the stats collector to update. We can't test the
-- exact stats count as that can vary if any background transaction (say by
-- autovacuum) happens in parallel to the main transaction.
-SELECT wait_for_decode_stats(false);
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(false, true);
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
-- reset the slot stats, and wait for stats collector to reset
SELECT pg_stat_reset_replication_slot('regression_slot');
-SELECT wait_for_decode_stats(true);
-SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(true, true);
+SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
-- decode and check stats again.
SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'skip-empty-xacts', '1');
-SELECT wait_for_decode_stats(false);
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(false, true);
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+
+SELECT pg_stat_reset_replication_slot('regression_slot');
+
+-- non-spilled xact
+INSERT INTO stats_test values(generate_series(1, 10));
+SELECT wait_for_decode_stats(false, false);
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
-- Ensure stats can be repeatedly accessed using the same stats snapshot. See
-- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
@@ -66,6 +86,6 @@ SELECT slot_name FROM pg_stat_replication_slots;
SELECT slot_name FROM pg_stat_replication_slots;
COMMIT;
-DROP FUNCTION wait_for_decode_stats(bool);
+DROP FUNCTION wait_for_decode_stats(bool, bool);
DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
new file mode 100644
index 0000000000..11b6cd9b9c
--- /dev/null
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -0,0 +1,76 @@
+# Test replication statistics data in pg_stat_replication_slots is sane after
+# drop replication slot and restart.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+# Test set-up
+my $node = get_new_node('test');
+$node->init(allows_streaming => 'logical');
+$node->append_conf('postgresql.conf', 'synchronous_commit = on');
+$node->start;
+
+# Create table.
+$node->safe_psql('postgres',
+ "CREATE TABLE test_repl_stat(col1 int)");
+
+# Create replication slots.
+$node->safe_psql(
+ 'postgres', qq[
+ SELECT pg_create_logical_replication_slot('regression_slot1', 'test_decoding');
+ SELECT pg_create_logical_replication_slot('regression_slot2', 'test_decoding');
+ SELECT pg_create_logical_replication_slot('regression_slot3', 'test_decoding');
+ SELECT pg_create_logical_replication_slot('regression_slot4', 'test_decoding');
+]);
+
+# Insert some data.
+$node->safe_psql('postgres', "INSERT INTO test_repl_stat values(generate_series(1, 5));");
+
+$node->safe_psql(
+ 'postgres', qq[
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL,
+ NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL,
+ NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot3', NULL,
+ NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot4', NULL,
+ NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+]);
+
+# Wait for the statistics to be updated.
+$node->poll_query_until(
+ 'postgres', qq[
+ SELECT count(slot_name) >= 4 FROM pg_stat_replication_slots
+ WHERE slot_name ~ 'regression_slot'
+ AND total_txns > 0 AND total_bytes > 0;
+]) or die "Timed out while waiting for statistics to be updated";
+
+# Test to drop one of the replication slot and verify replication statistics data is
+# fine after restart.
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot4')");
+
+$node->stop;
+$node->start;
+
+# Verify statistics data present in pg_stat_replication_slots are sane after
+# restart.
+my $result = $node->safe_psql('postgres',
+ "SELECT slot_name, total_txns > 0 AS total_txn,
+ total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots
+ ORDER BY slot_name"
+);
+is($result, qq(regression_slot1|t|t
+regression_slot2|t|t
+regression_slot3|t|t), 'check replication statistics are updated');
+
+# cleanup
+$node->safe_psql('postgres', "DROP TABLE test_repl_stat");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')");
+
+# shutdown
+$node->stop;
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 8287587f61..c44d087508 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2716,6 +2716,31 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
+
+
+ total_txns bigint
+
+
+ Number of decoded transactions sent to the decoding output plugin for
+ this slot. This counter is used to maintain the top level transactions,
+ so the counter is not incremented for subtransactions. Note that this
+ includes the transactions that are streamed and/or spilled.
+
+
+
+
+
+ total_bytesbigint
+
+
+ Amount of decoded transactions data sent to the decoding output plugin
+ while decoding the changes from WAL for this slot. This can be used to
+ gauge the total amount of data sent during logical decoding. Note that
+ this includes the data that is streamed and/or spilled.
+
+
+
+
stats_reset timestamp with time zone
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 451db2ee0a..6d78b33590 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -875,6 +875,8 @@ CREATE VIEW pg_stat_replication_slots AS
s.stream_txns,
s.stream_count,
s.stream_bytes,
+ s.total_txns,
+ s.total_bytes,
s.stats_reset
FROM pg_stat_get_replication_slots() AS s;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 666ce95d08..e1ec7d8b7d 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1829,6 +1829,8 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
msg.m_stream_txns = repSlotStat->stream_txns;
msg.m_stream_count = repSlotStat->stream_count;
msg.m_stream_bytes = repSlotStat->stream_bytes;
+ msg.m_total_txns = repSlotStat->total_txns;
+ msg.m_total_bytes = repSlotStat->total_bytes;
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
@@ -5568,6 +5570,8 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
replSlotStats[idx].stream_txns += msg->m_stream_txns;
replSlotStats[idx].stream_count += msg->m_stream_count;
replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
+ replSlotStats[idx].total_txns += msg->m_total_txns;
+ replSlotStats[idx].total_bytes += msg->m_total_bytes;
}
}
@@ -5795,6 +5799,8 @@ pgstat_reset_replslot(int i, TimestampTz ts)
replSlotStats[i].stream_txns = 0;
replSlotStats[i].stream_count = 0;
replSlotStats[i].stream_bytes = 0;
+ replSlotStats[i].total_txns = 0;
+ replSlotStats[i].total_bytes = 0;
replSlotStats[i].stat_reset_timestamp = ts;
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 68e210ce12..35b0c67641 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1775,21 +1775,20 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
ReorderBuffer *rb = ctx->reorder;
PgStat_ReplSlotStats repSlotStat;
- /*
- * Nothing to do if we haven't spilled or streamed anything since the last
- * time the stats has been sent.
- */
- if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
+ /* Nothing to do if we don't have any replication stats to be sent. */
+ if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
return;
- elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
+ elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
rb,
(long long) rb->spillTxns,
(long long) rb->spillCount,
(long long) rb->spillBytes,
(long long) rb->streamTxns,
(long long) rb->streamCount,
- (long long) rb->streamBytes);
+ (long long) rb->streamBytes,
+ (long long) rb->totalTxns,
+ (long long) rb->totalBytes);
namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
repSlotStat.spill_txns = rb->spillTxns;
@@ -1798,12 +1797,17 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
repSlotStat.stream_txns = rb->streamTxns;
repSlotStat.stream_count = rb->streamCount;
repSlotStat.stream_bytes = rb->streamBytes;
+ repSlotStat.total_txns = rb->totalTxns;
+ repSlotStat.total_bytes = rb->totalBytes;
pgstat_report_replslot(&repSlotStat);
+
rb->spillTxns = 0;
rb->spillCount = 0;
rb->spillBytes = 0;
rb->streamTxns = 0;
rb->streamCount = 0;
rb->streamBytes = 0;
+ rb->totalTxns = 0;
+ rb->totalBytes = 0;
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 52d06285a2..5cb484f032 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -350,6 +350,8 @@ ReorderBufferAllocate(void)
buffer->streamTxns = 0;
buffer->streamCount = 0;
buffer->streamBytes = 0;
+ buffer->totalTxns = 0;
+ buffer->totalBytes = 0;
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
@@ -1363,6 +1365,11 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
dlist_delete(&change->node);
dlist_push_tail(&state->old_change, &change->node);
+ /*
+ * Update the total bytes processed before releasing the current set
+ * of changes and restoring the new set of changes.
+ */
+ rb->totalBytes += rb->size;
if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
&state->entries[off].segno))
{
@@ -2363,6 +2370,20 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferIterTXNFinish(rb, iterstate);
iterstate = NULL;
+ /*
+ * Update total transaction count and total transaction bytes
+ * processed. Ensure to not count the streamed transaction multiple
+ * times.
+ *
+ * Note that the statistics computation has to be done after
+ * ReorderBufferIterTXNFinish as it releases the serialized change
+ * which we have already accounted in ReorderBufferIterTXNNext.
+ */
+ if (!rbtxn_is_streamed(txn))
+ rb->totalTxns++;
+
+ rb->totalBytes += rb->size;
+
/*
* Done with current changes, send the last message for this set of
* changes depending upon streaming mode.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 521ba73614..2680190a40 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2284,7 +2284,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
Datum
pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 8
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -2335,11 +2335,13 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
values[4] = Int64GetDatum(s->stream_txns);
values[5] = Int64GetDatum(s->stream_count);
values[6] = Int64GetDatum(s->stream_bytes);
+ values[7] = Int64GetDatum(s->total_txns);
+ values[8] = Int64GetDatum(s->total_bytes);
if (s->stat_reset_timestamp == 0)
- nulls[7] = true;
+ nulls[9] = true;
else
- values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
+ values[9] = TimestampTzGetDatum(s->stat_reset_timestamp);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 87e9596da5..904b0c97ec 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202104151
+#define CATALOG_VERSION_NO 202104161
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f4957653ae..591753fe81 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5315,9 +5315,9 @@
proname => 'pg_stat_get_replication_slots', prorows => '10',
proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '',
- proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{o,o,o,o,o,o,o,o}',
- proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
+ proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
prosrc => 'pg_stat_get_replication_slots' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 8e11215058..2aeb3cded4 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -548,6 +548,8 @@ typedef struct PgStat_MsgReplSlot
PgStat_Counter m_stream_txns;
PgStat_Counter m_stream_count;
PgStat_Counter m_stream_bytes;
+ PgStat_Counter m_total_txns;
+ PgStat_Counter m_total_bytes;
} PgStat_MsgReplSlot;
/* ----------
@@ -924,6 +926,8 @@ typedef struct PgStat_ReplSlotStats
PgStat_Counter stream_txns;
PgStat_Counter stream_count;
PgStat_Counter stream_bytes;
+ PgStat_Counter total_txns;
+ PgStat_Counter total_bytes;
TimestampTz stat_reset_timestamp;
} PgStat_ReplSlotStats;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 565a961d6a..bfab8303ee 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -618,6 +618,13 @@ struct ReorderBuffer
int64 streamTxns; /* number of transactions streamed */
int64 streamCount; /* streaming invocation counter */
int64 streamBytes; /* amount of data streamed */
+
+ /*
+ * Statistics about all the transactions sent to the decoding output
+ * plugin
+ */
+ int64 totalTxns; /* total number of transactions sent */
+ int64 totalBytes; /* total amount of data sent */
};
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 186e6c966c..6399f3feef 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2068,8 +2068,10 @@ pg_stat_replication_slots| SELECT s.slot_name,
s.stream_txns,
s.stream_count,
s.stream_bytes,
+ s.total_txns,
+ s.total_bytes,
s.stats_reset
- FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
+ FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset);
pg_stat_slru| SELECT s.name,
s.blks_zeroed,
s.blks_hit,