diff options
Diffstat (limited to 'src/bin/pg_basebackup/pg_basebackup.c')
-rw-r--r-- | src/bin/pg_basebackup/pg_basebackup.c | 538 |
1 files changed, 390 insertions, 148 deletions
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 5c62be576ee..68e40f478ff 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -11,12 +11,20 @@ *------------------------------------------------------------------------- */ -#include "postgres_fe.h" +/* + * We have to use postgres.h not postgres_fe.h here, because there's so much + * backend-only stuff in the XLOG include files we need. But we need a + * frontend-ish environment otherwise. Hence this ugly hack. + */ +#define FRONTEND 1 +#include "postgres.h" #include "libpq-fe.h" #include <unistd.h> #include <dirent.h> #include <sys/stat.h> +#include <sys/types.h> +#include <sys/wait.h> #ifdef HAVE_LIBZ #include <zlib.h> @@ -24,9 +32,11 @@ #include "getopt_long.h" +#include "receivelog.h" +#include "streamutil.h" + /* Global options */ -static const char *progname; char *basedir = NULL; char format = 'p'; /* p(lain)/t(ar) */ char *label = "pg_basebackup base backup"; @@ -34,38 +44,38 @@ bool showprogress = false; int verbose = 0; int compresslevel = 0; bool includewal = false; +bool streamwal = false; bool fastcheckpoint = false; -char *dbhost = NULL; -char *dbuser = NULL; -char *dbport = NULL; -int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */ +int standby_message_timeout = 10; /* 10 sec = default */ /* Progress counters */ static uint64 totalsize; static uint64 totaldone; static int tablespacecount; -/* Connection kept global so we can disconnect easily */ -static PGconn *conn = NULL; +/* Pipe to communicate with background wal receiver process */ +#ifndef WIN32 +static int bgpipe[2] = {-1, -1}; +#endif -#define disconnect_and_exit(code) \ - { \ - if (conn != NULL) PQfinish(conn); \ - exit(code); \ - } +/* Handle to child process */ +static pid_t bgchild = -1; + +/* End position for xlog streaming, empty string if unknown yet */ +static XLogRecPtr xlogendptr; +static int has_xlogendptr = 0; /* Function headers */ -static char *xstrdup(const char *s); -static void *xmalloc0(int size); static void usage(void); static void verify_dir_is_empty_or_create(char *dirname); static void progress_report(int tablespacenum, const char *filename); -static PGconn *GetConnection(void); static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum); static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum); static void BaseBackup(void); +static bool segment_callback(XLogRecPtr segendpos, uint32 timeline); + #ifdef HAVE_LIBZ static const char * get_gz_error(gzFile *gzf) @@ -81,39 +91,6 @@ get_gz_error(gzFile *gzf) } #endif -/* - * strdup() and malloc() replacements that prints an error and exits - * if something goes wrong. Can never return NULL. - */ -static char * -xstrdup(const char *s) -{ - char *result; - - result = strdup(s); - if (!result) - { - fprintf(stderr, _("%s: out of memory\n"), progname); - exit(1); - } - return result; -} - -static void * -xmalloc0(int size) -{ - void *result; - - result = malloc(size); - if (!result) - { - fprintf(stderr, _("%s: out of memory\n"), progname); - exit(1); - } - MemSet(result, 0, size); - return result; -} - static void usage(void) @@ -125,7 +102,7 @@ usage(void) printf(_("\nOptions controlling the output:\n")); printf(_(" -D, --pgdata=DIRECTORY receive base backup into directory\n")); printf(_(" -F, --format=p|t output format (plain, tar)\n")); - printf(_(" -x, --xlog include required WAL files in backup\n")); + printf(_(" -x, --xlog=fetch|stream include required WAL files in backup\n")); printf(_(" -z, --gzip compress tar output\n")); printf(_(" -Z, --compress=0-9 compress tar output with given compression level\n")); printf(_("\nGeneral options:\n")); @@ -137,6 +114,7 @@ usage(void) printf(_(" --help show this help, then exit\n")); printf(_(" --version output version information, then exit\n")); printf(_("\nConnection options:\n")); + printf(_(" -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n")); printf(_(" -h, --host=HOSTNAME database server host or socket directory\n")); printf(_(" -p, --port=PORT database server port number\n")); printf(_(" -U, --username=NAME connect as specified database user\n")); @@ -147,6 +125,199 @@ usage(void) /* + * Called in the background process whenever a complete segment of WAL + * has been received. + * On Unix, we check to see if there is any data on our pipe + * (which would mean we have a stop position), and if it is, check if + * it is time to stop. + * On Windows, we are in a single process, so we can just check if it's + * time to stop. + */ +static bool +segment_callback(XLogRecPtr segendpos, uint32 timeline) +{ + if (!has_xlogendptr) + { +#ifndef WIN32 + fd_set fds; + struct timeval tv; + int r; + + /* + * Don't have the end pointer yet - check our pipe to see if it has + * been sent yet. + */ + FD_ZERO(&fds); + FD_SET(bgpipe[0], &fds); + + MemSet(&tv, 0, sizeof(tv)); + + r = select(bgpipe[0] + 1, &fds, NULL, NULL, &tv); + if (r == 1) + { + char xlogend[64]; + + MemSet(xlogend, 0, sizeof(xlogend)); + r = piperead(bgpipe[0], xlogend, sizeof(xlogend)); + if (r < 0) + { + fprintf(stderr, _("%s: could not read from ready pipe: %s\n"), + progname, strerror(errno)); + exit(1); + } + + if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2) + { + fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"), + progname, xlogend); + exit(1); + } + has_xlogendptr = 1; + + /* + * Fall through to check if we've reached the point further + * already. + */ + } + else + { + /* + * No data received on the pipe means we don't know the end + * position yet - so just say it's not time to stop yet. + */ + return false; + } +#else + + /* + * On win32, has_xlogendptr is set by the main thread, so if it's not + * set here, we just go back and wait until it shows up. + */ + return false; +#endif + } + + /* + * At this point we have an end pointer, so compare it to the current + * position to figure out if it's time to stop. + */ + if (segendpos.xlogid > xlogendptr.xlogid || + (segendpos.xlogid == xlogendptr.xlogid && + segendpos.xrecoff >= xlogendptr.xrecoff)) + return true; + + /* + * Have end pointer, but haven't reached it yet - so tell the caller to + * keep streaming. + */ + return false; +} + +typedef struct +{ + PGconn *bgconn; + XLogRecPtr startptr; + char xlogdir[MAXPGPATH]; + char *sysidentifier; + int timeline; +} logstreamer_param; + +static int +LogStreamerMain(logstreamer_param * param) +{ + if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, + param->sysidentifier, param->xlogdir, + segment_callback, NULL, standby_message_timeout)) + + /* + * Any errors will already have been reported in the function process, + * but we need to tell the parent that we didn't shutdown in a nice + * way. + */ + return 1; + + PQfinish(param->bgconn); + return 0; +} + +/* + * Initiate background process for receiving xlog during the backup. + * The background stream will use its own database connection so we can + * stream the logfile in parallel with the backups. + */ +static void +StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) +{ + logstreamer_param *param; + + param = xmalloc0(sizeof(logstreamer_param)); + param->timeline = timeline; + param->sysidentifier = sysidentifier; + + /* Convert the starting position */ + if (sscanf(startpos, "%X/%X", ¶m->startptr.xlogid, ¶m->startptr.xrecoff) != 2) + { + fprintf(stderr, _("%s: invalid format of xlog location: %s\n"), + progname, startpos); + disconnect_and_exit(1); + } + /* Round off to even segment position */ + param->startptr.xrecoff -= param->startptr.xrecoff % XLOG_SEG_SIZE; + +#ifndef WIN32 + /* Create our background pipe */ + if (pgpipe(bgpipe) < 0) + { + fprintf(stderr, _("%s: could not create pipe for background process: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } +#endif + + /* Get a second connection */ + param->bgconn = GetConnection(); + + /* + * Always in plain format, so we can write to basedir/pg_xlog. But the + * directory entry in the tar file may arrive later, so make sure it's + * created before we start. + */ + snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir); + verify_dir_is_empty_or_create(param->xlogdir); + + /* + * Start a child process and tell it to start streaming. On Unix, this is + * a fork(). On Windows, we create a thread. + */ +#ifndef WIN32 + bgchild = fork(); + if (bgchild == 0) + { + /* in child process */ + exit(LogStreamerMain(param)); + } + else if (bgchild < 0) + { + fprintf(stderr, _("%s: could not create background process: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + + /* + * Else we are in the parent process and all is well. + */ +#else /* WIN32 */ + bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL); + if (bgchild == 0) + { + fprintf(stderr, _("%s: could not create background thread: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } +#endif +} + +/* * Verify that the given directory exists and is empty. If it does not * exist, it is created. If it exists but is not empty, an error will * be give and the process ended. @@ -503,11 +674,6 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) strcpy(current_path, PQgetvalue(res, rownum, 1)); /* - * Make sure we're unpacking into an empty directory - */ - verify_dir_is_empty_or_create(current_path); - - /* * Get the COPY data */ res = PQgetResult(conn); @@ -597,13 +763,21 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) /* * Directory */ - filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ + filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ if (mkdir(filename, S_IRWXU) != 0) { - fprintf(stderr, + /* + * When streaming WAL, pg_xlog will have been created + * by the wal receiver process, so just ignore failure + * on that. + */ + if (!streamwal || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0) + { + fprintf(stderr, _("%s: could not create directory \"%s\": %s\n"), - progname, filename, strerror(errno)); - disconnect_and_exit(1); + progname, filename, strerror(errno)); + disconnect_and_exit(1); + } } #ifndef WIN32 if (chmod(filename, (mode_t) filemode)) @@ -616,12 +790,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) /* * Symbolic link */ - filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ + filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ if (symlink(©buf[157], filename) != 0) { fprintf(stderr, _("%s: could not create symbolic link from \"%s\" to \"%s\": %s\n"), - progname, filename, ©buf[157], strerror(errno)); + progname, filename, ©buf[157], strerror(errno)); disconnect_and_exit(1); } } @@ -714,94 +888,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) } -static PGconn * -GetConnection(void) -{ - PGconn *tmpconn; - int argcount = 4; /* dbname, replication, fallback_app_name, - * password */ - int i; - const char **keywords; - const char **values; - char *password = NULL; - - if (dbhost) - argcount++; - if (dbuser) - argcount++; - if (dbport) - argcount++; - - keywords = xmalloc0((argcount + 1) * sizeof(*keywords)); - values = xmalloc0((argcount + 1) * sizeof(*values)); - - keywords[0] = "dbname"; - values[0] = "replication"; - keywords[1] = "replication"; - values[1] = "true"; - keywords[2] = "fallback_application_name"; - values[2] = progname; - i = 3; - if (dbhost) - { - keywords[i] = "host"; - values[i] = dbhost; - i++; - } - if (dbuser) - { - keywords[i] = "user"; - values[i] = dbuser; - i++; - } - if (dbport) - { - keywords[i] = "port"; - values[i] = dbport; - i++; - } - - while (true) - { - if (dbgetpassword == 1) - { - /* Prompt for a password */ - password = simple_prompt(_("Password: "), 100, false); - keywords[argcount - 1] = "password"; - values[argcount - 1] = password; - } - - tmpconn = PQconnectdbParams(keywords, values, true); - if (password) - free(password); - - if (PQstatus(tmpconn) == CONNECTION_BAD && - PQconnectionNeedsPassword(tmpconn) && - dbgetpassword != -1) - { - dbgetpassword = 1; /* ask for password next time */ - PQfinish(tmpconn); - continue; - } - - if (PQstatus(tmpconn) != CONNECTION_OK) - { - fprintf(stderr, _("%s: could not connect to server: %s"), - progname, PQerrorMessage(tmpconn)); - exit(1); - } - - /* Connection ok! */ - free(values); - free(keywords); - return tmpconn; - } -} - static void BaseBackup(void) { PGresult *res; + char *sysidentifier; + uint32 timeline; char current_path[MAXPGPATH]; char escaped_label[MAXPGPATH]; int i; @@ -814,13 +906,33 @@ BaseBackup(void) conn = GetConnection(); /* + * Run IDENTIFY_SYSTEM so we can get the timeline + */ + res = PQexec(conn, "IDENTIFY_SYSTEM"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not identify system: %s\n"), + progname, PQerrorMessage(conn)); + disconnect_and_exit(1); + } + if (PQntuples(res) != 1) + { + fprintf(stderr, _("%s: could not identify system, got %i rows\n"), + progname, PQntuples(res)); + disconnect_and_exit(1); + } + sysidentifier = strdup(PQgetvalue(res, 0, 0)); + timeline = atoi(PQgetvalue(res, 0, 1)); + PQclear(res); + + /* * Start the actual backup */ PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i); snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s", escaped_label, showprogress ? "PROGRESS" : "", - includewal ? "WAL" : "", + includewal && !streamwal ? "WAL" : "", fastcheckpoint ? "FAST" : "", includewal ? "NOWAIT" : ""); @@ -899,6 +1011,18 @@ BaseBackup(void) } /* + * If we're streaming WAL, start the streaming session before we start + * receiving the actual data chunks. + */ + if (streamwal) + { + if (verbose) + fprintf(stderr, _("%s: starting background WAL receiver\n"), + progname); + StartLogStreamer(xlogstart, timeline, sysidentifier); + } + + /* * Start receiving chunks */ for (i = 0; i < PQntuples(res); i++) @@ -945,6 +1069,92 @@ BaseBackup(void) disconnect_and_exit(1); } + if (bgchild > 0) + { + int status; + +#ifndef WIN32 + int r; +#endif + + if (verbose) + fprintf(stderr, _("%s: waiting for background process to finish streaming...\n"), progname); + +#ifndef WIN32 + if (pipewrite(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend)) + { + fprintf(stderr, _("%s: could not send command to background pipe: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + + /* Just wait for the background process to exit */ + r = waitpid(bgchild, &status, 0); + if (r == -1) + { + fprintf(stderr, _("%s: could not wait for child process: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + if (r != bgchild) + { + fprintf(stderr, _("%s: child %i died, expected %i\n"), + progname, r, bgchild); + disconnect_and_exit(1); + } + if (!WIFEXITED(status)) + { + fprintf(stderr, _("%s: child process did not exit normally\n"), + progname); + disconnect_and_exit(1); + } + if (WEXITSTATUS(status) != 0) + { + fprintf(stderr, _("%s: child process exited with error %i\n"), + progname, WEXITSTATUS(status)); + disconnect_and_exit(1); + } + /* Exited normally, we're happy! */ +#else /* WIN32 */ + + /* + * On Windows, since we are in the same process, we can just store the + * value directly in the variable, and then set the flag that says + * it's there. + */ + if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2) + { + fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"), + progname, xlogend); + exit(1); + } + InterlockedIncrement(&has_xlogendptr); + + /* First wait for the thread to exit */ + if (WaitForSingleObjectEx((HANDLE) bgchild, INFINITE, FALSE) != WAIT_OBJECT_0) + { + _dosmaperr(GetLastError()); + fprintf(stderr, _("%s: could not wait for child thread: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + if (GetExitCodeThread((HANDLE) bgchild, &status) == 0) + { + _dosmaperr(GetLastError()); + fprintf(stderr, _("%s: could not get child thread exit status: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + if (status != 0) + { + fprintf(stderr, _("%s: child thread exited with error %u\n"), + progname, status); + disconnect_and_exit(1); + } + /* Exited normally, we're happy */ +#endif + } + /* * End of copy data. Final result is already checked inside the loop. */ @@ -964,7 +1174,7 @@ main(int argc, char **argv) {"pgdata", required_argument, NULL, 'D'}, {"format", required_argument, NULL, 'F'}, {"checkpoint", required_argument, NULL, 'c'}, - {"xlog", no_argument, NULL, 'x'}, + {"xlog", required_argument, NULL, 'x'}, {"gzip", no_argument, NULL, 'z'}, {"compress", required_argument, NULL, 'Z'}, {"label", required_argument, NULL, 'l'}, @@ -973,6 +1183,7 @@ main(int argc, char **argv) {"username", required_argument, NULL, 'U'}, {"no-password", no_argument, NULL, 'w'}, {"password", no_argument, NULL, 'W'}, + {"statusint", required_argument, NULL, 's'}, {"verbose", no_argument, NULL, 'v'}, {"progress", no_argument, NULL, 'P'}, {NULL, 0, NULL, 0} @@ -999,7 +1210,7 @@ main(int argc, char **argv) } } - while ((c = getopt_long(argc, argv, "D:F:xl:zZ:c:h:p:U:wWvP", + while ((c = getopt_long(argc, argv, "D:F:x:l:zZ:c:h:p:U:s:wWvP", long_options, &option_index)) != -1) { switch (c) @@ -1021,6 +1232,18 @@ main(int argc, char **argv) break; case 'x': includewal = true; + if (strcmp(optarg, "f") == 0 || + strcmp(optarg, "fetch") == 0) + streamwal = false; + else if (strcmp(optarg, "s") == 0 || + strcmp(optarg, "stream") == 0) + streamwal = true; + else + { + fprintf(stderr, _("%s: invalid xlog option \"%s\", must be empty, \"fetch\" or \"stream\"\n"), + progname, optarg); + exit(1); + } break; case 'l': label = xstrdup(optarg); @@ -1068,6 +1291,15 @@ main(int argc, char **argv) case 'W': dbgetpassword = 1; break; + case 's': + standby_message_timeout = atoi(optarg); + if (standby_message_timeout < 0) + { + fprintf(stderr, _("%s: invalid status interval \"%s\"\n"), + progname, optarg); + exit(1); + } + break; case 'v': verbose++; break; @@ -1122,6 +1354,16 @@ main(int argc, char **argv) exit(1); } + if (format != 'p' && streamwal) + { + fprintf(stderr, + _("%s: wal streaming can only be used in plain mode\n"), + progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + #ifndef HAVE_LIBZ if (compresslevel != 0) { |