summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon Riggs2011-12-31 13:30:26 +0000
committerSimon Riggs2011-12-31 13:30:26 +0000
commit64233902d22ba42846397cb7551894217522fad4 (patch)
tree3d486d3a79b1ad543d99d726da4180e375437601
parent2ae2e9c00798685cd75ea0cc5120466bf2027b90 (diff)
Send new protocol keepalive messages to standby servers.
Allows streaming replication users to calculate transfer latency and apply delay via internal functions. No external functions yet.
-rw-r--r--doc/src/sgml/protocol.sgml48
-rw-r--r--src/backend/access/transam/xlog.c43
-rw-r--r--src/backend/replication/walreceiver.c47
-rw-r--r--src/backend/replication/walreceiverfuncs.c63
-rw-r--r--src/backend/replication/walsender.c42
-rw-r--r--src/include/access/xlog.h1
-rw-r--r--src/include/replication/walprotocol.h22
-rw-r--r--src/include/replication/walreceiver.h8
8 files changed, 258 insertions, 16 deletions
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index d6332e58cf7..71c40cc592e 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1467,6 +1467,54 @@ The commands accepted in walsender mode are:
<variablelist>
<varlistentry>
<term>
+ Primary keepalive message (B)
+ </term>
+ <listitem>
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term>
+ Byte1('k')
+ </term>
+ <listitem>
+ <para>
+ Identifies the message as a sender keepalive.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+ Byte8
+ </term>
+ <listitem>
+ <para>
+ The current end of WAL on the server, given in
+ XLogRecPtr format.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+ Byte8
+ </term>
+ <listitem>
+ <para>
+ The server's system clock at the time of transmission,
+ given in TimestampTz format.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </para>
+
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term>
Standby status update (F)
</term>
<listitem>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 41800a46040..d98a763fda6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -452,6 +452,9 @@ typedef struct XLogCtlData
XLogRecPtr recoveryLastRecPtr;
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
TimestampTz recoveryLastXTime;
+ /* timestamp of when we started replaying the current chunk of WAL data,
+ * only relevant for replication or archive recovery */
+ TimestampTz currentChunkStartTime;
/* end of the last record restored from the archive */
XLogRecPtr restoreLastRecPtr;
/* Are we requested to pause recovery? */
@@ -606,6 +609,7 @@ static void exitArchiveRecovery(TimeLineID endTLI,
static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
static void recoveryPausesHere(void);
static void SetLatestXTime(TimestampTz xtime);
+static void SetCurrentChunkStartTime(TimestampTz xtime);
static void CheckRequiredParameterValues(void);
static void XLogReportParameters(void);
static void LocalSetXLogInsertAllowed(void);
@@ -5848,6 +5852,41 @@ GetLatestXTime(void)
}
/*
+ * Save timestamp of the next chunk of WAL records to apply.
+ *
+ * We keep this in XLogCtl, not a simple static variable, so that it can be
+ * seen by all backends.
+ *
+ * The value is written to XLogCtl->currentChunkStartTime while holding
+ * info_lck; GetCurrentChunkReplayStartTime() reads it back the same way.
+ */
+static void
+SetCurrentChunkStartTime(TimestampTz xtime)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->currentChunkStartTime = xtime;
+ SpinLockRelease(&xlogctl->info_lck);
+}
+
+/*
+ * Fetch the timestamp of when we started replaying the current chunk of
+ * WAL data, as saved by SetCurrentChunkStartTime().
+ *
+ * StartupXLOG initialises the stored value to 0, so 0 is returned until
+ * SetCurrentChunkStartTime() has been called.
+ */
+TimestampTz
+GetCurrentChunkReplayStartTime(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+ TimestampTz xtime;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ xtime = xlogctl->currentChunkStartTime;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ return xtime;
+}
+
+/*
* Returns time of receipt of current chunk of XLOG data, as well as
* whether it was received from streaming replication or from archives.
*/
@@ -6390,6 +6429,7 @@ StartupXLOG(void)
xlogctl->replayEndRecPtr = ReadRecPtr;
xlogctl->recoveryLastRecPtr = ReadRecPtr;
xlogctl->recoveryLastXTime = 0;
+ xlogctl->currentChunkStartTime = 0;
xlogctl->recoveryPause = false;
SpinLockRelease(&xlogctl->info_lck);
@@ -9696,7 +9736,10 @@ retry:
{
havedata = true;
if (!XLByteLT(*RecPtr, latestChunkStart))
+ {
XLogReceiptTime = GetCurrentTimestamp();
+ SetCurrentChunkStartTime(XLogReceiptTime);
+ }
}
else
havedata = false;
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 1f12dcb62aa..8106d6b3a41 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -124,6 +124,7 @@ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvSendReply(void);
static void XLogWalRcvSendHSFeedback(void);
+static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -218,6 +219,10 @@ WalReceiverMain(void)
/* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
startpoint = walrcv->receiveStart;
+
+ /* Initialise to a sanish value */
+ walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = GetCurrentTimestamp();
+
SpinLockRelease(&walrcv->mutex);
/* Arrange to clean up at walreceiver exit */
@@ -433,12 +438,28 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
errmsg_internal("invalid WAL message received from primary")));
/* memcpy is required here for alignment reasons */
memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
+
+ ProcessWalSndrMessage(msghdr.walEnd, msghdr.sendTime);
+
buf += sizeof(WalDataMessageHeader);
len -= sizeof(WalDataMessageHeader);
-
XLogWalRcvWrite(buf, len, msghdr.dataStart);
break;
}
+ case 'k': /* Keepalive */
+ {
+ PrimaryKeepaliveMessage keepalive;
+
+ if (len != sizeof(PrimaryKeepaliveMessage))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("invalid keepalive message received from primary")));
+ /* memcpy is required here for alignment reasons */
+ memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage));
+
+ ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime);
+ break;
+ }
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -711,3 +732,27 @@ XLogWalRcvSendHSFeedback(void)
memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
}
+
+/*
+ * Keep track of important messages from primary.
+ *
+ * Records the sender's transmission time and our local receipt time in
+ * shared memory (WalRcv), then reports both, together with the current
+ * apply delay and transfer latency, at DEBUG2.
+ *
+ * walEnd:   end of WAL on the sender, as carried in the message (not
+ *           currently used here).
+ * sendTime: sender's system clock at the time of transmission.
+ */
+static void
+ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
+{
+    /* use volatile pointer to prevent code rearrangement */
+    volatile WalRcvData *walrcv = WalRcv;
+    TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
+    char       *sendtime_str;
+    char       *receipttime_str;
+
+    /* Update shared-memory status */
+    SpinLockAcquire(&walrcv->mutex);
+    walrcv->lastMsgSendTime = sendTime;
+    walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
+    SpinLockRelease(&walrcv->mutex);
+
+    /*
+     * timestamptz_to_str() returns a pointer to a static buffer, so calling
+     * it twice within one argument list makes both %s conversions print the
+     * same timestamp.  Copy each result before formatting the next one.
+     */
+    sendtime_str = pstrdup(timestamptz_to_str(sendTime));
+    receipttime_str = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
+
+    elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d transfer latency %d",
+         sendtime_str,
+         receipttime_str,
+         GetReplicationApplyDelay(),
+         GetReplicationTransferLatency());
+
+    pfree(sendtime_str);
+    pfree(receipttime_str);
+}
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 5bce1c34a1b..054355b2c59 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -28,6 +28,7 @@
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
+#include "utils/timestamp.h"
WalRcvData *WalRcv = NULL;
@@ -238,3 +239,65 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
return recptr;
}
+
+/*
+ * Returns the replication apply delay in ms.
+ *
+ * Returns 0 when everything received so far has already been replayed, and
+ * -1 when the delay cannot be determined because no chunk replay start
+ * time has been recorded yet.
+ */
+int
+GetReplicationApplyDelay(void)
+{
+    /* use volatile pointer to prevent code rearrangement */
+    volatile WalRcvData *walrcv = WalRcv;
+
+    XLogRecPtr  receivePtr;
+    XLogRecPtr  replayPtr;
+    TimestampTz chunkReplayStartTime;
+
+    long        secs;
+    int         usecs;
+
+    SpinLockAcquire(&walrcv->mutex);
+    receivePtr = walrcv->receivedUpto;
+    SpinLockRelease(&walrcv->mutex);
+
+    replayPtr = GetXLogReplayRecPtr(NULL);
+
+    if (XLByteLE(receivePtr, replayPtr))
+        return 0;
+
+    /*
+     * The chunk start time stays 0 until the startup process has recorded
+     * one.  An interval measured from timestamp 0 is meaningless, and its
+     * millisecond value does not fit in an int, so report "unknown".
+     */
+    chunkReplayStartTime = GetCurrentChunkReplayStartTime();
+    if (chunkReplayStartTime == 0)
+        return -1;
+
+    TimestampDifference(chunkReplayStartTime,
+                        GetCurrentTimestamp(),
+                        &secs, &usecs);
+
+    return (((int) secs * 1000) + (usecs / 1000));
+}
+
+/*
+ * Report, in ms, the gap between the send time and the receipt time of the
+ * last message recorded in WalRcv.  The send time comes from the sender's
+ * clock and the receipt time from ours, so note that this includes any
+ * difference in clock settings between the servers, as well as timezone.
+ */
+int
+GetReplicationTransferLatency(void)
+{
+    /* use volatile pointer to prevent code rearrangement */
+    volatile WalRcvData *walrcv = WalRcv;
+    TimestampTz sentAt;
+    TimestampTz receivedAt;
+    long        diffSecs = 0;
+    int         diffUsecs = 0;
+
+    /* Read both timestamps under the mutex so they belong together */
+    SpinLockAcquire(&walrcv->mutex);
+    sentAt = walrcv->lastMsgSendTime;
+    receivedAt = walrcv->lastMsgReceiptTime;
+    SpinLockRelease(&walrcv->mutex);
+
+    TimestampDifference(sentAt, receivedAt, &diffSecs, &diffUsecs);
+
+    return ((int) diffSecs * 1000) + (diffUsecs / 1000);
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ea865204172..ed7298b6ee8 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -131,6 +131,7 @@ static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
+static void WalSndKeepalive(char *msgbuf);
/* Main entry point for walsender process */
@@ -823,30 +824,24 @@ WalSndLoop(void)
*/
if (caughtup || pq_is_send_pending())
{
- TimestampTz finish_time = 0;
- long sleeptime = -1;
+ TimestampTz timeout = 0;
+ long sleeptime = 10000; /* 10 s */
int wakeEvents;
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
- WL_SOCKET_READABLE;
+ WL_SOCKET_READABLE | WL_TIMEOUT;
+
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
+ else
+ WalSndKeepalive(output_message);
/* Determine time until replication timeout */
if (replication_timeout > 0)
{
- long secs;
- int usecs;
-
- finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
+ timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
replication_timeout);
- TimestampDifference(GetCurrentTimestamp(),
- finish_time, &secs, &usecs);
- sleeptime = secs * 1000 + usecs / 1000;
- /* Avoid Assert in WaitLatchOrSocket if timeout is past */
- if (sleeptime < 0)
- sleeptime = 0;
- wakeEvents |= WL_TIMEOUT;
+ sleeptime = 1 + (replication_timeout / 10);
}
/* Sleep until something happens or replication timeout */
@@ -859,7 +854,7 @@ WalSndLoop(void)
* timeout ... he's supposed to reply *before* that.
*/
if (replication_timeout > 0 &&
- GetCurrentTimestamp() >= finish_time)
+ GetCurrentTimestamp() >= timeout)
{
/*
* Since typically expiration of replication timeout means
@@ -1627,6 +1622,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
return (Datum) 0;
}
+/*
+ * Build a primary keepalive message in msgbuf and queue it for sending.
+ *
+ * The payload is the type byte 'k' followed by a PrimaryKeepaliveMessage
+ * carrying sentPtr and the current timestamp; it is passed to
+ * pq_putmessage_noblock() as a 'd' message.  msgbuf must have room for
+ * sizeof(PrimaryKeepaliveMessage) + 1 bytes.
+ */
+static void
+WalSndKeepalive(char *msgbuf)
+{
+    PrimaryKeepaliveMessage msg;
+    size_t      msglen = sizeof(PrimaryKeepaliveMessage) + 1;
+
+    /* Fill in the sender's WAL position and clock */
+    msg.walEnd = sentPtr;
+    msg.sendTime = GetCurrentTimestamp();
+
+    elog(DEBUG2, "sending replication keepalive");
+
+    /* Type byte first, then the message body, then queue the whole thing */
+    msgbuf[0] = 'k';
+    memcpy(&msgbuf[1], &msg, sizeof(PrimaryKeepaliveMessage));
+    pq_putmessage_noblock('d', msgbuf, msglen);
+}
+
/*
* This isn't currently used for anything. Monitoring tools might be
* interested in the future, and we'll need something like this in the
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 86ab3276caf..4b1f8b8c2f3 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -293,6 +293,7 @@ extern XLogRecPtr GetXLogWriteRecPtr(void);
extern bool RecoveryIsPaused(void);
extern void SetRecoveryPause(bool recoveryPause);
extern TimestampTz GetLatestXTime(void);
+extern TimestampTz GetCurrentChunkReplayStartTime(void);
extern void UpdateControlFile(void);
extern uint64 GetSystemIdentifier(void);
diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h
index 656c8fc17fd..053376d3774 100644
--- a/src/include/replication/walprotocol.h
+++ b/src/include/replication/walprotocol.h
@@ -17,6 +17,20 @@
/*
+ * All messages from WalSender must contain these fields to allow us to
+ * correctly calculate the replication delay.
+ *
+ * PrimaryKeepaliveMessage is a typedef of this struct.
+ */
+typedef struct
+{
+ /* Current end of WAL on the sender */
+ XLogRecPtr walEnd;
+
+ /* Sender's system clock at the time of transmission */
+ TimestampTz sendTime;
+} WalSndrMessage;
+
+
+/*
* Header for a WAL data message (message type 'w'). This is wrapped within
* a CopyData message at the FE/BE protocol level.
*
@@ -40,6 +54,14 @@ typedef struct
} WalDataMessageHeader;
/*
+ * Keepalive message from primary (message type 'k'). (lowercase k)
+ * This is wrapped within a CopyData message at the FE/BE protocol level.
+ *
+ * The message has a fixed size: it consists only of the WalSndrMessage fields.
+ */
+typedef WalSndrMessage PrimaryKeepaliveMessage;
+
+/*
* Reply message from standby (message type 'r'). This is wrapped within
* a CopyData message at the FE/BE protocol level.
*
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 77f52520917..926730c9f82 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -79,6 +79,12 @@ typedef struct
XLogRecPtr latestChunkStart;
/*
+ * Time of send and receive of any message received.
+ */
+ TimestampTz lastMsgSendTime;
+ TimestampTz lastMsgReceiptTime;
+
+ /*
* connection string; is used for walreceiver to connect with the primary.
*/
char conninfo[MAXCONNINFO];
@@ -112,5 +118,7 @@ extern void ShutdownWalRcv(void);
extern bool WalRcvInProgress(void);
extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
+extern int GetReplicationApplyDelay(void);
+extern int GetReplicationTransferLatency(void);
#endif /* _WALRECEIVER_H */