Fix data loss in logical replication.
author      Amit Kapila <[email protected]>
            Thu, 24 Apr 2025 05:01:40 +0000 (10:31 +0530)
committer   Amit Kapila <[email protected]>
            Thu, 24 Apr 2025 05:01:40 +0000 (10:31 +0530)
This commit is a backpatch of commit 4909b38af0 for 13.

Data loss can happen when DDLs like ALTER PUBLICATION ... ADD TABLE ... or
ALTER TYPE ... that don't take a strong lock on the table run concurrently
with DMLs on the tables involved in the DDL. This happens because logical
decoding doesn't distribute invalidations to concurrent transactions, so
those transactions use stale cache data to decode the changes. The problem
becomes worse because we keep using the stale cache even after those
in-progress transactions have finished, and thus skip changes that need to
be sent to the client.
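
As a rough illustration, the interleaving below (a minimal sketch in plain
SQL, mirroring the isolation test added by this commit; it assumes a logical
slot 'isolation_slot' created with the pgoutput plugin and an initially
empty publication 'pub') can lose the last INSERT on an unpatched server:

    -- Session 1
    INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
    BEGIN;
    INSERT INTO tbl1 (val1, val2) VALUES (1, 1);

    -- Session 2, while session 1's transaction is still open
    ALTER PUBLICATION pub ADD TABLE tbl1;

    -- Session 1
    COMMIT;
    INSERT INTO tbl1 (val1, val2) VALUES (1, 1);

    -- Session 2: count decoded INSERT messages (73 = 'I', LOGICAL_REP_MSG_INSERT).
    -- Without the fix, the decoder may keep using the relation cache entry
    -- built before the ALTER PUBLICATION and skip the last INSERT, even
    -- though tbl1 is now part of the publication.
    SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot',
        NULL, NULL, 'proto_version', '1', 'publication_names', 'pub')
    WHERE get_byte(data, 0) = 73;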

This commit fixes the issue by distributing invalidation messages from
catalog-modifying transactions to all concurrent in-progress transactions.
This allows the necessary rebuild of the catalog cache when decoding new
changes after concurrent DDL.

The fix for 13 is different from what we did in branches 14 and above: in
13, the concurrent DDL changes (from the DDL types mentioned earlier) will
be visible only to newly started transactions. Making them visible to
already in-progress transactions would require introducing a new change
type, REORDER_BUFFER_CHANGE_INVALIDATION, which is present in branches 14
and above. We decided not to take the risk of such a bigger change and fix
the issue partially in 13.

Reported-by: hubert depesz lubaczewski <[email protected]>
Reported-by: Tomas Vondra <[email protected]>
Author: Shlok Kyal <[email protected]>
Author: Hayato Kuroda <[email protected]>
Reviewed-by: Zhijie Hou <[email protected]>
Reviewed-by: Masahiko Sawada <[email protected]>
Reviewed-by: Amit Kapila <[email protected]>
Tested-by: Benoit LobrĂ©au <[email protected]>
Discussion: https://postgr.es/m/de52b282-1166-1180-45a2-8d8917ca74c6@enterprisedb.com
Discussion: https://postgr.es/m/CAD21AoAenVqiMjpN-PvGHL1N9DWnHSq673bfgr6phmBUzx=kLQ@mail.gmail.com
Discussion: https://postgr.es/m/CAD21AoAhU3kp8shYqP=ExiFDZ9sZxpFb5WzLa0p+vEe5j+7CWQ@mail.gmail.com

contrib/test_decoding/Makefile
contrib/test_decoding/expected/invalidation_distrubution.out [new file with mode: 0644]
contrib/test_decoding/specs/invalidation_distrubution.spec [new file with mode: 0644]
src/backend/replication/logical/reorderbuffer.c
src/backend/replication/logical/snapbuild.c
src/include/replication/reorderbuffer.h

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 735b7e7653cf3798443423a549a36ccced9e6179..f122dc3a82d044af9767c91aa17d88c655e2a6a8 100644 (file)
@@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
    spill slot truncate
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
    oldest_xmin snapshot_transfer subxact_without_top catalog_change_snapshot \
-   skip_snapshot_restore
+   skip_snapshot_restore invalidation_distrubution
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/invalidation_distrubution.out b/contrib/test_decoding/expected/invalidation_distrubution.out
new file mode 100644 (file)
index 0000000..eb70eda
--- /dev/null
@@ -0,0 +1,20 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/invalidation_distrubution.spec b/contrib/test_decoding/specs/invalidation_distrubution.spec
new file mode 100644 (file)
index 0000000..ca051fc
--- /dev/null
@@ -0,0 +1,32 @@
+# Test that catalog cache invalidation messages are distributed to ongoing
+# transactions, ensuring they can access the updated catalog content after
+# processing these messages.
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+    CREATE TABLE tbl1(val1 integer, val2 integer);
+    CREATE PUBLICATION pub;
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    DROP PUBLICATION pub;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1_begin" { BEGIN; }
+step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
+step "s1_commit" { COMMIT; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
+step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 56c25e3a6da5fe5d7ea980ef13aa5ab7599c1c72..fa9413fa2a0acb93d8186de0ae67f56f953c2905 100644 (file)
@@ -2264,20 +2264,45 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
                              SharedInvalidationMessage *msgs)
 {
    ReorderBufferTXN *txn;
+   MemoryContext oldcontext;
 
    txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
-   if (txn->ninvalidations != 0)
-       elog(ERROR, "only ever add one set of invalidations");
+   oldcontext = MemoryContextSwitchTo(rb->context);
+
+   /*
+    * Collect all the invalidations under the top transaction, if available,
+    * so that we can execute them all together.
+    */
+   if (txn->toplevel_xid)
+   {
+       txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, true, NULL, lsn,
+                                   true);
+   }
 
    Assert(nmsgs > 0);
 
-   txn->ninvalidations = nmsgs;
-   txn->invalidations = (SharedInvalidationMessage *)
-       MemoryContextAlloc(rb->context,
-                          sizeof(SharedInvalidationMessage) * nmsgs);
-   memcpy(txn->invalidations, msgs,
-          sizeof(SharedInvalidationMessage) * nmsgs);
+   /* Accumulate invalidations. */
+   if (txn->ninvalidations == 0)
+   {
+       txn->ninvalidations = nmsgs;
+       txn->invalidations = (SharedInvalidationMessage *)
+           palloc(sizeof(SharedInvalidationMessage) * nmsgs);
+       memcpy(txn->invalidations, msgs,
+              sizeof(SharedInvalidationMessage) * nmsgs);
+   }
+   else
+   {
+       txn->invalidations = (SharedInvalidationMessage *)
+           repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
+                    (txn->ninvalidations + nmsgs));
+
+       memcpy(txn->invalidations + txn->ninvalidations, msgs,
+              nmsgs * sizeof(SharedInvalidationMessage));
+       txn->ninvalidations += nmsgs;
+   }
+
+   MemoryContextSwitchTo(oldcontext);
 }
 
 /*
@@ -3895,3 +3920,26 @@ restart:
        *cmax = ent->cmax;
    return true;
 }
+
+/*
+ * Count invalidation messages of specified transaction.
+ *
+ * Returns number of messages, and msgs is set to the pointer of the linked
+ * list for the messages.
+ */
+uint32
+ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid,
+                             SharedInvalidationMessage **msgs)
+{
+   ReorderBufferTXN *txn;
+
+   txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+                               false);
+
+   if (txn == NULL)
+       return 0;
+
+   *msgs = txn->invalidations;
+
+   return txn->ninvalidations;
+}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 7546de967634ed98aea7faa650ebd158c2ecfcda..3bda41c52511c496cde47245662618e5df073305 100644 (file)
@@ -292,7 +292,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
 
 static void SnapBuildSnapIncRefcount(Snapshot snap);
 
-static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
 
 /* xlog reading helper functions for SnapBuildProcessRunningXacts */
 static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
@@ -861,15 +861,15 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 }
 
 /*
- * Add a new Snapshot to all transactions we're decoding that currently are
- * in-progress so they can see new catalog contents made by the transaction
- * that just committed. This is necessary because those in-progress
- * transactions will use the new catalog's contents from here on (at the very
- * least everything they do needs to be compatible with newer catalog
- * contents).
+ * Add a new Snapshot and invalidation messages to all transactions we're
+ * decoding that currently are in-progress so they can see new catalog contents
+ * made by the transaction that just committed. This is necessary because those
+ * in-progress transactions will use the new catalog's contents from here on
+ * (at the very least everything they do needs to be compatible with newer
+ * catalog contents).
  */
 static void
-SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
+SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 {
    dlist_iter  txn_i;
    ReorderBufferTXN *txn;
@@ -877,7 +877,8 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
    /*
     * Iterate through all toplevel transactions. This can include
     * subtransactions which we just don't yet know to be that, but that's
-    * fine, they will just get an unnecessary snapshot queued.
+    * fine, they will just get an unnecessary snapshot and invalidations
+    * queued.
     */
    dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
    {
@@ -890,6 +891,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
         * transaction which in turn implies we don't yet need a snapshot at
         * all. We'll add a snapshot when the first change gets queued.
         *
+        * Similarly, we don't need to add invalidations to a transaction whose
+        * base snapshot is not yet set. Once a base snapshot is built, it will
+        * include the xids of committed transactions that have modified the
+        * catalog, thus reflecting the new catalog contents. The existing
+        * catalog cache will have already been invalidated after processing
+        * the invalidations in the transaction that modified catalogs,
+        * ensuring that a fresh cache is constructed during decoding.
+        *
         * NB: This works correctly even for subtransactions because
         * ReorderBufferAssignChild() takes care to transfer the base snapshot
         * to the top-level transaction, and while iterating the changequeue
@@ -898,7 +907,7 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
        if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
            continue;
 
-       elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
+       elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
             txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
 
        /*
@@ -908,6 +917,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
        SnapBuildSnapIncRefcount(builder->snapshot);
        ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
                                 builder->snapshot);
+
+       /*
+        * Add invalidation messages to the reorder buffer of in-progress
+        * transactions except the current committed transaction, for which we
+        * will execute invalidations at the end.
+        *
+        * It is required, otherwise, we will end up using the stale catcache
+        * contents built by the current transaction even after its decoding,
+        * which should have been invalidated due to concurrent catalog
+        * changing transaction.
+        */
+       if (txn->xid != xid)
+       {
+           uint32 ninvalidations;
+           SharedInvalidationMessage *msgs = NULL;
+
+           ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
+                                                          xid, &msgs);
+
+           if (ninvalidations > 0)
+           {
+               Assert(msgs != NULL);
+
+               ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
+                                             ninvalidations, msgs);
+           }
+       }
    }
 }
 
@@ -1186,8 +1222,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
        /* refcount of the snapshot builder for the new snapshot */
        SnapBuildSnapIncRefcount(builder->snapshot);
 
-       /* add a new catalog snapshot to all currently running transactions */
-       SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
+       /*
+        * Add a new catalog snapshot and invalidations messages to all
+        * currently running transactions.
+        */
+       SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
    }
 }
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 5347597e92b4d454a3d80bcd8947b280bda8b463..545cee891eda8eb8be028b2aa1c5d2f7a442015b 100644 (file)
@@ -463,6 +463,10 @@ TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
 
 void       ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
+uint32     ReorderBufferGetInvalidations(ReorderBuffer *rb,
+                                         TransactionId xid,
+                                         SharedInvalidationMessage **msgs);
+
 void       StartupReorderBuffer(void);
 
 #endif