diff options
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r-- | src/backend/replication/walreceiver.c | 145 |
1 files changed, 85 insertions, 60 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index b1accdcceaf..62135037f10 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -39,9 +39,9 @@ #include <unistd.h> #include "access/xlog_internal.h" +#include "libpq/pqformat.h" #include "libpq/pqsignal.h" #include "miscadmin.h" -#include "replication/walprotocol.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/ipc.h" @@ -93,8 +93,8 @@ static struct XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ } LogstreamResult; -static StandbyReplyMessage reply_message; -static StandbyHSFeedbackMessage feedback_message; +static StringInfoData reply_message; +static StringInfoData incoming_message; /* * About SIGTERM handling: @@ -279,10 +279,10 @@ WalReceiverMain(void) walrcv_connect(conninfo, startpoint); DisableWalRcvImmediateExit(); - /* Initialize LogstreamResult, reply_message and feedback_message */ + /* Initialize LogstreamResult and buffers for processing messages */ LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); - MemSet(&reply_message, 0, sizeof(reply_message)); - MemSet(&feedback_message, 0, sizeof(feedback_message)); + initStringInfo(&reply_message); + initStringInfo(&incoming_message); /* Initialize the last recv timestamp */ last_recv_timestamp = GetCurrentTimestamp(); @@ -480,41 +480,58 @@ WalRcvQuickDieHandler(SIGNAL_ARGS) static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) { + int hdrlen; + XLogRecPtr dataStart; + XLogRecPtr walEnd; + TimestampTz sendTime; + bool replyRequested; + + resetStringInfo(&incoming_message); + switch (type) { case 'w': /* WAL records */ { - WalDataMessageHeader msghdr; - - if (len < sizeof(WalDataMessageHeader)) + /* copy message to StringInfo */ + hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64); + if (len < hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid WAL message received from primary"))); - /* memcpy is required here for alignment reasons */ - memcpy(&msghdr, buf, sizeof(WalDataMessageHeader)); - - ProcessWalSndrMessage(msghdr.walEnd, msghdr.sendTime); - - buf += sizeof(WalDataMessageHeader); - len -= sizeof(WalDataMessageHeader); - XLogWalRcvWrite(buf, len, msghdr.dataStart); + appendBinaryStringInfo(&incoming_message, buf, hdrlen); + + /* read the fields */ + dataStart = pq_getmsgint64(&incoming_message); + walEnd = pq_getmsgint64(&incoming_message); + sendTime = IntegerTimestampToTimestampTz( + pq_getmsgint64(&incoming_message)); + ProcessWalSndrMessage(walEnd, sendTime); + + buf += hdrlen; + len -= hdrlen; + XLogWalRcvWrite(buf, len, dataStart); break; } case 'k': /* Keepalive */ { - PrimaryKeepaliveMessage keepalive; - - if (len != sizeof(PrimaryKeepaliveMessage)) + /* copy message to StringInfo */ + hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char); + if (len != hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid keepalive message received from primary"))); - /* memcpy is required here for alignment reasons */ - memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage)); + appendBinaryStringInfo(&incoming_message, buf, hdrlen); - ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime); + /* read the fields */ + walEnd = pq_getmsgint64(&incoming_message); + sendTime = IntegerTimestampToTimestampTz( + pq_getmsgint64(&incoming_message)); + replyRequested = pq_getmsgbyte(&incoming_message); + + ProcessWalSndrMessage(walEnd, sendTime); /* If the primary requested a reply, send one immediately */ - if (keepalive.replyRequested) + if (replyRequested) XLogWalRcvSendReply(true, false); break; } @@ -685,7 +702,10 @@ XLogWalRcvFlush(bool dying) static void XLogWalRcvSendReply(bool force, bool requestReply) { - char buf[sizeof(StandbyReplyMessage) + 1]; + static XLogRecPtr writePtr = 0; + static XLogRecPtr flushPtr = 0; + XLogRecPtr applyPtr; + static TimestampTz sendTime = 0; TimestampTz now; /* @@ -708,28 +728,34 @@ XLogWalRcvSendReply(bool force, bool requestReply) * probably OK. */ if (!force - && XLByteEQ(reply_message.write, LogstreamResult.Write) - && XLByteEQ(reply_message.flush, LogstreamResult.Flush) - && !TimestampDifferenceExceeds(reply_message.sendTime, now, + && XLByteEQ(writePtr, LogstreamResult.Write) + && XLByteEQ(flushPtr, LogstreamResult.Flush) + && !TimestampDifferenceExceeds(sendTime, now, wal_receiver_status_interval * 1000)) return; + sendTime = now; /* Construct a new message */ - reply_message.write = LogstreamResult.Write; - reply_message.flush = LogstreamResult.Flush; - reply_message.apply = GetXLogReplayRecPtr(NULL); - reply_message.sendTime = now; - reply_message.replyRequested = requestReply; - - elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", - (uint32) (reply_message.write >> 32), (uint32) reply_message.write, - (uint32) (reply_message.flush >> 32), (uint32) reply_message.flush, - (uint32) (reply_message.apply >> 32), (uint32) reply_message.apply); - - /* Prepend with the message type and send it. */ - buf[0] = 'r'; - memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage)); - walrcv_send(buf, sizeof(StandbyReplyMessage) + 1); + writePtr = LogstreamResult.Write; + flushPtr = LogstreamResult.Flush; + applyPtr = GetXLogReplayRecPtr(NULL); + + resetStringInfo(&reply_message); + pq_sendbyte(&reply_message, 'r'); + pq_sendint64(&reply_message, writePtr); + pq_sendint64(&reply_message, flushPtr); + pq_sendint64(&reply_message, applyPtr); + pq_sendint64(&reply_message, GetCurrentIntegerTimestamp()); + pq_sendbyte(&reply_message, requestReply ? 1 : 0); + + /* Send it */ + elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s", + (uint32) (writePtr >> 32), (uint32) writePtr, + (uint32) (flushPtr >> 32), (uint32) flushPtr, + (uint32) (applyPtr >> 32), (uint32) applyPtr, + requestReply ? " (reply requested)" : ""); + + walrcv_send(reply_message.data, reply_message.len); } /* @@ -739,11 +765,11 @@ XLogWalRcvSendReply(bool force, bool requestReply) static void XLogWalRcvSendHSFeedback(void) { - char buf[sizeof(StandbyHSFeedbackMessage) + 1]; TimestampTz now; TransactionId nextXid; uint32 nextEpoch; TransactionId xmin; + static TimestampTz sendTime = 0; /* * If the user doesn't want status to be reported to the master, be sure @@ -758,9 +784,10 @@ XLogWalRcvSendHSFeedback(void) /* * Send feedback at most once per wal_receiver_status_interval. */ - if (!TimestampDifferenceExceeds(feedback_message.sendTime, now, + if (!TimestampDifferenceExceeds(sendTime, now, wal_receiver_status_interval * 1000)) return; + sendTime = now; /* * If Hot Standby is not yet active there is nothing to send. Check this @@ -783,25 +810,23 @@ XLogWalRcvSendHSFeedback(void) if (nextXid < xmin) nextEpoch--; - /* - * Always send feedback message. - */ - feedback_message.sendTime = now; - feedback_message.xmin = xmin; - feedback_message.epoch = nextEpoch; - elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u", - feedback_message.xmin, - feedback_message.epoch); - - /* Prepend with the message type and send it. */ - buf[0] = 'h'; - memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage)); - walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1); + xmin, nextEpoch); + + /* Construct the the message and send it. */ + resetStringInfo(&reply_message); + pq_sendbyte(&reply_message, 'h'); + pq_sendint64(&reply_message, GetCurrentIntegerTimestamp()); + pq_sendint(&reply_message, xmin, 4); + pq_sendint(&reply_message, nextEpoch, 4); + walrcv_send(reply_message.data, reply_message.len); } /* - * Keep track of important messages from primary. + * Update shared memory status upon receiving a message from primary. + * + * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest + * message, reported by primary. */ static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) |