summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
authorAmit Kapila2024-07-24 04:43:36 +0000
committerAmit Kapila2024-07-24 04:43:36 +0000
commit1462aad2e4474ab61174f8ab00992cd3d6d57c7b (patch)
tree9649b47a93c4b7257db1295f65f8d0213c5b3537 /src/backend/replication/logical/worker.c
parent774d47b6c01a8b8111ae390b97343f25ebdf9267 (diff)
Allow altering of two_phase option of a SUBSCRIPTION.
The two_phase option is controlled by both the publisher (as a slot option) and the subscriber (as a subscription option), so the slot option must also be modified. Changing the 'two_phase' option for a subscription from 'true' to 'false' is permitted only when there are no pending prepared transactions corresponding to that subscription. Otherwise, the changes of already prepared transactions can be replicated again along with their corresponding commit leading to duplicate data or errors. To avoid data loss, the 'two_phase' option for a subscription can only be changed from 'false' to 'true' once the initial data synchronization is completed. Therefore this is performed later by the logical replication worker. Author: Hayato Kuroda, Ajin Cherian, Amit Kapila Reviewed-by: Peter Smith, Hou Zhijie, Amit Kapila, Vitaly Davydov, Vignesh C Discussion: https://postgr.es/m/8fab8-65d74c80-1-2f28e880@39088166
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c25
1 files changed, 2 insertions, 23 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c0bda6269bd..ec96b5fe85e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -401,9 +401,6 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
LogicalRepTupleData *newtup,
CmdType operation);
-/* Compute GID for two_phase transactions */
-static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
-
/* Functions for skipping changes */
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
static void stop_skipping_changes(void);
@@ -3911,7 +3908,7 @@ maybe_reread_subscription(void)
/* !slotname should never happen when enabled is true. */
Assert(newsub->slotname);
- /* two-phase should not be altered */
+ /* two-phase cannot be altered while the worker is running */
Assert(newsub->twophasestate == MySubscription->twophasestate);
/*
@@ -4397,24 +4394,6 @@ cleanup_subxact_info()
}
/*
- * Form the prepared transaction GID for two_phase transactions.
- *
- * Return the GID in the supplied buffer.
- */
-static void
-TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
-{
- Assert(subid != InvalidRepOriginId);
-
- if (!TransactionIdIsValid(xid))
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("invalid two-phase transaction ID")));
-
- snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
-}
-
-/*
* Common function to run the apply loop with error handling. Disable the
* subscription, if necessary.
*
@@ -5014,7 +4993,7 @@ AtEOXact_LogicalRepWorkers(bool isCommit)
List *workers;
ListCell *lc2;
- workers = logicalrep_workers_find(subid, true);
+ workers = logicalrep_workers_find(subid, true, false);
foreach(lc2, workers)
{
LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);