From 9f6dadc679cdc0da7c005d005a06dad6e81020ad Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot.pg@gmail.com>
Date: Tue, 25 Feb 2025 10:18:05 +0000
Subject: [PATCH v7] Flush the IO statistics of active walsenders

The walsender does not flush its IO statistics until it exits.
The issue is there since pg_stat_io has been introduced in a9c70b46dbe.
This commits:

1. ensures it does not wait to exit to flush its IO statistics
2. flush its IO statistics periodically to not overload the walsender
3. adds a test for a physical walsender and a test for a logical walsender
---
 src/backend/replication/walsender.c   | 36 +++++++++++++++++++++++++--
 src/test/recovery/t/001_stream_rep.pl | 15 +++++++++++
 2 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1028919aecb1..216baeda5cd2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -91,10 +91,14 @@
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
+#include "utils/pgstat_internal.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
 
+/* Minimum interval used by walsender for stats flushes, in ms */
+#define WALSENDER_STATS_FLUSH_INTERVAL         1000
+
 /*
  * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
  *
@@ -1797,6 +1801,7 @@ WalSndWaitForWal(XLogRecPtr loc)
 	int			wakeEvents;
 	uint32		wait_event = 0;
 	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+	TimestampTz last_flush = 0;
 
 	/*
 	 * Fast path to avoid acquiring the spinlock in case we already know we
@@ -1817,6 +1822,7 @@ WalSndWaitForWal(XLogRecPtr loc)
 	{
 		bool		wait_for_standby_at_stop = false;
 		long		sleeptime;
+		TimestampTz now;
 
 		/* Clear any already-pending wakeups */
 		ResetLatch(MyLatch);
@@ -1927,7 +1933,8 @@ WalSndWaitForWal(XLogRecPtr loc)
 		 * new WAL to be generated.  (But if we have nothing to send, we don't
 		 * want to wake on socket-writable.)
 		 */
-		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+		now = GetCurrentTimestamp();
+		sleeptime = WalSndComputeSleeptime(now);
 
 		wakeEvents = WL_SOCKET_READABLE;
 
@@ -1936,6 +1943,15 @@ WalSndWaitForWal(XLogRecPtr loc)
 
 		Assert(wait_event != 0);
 
+		/* Report IO statistics, if needed */
+		if (TimestampDifferenceExceeds(last_flush, now,
+									   WALSENDER_STATS_FLUSH_INTERVAL))
+		{
+			pgstat_flush_io(false);
+			(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+			last_flush = now;
+		}
+
 		WalSndWait(wakeEvents, sleeptime, wait_event);
 	}
 
@@ -2742,6 +2758,8 @@ WalSndCheckTimeOut(void)
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
+	TimestampTz last_flush = 0;
+
 	/*
 	 * Initialize the last reply timestamp. That enables timeout processing
 	 * from hereon.
@@ -2836,6 +2854,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * WalSndWaitForWal() handle any other blocking; idle receivers need
 		 * its additional actions.  For physical replication, also block if
 		 * caught up; its send_data does not block.
+		 *
+		 * The IO statistics are reported in WalSndWaitForWal() for the
+		 * logical WAL senders.
 		 */
 		if ((WalSndCaughtUp && send_data != XLogSendLogical &&
 			 !streamingDoneSending) ||
@@ -2843,6 +2864,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		{
 			long		sleeptime;
 			int			wakeEvents;
+			TimestampTz now;
 
 			if (!streamingDoneReceiving)
 				wakeEvents = WL_SOCKET_READABLE;
@@ -2853,11 +2875,21 @@ WalSndLoop(WalSndSendDataCallback send_data)
 			 * Use fresh timestamp, not last_processing, to reduce the chance
 			 * of reaching wal_sender_timeout before sending a keepalive.
 			 */
-			sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+			now = GetCurrentTimestamp();
+			sleeptime = WalSndComputeSleeptime(now);
 
 			if (pq_is_send_pending())
 				wakeEvents |= WL_SOCKET_WRITEABLE;
 
+			/* Report IO statistics, if needed */
+			if (TimestampDifferenceExceeds(last_flush, now,
+										   WALSENDER_STATS_FLUSH_INTERVAL))
+			{
+				pgstat_flush_io(false);
+				(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+				last_flush = now;
+			}
+
 			/* Sleep until something happens or we time out */
 			WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
 		}
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index ccd8417d449f..e55d8ec0ec17 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -42,6 +42,9 @@ $node_standby_2->init_from_backup($node_standby_1, $backup_name,
 	has_streaming => 1);
 $node_standby_2->start;
 
+# Reset IO statistics, for the WAL sender check with pg_stat_io.
+$node_primary->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')");
+
 # Create some content on primary and check its presence in standby nodes
 $node_primary->safe_psql('postgres',
 	"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
@@ -333,6 +336,18 @@ $node_primary->psql(
 
 note "switching to physical replication slot";
 
+# Wait for the walsender to update its IO statistics.  This is done before
+# the next restart and far enough from the reset done above.
+$node_primary->poll_query_until(
+	'postgres',
+	qq[SELECT sum(reads) > 0
+       FROM pg_catalog.pg_stat_io
+       WHERE backend_type = 'walsender'
+       AND object = 'wal']
+  )
+  or die
+  "Timed out while waiting for the walsender to update its IO statistics";
+
 # Switch to using a physical replication slot. We can do this without a new
 # backup since physical slots can go backwards if needed. Do so on both
 # standbys. Since we're going to be testing things that affect the slot state,
-- 
2.49.0

