diff options
Diffstat (limited to 'src/bin/pg_basebackup/pg_receivexlog.c')
-rw-r--r-- | src/bin/pg_basebackup/pg_receivexlog.c | 104 |
1 files changed, 100 insertions, 4 deletions
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index 11c31bbe137..135e2070f37 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -34,6 +34,7 @@ /* Global options */ static char *basedir = NULL; static int verbose = 0; +static int compresslevel = 0; static int noloop = 0; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static volatile bool time_to_abort = false; @@ -58,6 +59,15 @@ static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline, exit(code); \ } +/* Routines to evaluate segment file format */ +#define IsCompressXLogFileName(fname) \ + (strlen(fname) == XLOG_FNAME_LEN + strlen(".gz") && \ + strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN && \ + strcmp((fname) + XLOG_FNAME_LEN, ".gz") == 0) +#define IsPartialCompressXLogFileName(fname) \ + (strlen(fname) == XLOG_FNAME_LEN + strlen(".gz.partial") && \ + strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN && \ + strcmp((fname) + XLOG_FNAME_LEN, ".gz.partial") == 0) static void usage(void) @@ -76,6 +86,7 @@ usage(void) printf(_(" --synchronous flush transaction log immediately after writing\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -V, --version output version information, then exit\n")); + printf(_(" -Z, --compress=0-9 compress logs with given compression level\n")); printf(_(" -?, --help show this help, then exit\n")); printf(_("\nConnection options:\n")); printf(_(" -d, --dbname=CONNSTR connection string\n")); @@ -188,14 +199,31 @@ FindStreamingStart(uint32 *tli) uint32 tli; XLogSegNo segno; bool ispartial; + bool iscompress; /* * Check if the filename looks like an xlog file, or a .partial file. */ if (IsXLogFileName(dirent->d_name)) + { ispartial = false; + iscompress = false; + } else if (IsPartialXLogFileName(dirent->d_name)) + { + ispartial = true; + iscompress = false; + } + else if (IsCompressXLogFileName(dirent->d_name)) + { + ispartial = false; + iscompress = true; + } + else if (IsPartialCompressXLogFileName(dirent->d_name)) + { ispartial = true; + iscompress = true; + } else continue; @@ -206,9 +234,15 @@ FindStreamingStart(uint32 *tli) /* * Check that the segment has the right size, if it's supposed to be - * completed. + * completed. For non-compressed segments just check the on-disk size + * and see if it matches a completed segment. + * For compressed segments, look at the last 4 bytes of the compressed + * file, which is where the uncompressed size is located for gz files + * with a size lower than 4GB, and then compare it to the size of a + * completed segment. The 4 last bytes correspond to the ISIZE member + * according to http://www.zlib.org/rfc-gzip.html. */ - if (!ispartial) + if (!ispartial && !iscompress) { struct stat statbuf; char fullpath[MAXPGPATH]; @@ -229,6 +263,47 @@ FindStreamingStart(uint32 *tli) continue; } } + else if (!ispartial && iscompress) + { + int fd; + char buf[4]; + int bytes_out; + char fullpath[MAXPGPATH]; + + snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name); + + fd = open(fullpath, O_RDONLY | PG_BINARY); + if (fd < 0) + { + fprintf(stderr, _("%s: could not open compressed file \"%s\": %s\n"), + progname, fullpath, strerror(errno)); + disconnect_and_exit(1); + } + if (lseek(fd, (off_t)(-4), SEEK_END) < 0) + { + fprintf(stderr, _("%s: could not seek compressed file \"%s\": %s\n"), + progname, fullpath, strerror(errno)); + disconnect_and_exit(1); + } + if (read(fd, (char *) buf, sizeof(buf)) != sizeof(buf)) + { + fprintf(stderr, _("%s: could not read compressed file \"%s\": %s\n"), + progname, fullpath, strerror(errno)); + disconnect_and_exit(1); + } + + close(fd); + bytes_out = (buf[3] << 24) | (buf[2] << 16) | + (buf[1] << 8) | buf[0]; + + if (bytes_out != XLOG_SEG_SIZE) + { + fprintf(stderr, + _("%s: compressed segment file \"%s\" has incorrect uncompressed size %d, skipping\n"), + progname, dirent->d_name, bytes_out); + continue; + } + } /* Looks like a valid segment. Remember that we saw it. */ if ((segno > high_segno) || @@ -339,7 +414,8 @@ StreamLog(void) stream.synchronous = synchronous; stream.do_sync = true; stream.mark_done = false; - stream.walmethod = CreateWalDirectoryMethod(basedir, stream.do_sync); + stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel, + stream.do_sync); stream.partial_suffix = ".partial"; stream.replication_slot = replication_slot; stream.temp_slot = false; @@ -392,6 +468,7 @@ main(int argc, char **argv) {"status-interval", required_argument, NULL, 's'}, {"slot", required_argument, NULL, 'S'}, {"verbose", no_argument, NULL, 'v'}, + {"compress", required_argument, NULL, 'Z'}, /* action */ {"create-slot", no_argument, NULL, 1}, {"drop-slot", no_argument, NULL, 2}, @@ -422,7 +499,7 @@ main(int argc, char **argv) } } - while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWv", + while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWvZ:", long_options, &option_index)) != -1) { switch (c) @@ -472,6 +549,15 @@ main(int argc, char **argv) case 'v': verbose++; break; + case 'Z': + compresslevel = atoi(optarg); + if (compresslevel < 0 || compresslevel > 9) + { + fprintf(stderr, _("%s: invalid compression level \"%s\"\n"), + progname, optarg); + exit(1); + } + break; /* action */ case 1: do_create_slot = true; @@ -538,6 +624,16 @@ main(int argc, char **argv) exit(1); } +#ifndef HAVE_LIBZ + if (compresslevel != 0) + { + fprintf(stderr, + _("%s: this build does not support compression\n"), + progname); + exit(1); + } +#endif + /* * Check existence of destination folder. */ |