diff options
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r-- | src/bin/pg_basebackup/receivelog.c | 156 |
1 files changed, 98 insertions, 58 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 404ff917150..aed90954e6c 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -21,7 +21,6 @@ #include "postgres.h" #include "libpq-fe.h" #include "access/xlog_internal.h" -#include "replication/walprotocol.h" #include "utils/datetime.h" #include "utils/timestamp.h" @@ -34,14 +33,9 @@ #include <unistd.h> -/* Size of the streaming replication protocol headers */ -#define STREAMING_HEADER_SIZE (1+sizeof(WalDataMessageHeader)) -#define STREAMING_KEEPALIVE_SIZE (1+sizeof(PrimaryKeepaliveMessage)) - /* fd for currently open WAL file */ static int walfile = -1; - /* * Open a new WAL file in the specified directory. Store the name * (not including the full directory) in namebuf. Assumes there is @@ -189,37 +183,34 @@ close_walfile(char *basedir, char *walname, bool segment_complete) /* * Local version of GetCurrentTimestamp(), since we are not linked with - * backend code. + * backend code. The protocol always uses integer timestamps, regardless of + * server setting. */ -static TimestampTz +static int64 localGetCurrentTimestamp(void) { - TimestampTz result; + int64 result; struct timeval tp; gettimeofday(&tp, NULL); - result = (TimestampTz) tp.tv_sec - + result = (int64) tp.tv_sec - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); -#ifdef HAVE_INT64_TIMESTAMP result = (result * USECS_PER_SEC) + tp.tv_usec; -#else - result = result + (tp.tv_usec / 1000000.0); -#endif return result; } /* - * Local version of TimestampDifference(), since we are not - * linked with backend code. + * Local version of TimestampDifference(), since we are not linked with + * backend code. */ static void -localTimestampDifference(TimestampTz start_time, TimestampTz stop_time, +localTimestampDifference(int64 start_time, int64 stop_time, long *secs, int *microsecs) { - TimestampTz diff = stop_time - start_time; + int64 diff = stop_time - start_time; if (diff <= 0) { @@ -228,13 +219,8 @@ localTimestampDifference(TimestampTz start_time, TimestampTz stop_time, } else { -#ifdef HAVE_INT64_TIMESTAMP *secs = (long) (diff / USECS_PER_SEC); *microsecs = (int) (diff % USECS_PER_SEC); -#else - *secs = (long) diff; - *microsecs = (int) ((diff - *secs) * 1000000.0); -#endif } } @@ -243,17 +229,86 @@ localTimestampDifference(TimestampTz start_time, TimestampTz stop_time, * linked with backend code. */ static bool -localTimestampDifferenceExceeds(TimestampTz start_time, - TimestampTz stop_time, +localTimestampDifferenceExceeds(int64 start_time, + int64 stop_time, int msec) { - TimestampTz diff = stop_time - start_time; + int64 diff = stop_time - start_time; -#ifdef HAVE_INT64_TIMESTAMP return (diff >= msec * INT64CONST(1000)); -#else - return (diff * 1000.0 >= msec); -#endif +} + +/* + * Converts an int64 to network byte order. + */ +static void +sendint64(int64 i, char *buf) +{ + uint32 n32; + + /* High order half first, since we're doing MSB-first */ + n32 = (uint32) (i >> 32); + n32 = htonl(n32); + memcpy(&buf[0], &n32, 4); + + /* Now the low order half */ + n32 = (uint32) i; + n32 = htonl(n32); + memcpy(&buf[4], &n32, 4); +} + +/* + * Converts an int64 from network byte order to native format. + */ +static int64 +recvint64(char *buf) +{ + int64 result; + uint32 h32; + uint32 l32; + + memcpy(&h32, buf, 4); + memcpy(&l32, buf + 4, 4); + h32 = ntohl(h32); + l32 = ntohl(l32); + + result = h32; + result <<= 32; + result |= l32; + + return result; +} + +/* + * Send a Standby Status Update message to server. + */ +static bool +sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now) +{ + char replybuf[1 + 8 + 8 + 8 + 8 + 1]; + int len = 0; + + replybuf[len] = 'r'; + len += 1; + sendint64(blockpos, &replybuf[len]); /* write */ + len += 8; + sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */ + len += 8; + sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */ + len += 8; + sendint64(now, &replybuf[len]); /* sendTime */ + len += 8; + replybuf[len] = 0; /* replyRequested */ + len += 1; + + if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) + { + fprintf(stderr, _("%s: could not send feedback packet: %s"), + progname, PQerrorMessage(conn)); + return false; + } + + return true; } /* @@ -382,24 +437,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, standby_message_timeout)) { /* Time to send feedback! */ - char replybuf[sizeof(StandbyReplyMessage) + 1]; - StandbyReplyMessage *replymsg; - - replymsg = (StandbyReplyMessage *) (replybuf + 1); - replymsg->write = blockpos; - replymsg->flush = InvalidXLogRecPtr; - replymsg->apply = InvalidXLogRecPtr; - replymsg->sendTime = now; - replybuf[0] = 'r'; - - if (PQputCopyData(conn, replybuf, sizeof(replybuf)) <= 0 || - PQflush(conn)) - { - fprintf(stderr, _("%s: could not send feedback packet: %s"), - progname, PQerrorMessage(conn)); + if (!sendFeedback(conn, blockpos, now)) goto error; - } - last_status = now; } @@ -419,12 +458,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, FD_SET(PQsocket(conn), &input_mask); if (standby_message_timeout) { - TimestampTz targettime; + int64 targettime; long secs; int usecs; - targettime = TimestampTzPlusMilliseconds(last_status, - standby_message_timeout - 1); + targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000); localTimestampDifference(now, targettime, &secs, @@ -474,19 +512,14 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, progname, PQerrorMessage(conn)); goto error; } + + /* Check the message type. */ if (copybuf[0] == 'k') { /* * keepalive message, sent in 9.2 and newer. We just ignore this * message completely, but need to skip past it in the stream. */ - if (r != STREAMING_KEEPALIVE_SIZE) - { - fprintf(stderr, - _("%s: keepalive message has incorrect size %d\n"), - progname, r); - goto error; - } continue; } else if (copybuf[0] != 'w') @@ -495,15 +528,22 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, progname, copybuf[0]); goto error; } + + /* + * Read the header of the XLogData message, enclosed in the CopyData + * message. We only need the WAL location field (dataStart), the rest + * of the header is ignored. + */ +#define STREAMING_HEADER_SIZE (1 /* msgtype */ + 8 /* dataStart */ + 8 /* walEnd */ + 8 /* sendTime */) if (r < STREAMING_HEADER_SIZE + 1) { fprintf(stderr, _("%s: streaming header too small: %d\n"), progname, r); goto error; } + blockpos = recvint64(©buf[1]); /* Extract WAL location for this block */ - memcpy(&blockpos, copybuf + 1, 8); xlogoff = blockpos % XLOG_SEG_SIZE; /* |