diff options
Diffstat (limited to 'src/backend/replication/libpqwalreceiver/libpqwalreceiver.c')
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 208 |
1 files changed, 42 insertions, 166 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index ee3101c093e..200e8f1aabd 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -25,6 +25,7 @@ #include "common/connect.h" #include "funcapi.h" #include "libpq-fe.h" +#include "libpq/libpq-be-fe-helpers.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "pgstat.h" @@ -113,8 +114,6 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { }; /* Prototypes for private functions */ -static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query); -static PGresult *libpqrcv_PQgetResult(PGconn *streamConn); static char *stringlist_to_identifierstr(PGconn *conn, List *strings); /* @@ -148,7 +147,6 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical, bool must_use_password, const char *appname, char **err) { WalReceiverConn *conn; - PostgresPollingStatusType status; const char *keys[6]; const char *vals[6]; int i = 0; @@ -214,56 +212,17 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical, Assert(i < lengthof(keys)); conn = palloc0(sizeof(WalReceiverConn)); - conn->streamConn = PQconnectStartParams(keys, vals, - /* expand_dbname = */ true); - if (PQstatus(conn->streamConn) == CONNECTION_BAD) - goto bad_connection_errmsg; - - /* - * Poll connection until we have OK or FAILED status. - * - * Per spec for PQconnectPoll, first wait till socket is write-ready. - */ - status = PGRES_POLLING_WRITING; - do - { - int io_flag; - int rc; - - if (status == PGRES_POLLING_READING) - io_flag = WL_SOCKET_READABLE; -#ifdef WIN32 - /* Windows needs a different test while waiting for connection-made */ - else if (PQstatus(conn->streamConn) == CONNECTION_STARTED) - io_flag = WL_SOCKET_CONNECTED; -#endif - else - io_flag = WL_SOCKET_WRITEABLE; - - rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag, - PQsocket(conn->streamConn), - 0, - WAIT_EVENT_LIBPQWALRECEIVER_CONNECT); - - /* Interrupted? */ - if (rc & WL_LATCH_SET) - { - ResetLatch(MyLatch); - ProcessWalRcvInterrupts(); - } - - /* If socket is ready, advance the libpq state machine */ - if (rc & io_flag) - status = PQconnectPoll(conn->streamConn); - } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED); + conn->streamConn = + libpqsrv_connect_params(keys, vals, + /* expand_dbname = */ true, + WAIT_EVENT_LIBPQWALRECEIVER_CONNECT); if (PQstatus(conn->streamConn) != CONNECTION_OK) goto bad_connection_errmsg; if (must_use_password && !PQconnectionUsedPassword(conn->streamConn)) { - PQfinish(conn->streamConn); + libpqsrv_disconnect(conn->streamConn); pfree(conn); ereport(ERROR, @@ -281,8 +240,9 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical, { PGresult *res; - res = libpqrcv_PQexec(conn->streamConn, - ALWAYS_SECURE_SEARCH_PATH_SQL); + res = libpqsrv_exec(conn->streamConn, + ALWAYS_SECURE_SEARCH_PATH_SQL, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -303,7 +263,7 @@ bad_connection_errmsg: /* error path, error already set */ bad_connection: - PQfinish(conn->streamConn); + libpqsrv_disconnect(conn->streamConn); pfree(conn); return NULL; } @@ -454,7 +414,9 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) * Get the system identifier and timeline ID as a DataRow message from the * primary server. */ - res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM"); + res = libpqsrv_exec(conn->streamConn, + "IDENTIFY_SYSTEM", + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -631,7 +593,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn, options->proto.physical.startpointTLI); /* Start streaming. */ - res = libpqrcv_PQexec(conn->streamConn, cmd.data); + res = libpqsrv_exec(conn->streamConn, + cmd.data, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); pfree(cmd.data); if (PQresultStatus(res) == PGRES_COMMAND_OK) @@ -661,7 +625,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PGresult *res; /* - * Send copy-end message. As in libpqrcv_PQexec, this could theoretically + * Send copy-end message. As in libpqsrv_exec, this could theoretically * block, but the risk seems small. */ if (PQputCopyEnd(conn->streamConn, NULL) <= 0 || @@ -681,7 +645,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is * also possible in case we aborted the copy in mid-stream. */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) == PGRES_TUPLES_OK) { /* @@ -696,7 +661,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PQclear(res); /* the result set should be followed by CommandComplete */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); } else if (PQresultStatus(res) == PGRES_COPY_OUT) { @@ -710,7 +676,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) pchomp(PQerrorMessage(conn->streamConn))))); /* CommandComplete should follow */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); } if (PQresultStatus(res) != PGRES_COMMAND_OK) @@ -721,7 +688,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PQclear(res); /* Verify that there are no more results */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (res != NULL) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -746,7 +714,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, * Request the primary to send over the history file for given timeline. */ snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli); - res = libpqrcv_PQexec(conn->streamConn, cmd); + res = libpqsrv_exec(conn->streamConn, + cmd, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -777,113 +747,12 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, } /* - * Send a query and wait for the results by using the asynchronous libpq - * functions and socket readiness events. - * - * The function is modeled on libpqsrv_exec(), with the behavior difference - * being that it calls ProcessWalRcvInterrupts(). As an optimization, it - * skips try/catch, since all errors terminate the process. - * - * May return NULL, rather than an error result, on failure. - */ -static PGresult * -libpqrcv_PQexec(PGconn *streamConn, const char *query) -{ - PGresult *lastResult = NULL; - - /* - * PQexec() silently discards any prior query results on the connection. - * This is not required for this function as it's expected that the caller - * (which is this library in all cases) will behave correctly and we don't - * have to be backwards compatible with old libpq. - */ - - /* - * Submit the query. Since we don't use non-blocking mode, this could - * theoretically block. In practice, since we don't send very long query - * strings, the risk seems negligible. - */ - if (!PQsendQuery(streamConn, query)) - return NULL; - - for (;;) - { - /* Wait for, and collect, the next PGresult. */ - PGresult *result; - - result = libpqrcv_PQgetResult(streamConn); - if (result == NULL) - break; /* query is complete, or failure */ - - /* - * Emulate PQexec()'s behavior of returning the last result when there - * are many. We are fine with returning just last error message. - */ - PQclear(lastResult); - lastResult = result; - - if (PQresultStatus(lastResult) == PGRES_COPY_IN || - PQresultStatus(lastResult) == PGRES_COPY_OUT || - PQresultStatus(lastResult) == PGRES_COPY_BOTH || - PQstatus(streamConn) == CONNECTION_BAD) - break; - } - - return lastResult; -} - -/* - * Perform the equivalent of PQgetResult(), but watch for interrupts. - */ -static PGresult * -libpqrcv_PQgetResult(PGconn *streamConn) -{ - /* - * Collect data until PQgetResult is ready to get the result without - * blocking. - */ - while (PQisBusy(streamConn)) - { - int rc; - - /* - * We don't need to break down the sleep into smaller increments, - * since we'll get interrupted by signals and can handle any - * interrupts here. - */ - rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | - WL_LATCH_SET, - PQsocket(streamConn), - 0, - WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); - - /* Interrupted? */ - if (rc & WL_LATCH_SET) - { - ResetLatch(MyLatch); - ProcessWalRcvInterrupts(); - } - - /* Consume whatever data is available from the socket */ - if (PQconsumeInput(streamConn) == 0) - { - /* trouble; return NULL */ - return NULL; - } - } - - /* Now we can collect and return the next PGresult */ - return PQgetResult(streamConn); -} - -/* * Disconnect connection to primary, if any. */ static void libpqrcv_disconnect(WalReceiverConn *conn) { - PQfinish(conn->streamConn); + libpqsrv_disconnect(conn->streamConn); PQfreemem(conn->recvBuf); pfree(conn); } @@ -937,13 +806,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, { PGresult *res; - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) == PGRES_COMMAND_OK) { PQclear(res); /* Verify that there are no more results. */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (res != NULL) { PQclear(res); @@ -1094,7 +965,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL"); } - res = libpqrcv_PQexec(conn->streamConn, cmd.data); + res = libpqsrv_exec(conn->streamConn, + cmd.data, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); pfree(cmd.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -1147,7 +1020,8 @@ libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, appendStringInfoString(&cmd, " );"); - res = libpqrcv_PQexec(conn->streamConn, cmd.data); + res = libpqsrv_exec(conn->streamConn, cmd.data, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); pfree(cmd.data); if (PQresultStatus(res) != PGRES_COMMAND_OK) @@ -1214,7 +1088,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, { char *cstrs[MaxTupleAttributeNumber]; - ProcessWalRcvInterrupts(); + CHECK_FOR_INTERRUPTS(); /* Do the allocations in temporary context. */ oldcontext = MemoryContextSwitchTo(rowcontext); @@ -1260,7 +1134,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("the query interface requires a database connection"))); - pgres = libpqrcv_PQexec(conn->streamConn, query); + pgres = libpqsrv_exec(conn->streamConn, + query, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); switch (PQresultStatus(pgres)) { |