summaryrefslogtreecommitdiff
path: root/src/include/replication
diff options
context:
space:
mode:
authorAmit Kapila2024-07-24 04:43:36 +0000
committerAmit Kapila2024-07-24 04:43:36 +0000
commit1462aad2e4474ab61174f8ab00992cd3d6d57c7b (patch)
tree9649b47a93c4b7257db1295f65f8d0213c5b3537 /src/include/replication
parent774d47b6c01a8b8111ae390b97343f25ebdf9267 (diff)
Allow altering of two_phase option of a SUBSCRIPTION.
The two_phase option is controlled by both the publisher (as a slot option) and the subscriber (as a subscription option), so the slot option must also be modified. Changing the 'two_phase' option for a subscription from 'true' to 'false' is permitted only when there are no pending prepared transactions corresponding to that subscription. Otherwise, the changes of already prepared transactions can be replicated again along with their corresponding commit leading to duplicate data or errors. To avoid data loss, the 'two_phase' option for a subscription can only be changed from 'false' to 'true' once the initial data synchronization is completed. Therefore this is performed later by the logical replication worker. Author: Hayato Kuroda, Ajin Cherian, Amit Kapila Reviewed-by: Peter Smith, Hou Zhijie, Amit Kapila, Vitaly Davydov, Vignesh C Discussion: https://postgr.es/m/8fab8-65d74c80-1-2f28e880@39088166
Diffstat (limited to 'src/include/replication')
-rw-r--r--src/include/replication/slot.h3
-rw-r--r--src/include/replication/walreceiver.h12
-rw-r--r--src/include/replication/worker_internal.h3
3 files changed, 11 insertions, 7 deletions
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index c9675ee87cc..c2ee149fd66 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -243,7 +243,8 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);
extern void ReplicationSlotDropAcquired(void);
-extern void ReplicationSlotAlter(const char *name, bool failover);
+extern void ReplicationSlotAlter(const char *name, const bool *failover,
+ const bool *two_phase);
extern void ReplicationSlotAcquire(const char *name, bool nowait);
extern void ReplicationSlotRelease(void);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 12f71fa99b0..132e789948b 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -372,12 +372,14 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
/*
* walrcv_alter_slot_fn
*
- * Change the definition of a replication slot. Currently, it only supports
- * changing the failover property of the slot.
+ * Change the definition of a replication slot. Currently, it supports
+ * changing the failover and two_phase properties of the slot.
*/
typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
const char *slotname,
- bool failover);
+ const bool *failover,
+ const bool *two_phase);
+
/*
* walrcv_get_backend_pid_fn
@@ -455,8 +457,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \
WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
-#define walrcv_alter_slot(conn, slotname, failover) \
- WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover)
+#define walrcv_alter_slot(conn, slotname, failover, two_phase) \
+ WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover, two_phase)
#define walrcv_get_backend_pid(conn) \
WalReceiverFunctions->walrcv_get_backend_pid(conn)
#define walrcv_exec(conn, exec, nRetTypes, retTypes) \
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 515aefd5191..9646261d7e9 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -240,7 +240,8 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
-extern List *logicalrep_workers_find(Oid subid, bool only_running);
+extern List *logicalrep_workers_find(Oid subid, bool only_running,
+ bool acquire_lock);
extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid,