summaryrefslogtreecommitdiff
path: root/src/backend/utils/cache/inval.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/utils/cache/inval.c')
-rw-r--r--src/backend/utils/cache/inval.c125
1 files changed, 125 insertions, 0 deletions
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 700ccb6df9b..4eb67720737 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -271,6 +271,7 @@ int debug_discard_caches = 0;
#define MAX_SYSCACHE_CALLBACKS 64
#define MAX_RELCACHE_CALLBACKS 10
+#define MAX_RELSYNC_CALLBACKS 10
static struct SYSCACHECALLBACK
{
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
static int relcache_callback_count = 0;
+static struct RELSYNCCALLBACK
+{
+ RelSyncCallbackFunction function;
+ Datum arg;
+} relsync_callback_list[MAX_RELSYNC_CALLBACKS];
+
+static int relsync_callback_count = 0;
+
+
/* ----------------------------------------------------------------
* Invalidation subgroup support functions
* ----------------------------------------------------------------
@@ -485,6 +495,36 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
}
/*
+ * Add a relsync inval entry
+ *
+ * We put these into the relcache subgroup for simplicity. This message is the
+ * same as AddRelcacheInvalidationMessage() except that it is for
+ * RelationSyncCache maintained by decoding plugin pgoutput.
+ */
+static void
+AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group,
+ Oid dbId, Oid relId)
+{
+ SharedInvalidationMessage msg;
+
+ /* Don't add a duplicate item. */
+ ProcessMessageSubGroup(group, RelCacheMsgs,
+ if (msg->rc.id == SHAREDINVALRELSYNC_ID &&
+ (msg->rc.relId == relId ||
+ msg->rc.relId == InvalidOid))
+ return);
+
+ /* OK, add the item */
+ msg.rc.id = SHAREDINVALRELSYNC_ID;
+ msg.rc.dbId = dbId;
+ msg.rc.relId = relId;
+ /* check AddCatcacheInvalidationMessage() for an explanation */
+ VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+ AddInvalidationMessage(group, RelCacheMsgs, &msg);
+}
+
+/*
* Add a snapshot inval entry
*
* We put these into the relcache subgroup for simplicity.
@@ -612,6 +652,17 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
}
/*
+ * RegisterRelsyncInvalidation
+ *
+ * As above, but register a relsynccache invalidation event.
+ */
+static void
+RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
+{
+ AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId);
+}
+
+/*
* RegisterSnapshotInvalidation
*
* Register an invalidation event for MVCC scans against a given catalog.
@@ -751,6 +802,13 @@ InvalidateSystemCachesExtended(bool debug_discard)
ccitem->function(ccitem->arg, InvalidOid);
}
+
+ for (i = 0; i < relsync_callback_count; i++)
+ {
+ struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+ ccitem->function(ccitem->arg, InvalidOid);
+ }
}
/*
@@ -832,6 +890,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
else if (msg->sn.dbId == MyDatabaseId)
InvalidateCatalogSnapshot();
}
+ else if (msg->id == SHAREDINVALRELSYNC_ID)
+ {
+ /* We only care about our own database */
+ if (msg->rs.dbId == MyDatabaseId)
+ CallRelSyncCallbacks(msg->rs.relid);
+ }
else
elog(FATAL, "unrecognized SI message ID: %d", msg->id);
}
@@ -1621,6 +1685,32 @@ CacheInvalidateRelcacheByRelid(Oid relid)
ReleaseSysCache(tup);
}
+/*
+ * CacheInvalidateRelSync
+ * Register invalidation of the cache in logical decoding output plugin
+ * for a database.
+ *
+ * This type of invalidation message is used for the specific purpose of output
+ * plugins. Processes which do not decode WALs would do nothing even when it
+ * receives the message.
+ */
+void
+CacheInvalidateRelSync(Oid relid)
+{
+ RegisterRelsyncInvalidation(PrepareInvalidationState(),
+ MyDatabaseId, relid);
+}
+
+/*
+ * CacheInvalidateRelSyncAll
+ * Register invalidation of the whole cache in logical decoding output
+ * plugin.
+ */
+void
+CacheInvalidateRelSyncAll(void)
+{
+ CacheInvalidateRelSync(InvalidOid);
+}
/*
* CacheInvalidateSmgr
@@ -1764,6 +1854,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
}
/*
+ * CacheRegisterRelSyncCallback
+ * Register the specified function to be called for all future
+ * relsynccache invalidation events.
+ *
+ * This function is intended to be call from the logical decoding output
+ * plugins.
+ */
+void
+CacheRegisterRelSyncCallback(RelSyncCallbackFunction func,
+ Datum arg)
+{
+ if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS)
+ elog(FATAL, "out of relsync_callback_list slots");
+
+ relsync_callback_list[relsync_callback_count].function = func;
+ relsync_callback_list[relsync_callback_count].arg = arg;
+
+ ++relsync_callback_count;
+}
+
+/*
* CallSyscacheCallbacks
*
* This is exported so that CatalogCacheFlushCatalog can call it, saving
@@ -1789,6 +1900,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
}
/*
+ * CallSyscacheCallbacks
+ */
+void
+CallRelSyncCallbacks(Oid relid)
+{
+ for (int i = 0; i < relsync_callback_count; i++)
+ {
+ struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i;
+
+ ccitem->function(ccitem->arg, relid);
+ }
+}
+
+/*
* LogLogicalInvalidations
*
* Emit WAL for invalidations caused by the current command.