PostgreSQL Source Code git master
launcher.c File Reference
#include "postgres.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "funcapi.h"
#include "lib/dshash.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
Include dependency graph for launcher.c:

Go to the source code of this file.

Data Structures

struct  LogicalRepCtxStruct
 
struct  LauncherLastStartTimesEntry
 

Macros

#define DEFAULT_NAPTIME_PER_CYCLE   180000L
 
#define PG_STAT_GET_SUBSCRIPTION_COLS   10
 

Typedefs

typedef struct LogicalRepCtxStruct LogicalRepCtxStruct
 
typedef struct LauncherLastStartTimesEntry LauncherLastStartTimesEntry
 

Functions

static void logicalrep_launcher_onexit (int code, Datum arg)
 
static void logicalrep_worker_onexit (int code, Datum arg)
 
static void logicalrep_worker_detach (void)
 
static void logicalrep_worker_cleanup (LogicalRepWorker *worker)
 
static int logicalrep_pa_worker_count (Oid subid)
 
static void logicalrep_launcher_attach_dshmem (void)
 
static void ApplyLauncherSetWorkerStartTime (Oid subid, TimestampTz start_time)
 
static TimestampTz ApplyLauncherGetWorkerStartTime (Oid subid)
 
static void compute_min_nonremovable_xid (LogicalRepWorker *worker, TransactionId *xmin)
 
static bool acquire_conflict_slot_if_exists (void)
 
static void update_conflict_slot_xmin (TransactionId new_xmin)
 
static void init_conflict_slot_xmin (void)
 
static List * get_subscription_list (void)
 
static bool WaitForReplicationWorkerAttach (LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle)
 
LogicalRepWorker * logicalrep_worker_find (LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
 
List * logicalrep_workers_find (Oid subid, bool only_running, bool acquire_lock)
 
bool logicalrep_worker_launch (LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
 
static void logicalrep_worker_stop_internal (LogicalRepWorker *worker, int signo)
 
void logicalrep_worker_stop (LogicalRepWorkerType wtype, Oid subid, Oid relid)
 
void logicalrep_pa_worker_stop (ParallelApplyWorkerInfo *winfo)
 
void logicalrep_worker_wakeup (LogicalRepWorkerType wtype, Oid subid, Oid relid)
 
void logicalrep_worker_wakeup_ptr (LogicalRepWorker *worker)
 
void logicalrep_worker_attach (int slot)
 
int logicalrep_sync_worker_count (Oid subid)
 
Size ApplyLauncherShmemSize (void)
 
void ApplyLauncherRegister (void)
 
void ApplyLauncherShmemInit (void)
 
void ApplyLauncherForgetWorkerStartTime (Oid subid)
 
void AtEOXact_ApplyLauncher (bool isCommit)
 
void ApplyLauncherWakeupAtCommit (void)
 
void ApplyLauncherWakeup (void)
 
void ApplyLauncherMain (Datum main_arg)
 
void CreateConflictDetectionSlot (void)
 
bool IsLogicalLauncher (void)
 
pid_t GetLeaderApplyWorkerPid (pid_t pid)
 
Datum pg_stat_get_subscription (PG_FUNCTION_ARGS)
 

Variables

int max_logical_replication_workers = 4
 
int max_sync_workers_per_subscription = 2
 
int max_parallel_apply_workers_per_subscription = 2
 
LogicalRepWorker * MyLogicalRepWorker = NULL
 
static LogicalRepCtxStruct * LogicalRepCtx
 
static const dshash_parameters dsh_params
 
static dsa_area * last_start_times_dsa = NULL
 
static dshash_table * last_start_times = NULL
 
static bool on_commit_launcher_wakeup = false
 

Macro Definition Documentation

◆ DEFAULT_NAPTIME_PER_CYCLE

#define DEFAULT_NAPTIME_PER_CYCLE   180000L

Definition at line 49 of file launcher.c.

◆ PG_STAT_GET_SUBSCRIPTION_COLS

#define PG_STAT_GET_SUBSCRIPTION_COLS   10

Typedef Documentation

◆ LauncherLastStartTimesEntry

◆ LogicalRepCtxStruct

Function Documentation

◆ acquire_conflict_slot_if_exists()

static bool acquire_conflict_slot_if_exists ( void  )
static

Definition at line 1445 of file launcher.c.

1446{
1448 return false;
1449
1451 return true;
1452}
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition: slot.c:625
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:545
#define CONFLICT_DETECTION_SLOT
Definition: slot.h:28

References CONFLICT_DETECTION_SLOT, ReplicationSlotAcquire(), and SearchNamedReplicationSlot().

Referenced by ApplyLauncherMain().

◆ ApplyLauncherForgetWorkerStartTime()

void ApplyLauncherForgetWorkerStartTime ( Oid  subid)

Definition at line 1114 of file launcher.c.

1115{
1117
1118 (void) dshash_delete_key(last_start_times, &subid);
1119}
bool dshash_delete_key(dshash_table *hash_table, const void *key)
Definition: dshash.c:503
static dshash_table * last_start_times
Definition: launcher.c:91
static void logicalrep_launcher_attach_dshmem(void)
Definition: launcher.c:1028

References dshash_delete_key(), last_start_times, and logicalrep_launcher_attach_dshmem().

Referenced by apply_worker_exit(), DisableSubscriptionAndExit(), DropSubscription(), InitializeLogRepWorker(), maybe_reread_subscription(), and ProcessSyncingTablesForApply().

◆ ApplyLauncherGetWorkerStartTime()

static TimestampTz ApplyLauncherGetWorkerStartTime ( Oid  subid)
static

Definition at line 1088 of file launcher.c.

1089{
1091 TimestampTz ret;
1092
1094
1095 entry = dshash_find(last_start_times, &subid, false);
1096 if (entry == NULL)
1097 return 0;
1098
1099 ret = entry->last_start_time;
1101
1102 return ret;
1103}
int64 TimestampTz
Definition: timestamp.h:39
void dshash_release_lock(dshash_table *hash_table, void *entry)
Definition: dshash.c:558
void * dshash_find(dshash_table *hash_table, const void *key, bool exclusive)
Definition: dshash.c:390
TimestampTz last_start_time
Definition: launcher.c:77

References dshash_find(), dshash_release_lock(), LauncherLastStartTimesEntry::last_start_time, last_start_times, and logicalrep_launcher_attach_dshmem().

Referenced by ApplyLauncherMain().

◆ ApplyLauncherMain()

void ApplyLauncherMain ( Datum  main_arg)

Definition at line 1164 of file launcher.c.

1165{
1167 (errmsg_internal("logical replication launcher started")));
1168
1170
1173
1174 /* Establish signal handlers. */
1176 pqsignal(SIGTERM, die);
1178
1179 /*
1180 * Establish connection to nailed catalogs (we only ever access
1181 * pg_subscription).
1182 */
1184
1185 /*
1186 * Acquire the conflict detection slot at startup to ensure it can be
1187 * dropped if no longer needed after a restart.
1188 */
1190
1191 /* Enter main loop */
1192 for (;;)
1193 {
1194 int rc;
1195 List *sublist;
1196 ListCell *lc;
1197 MemoryContext subctx;
1198 MemoryContext oldctx;
1199 long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
1200 bool can_update_xmin = true;
1201 bool retain_dead_tuples = false;
1203
1205
1206 /* Use temporary context to avoid leaking memory across cycles. */
1208 "Logical Replication Launcher sublist",
1210 oldctx = MemoryContextSwitchTo(subctx);
1211
1212 /*
1213 * Start any missing workers for enabled subscriptions.
1214 *
1215 * Also, during the iteration through all subscriptions, we compute
1216 * the minimum XID required to protect deleted tuples for conflict
1217 * detection if one of the subscriptions enables the retain_dead_tuples
1218 * option.
1219 */
1220 sublist = get_subscription_list();
1221 foreach(lc, sublist)
1222 {
1223 Subscription *sub = (Subscription *) lfirst(lc);
1225 TimestampTz last_start;
1227 long elapsed;
1228
1229 if (sub->retaindeadtuples)
1230 {
1231 retain_dead_tuples = true;
1232
1233 /*
1234 * Create a replication slot to retain information necessary
1235 * for conflict detection such as dead tuples, commit
1236 * timestamps, and origins.
1237 *
1238 * The slot is created before starting the apply worker to
1239 * prevent it from unnecessarily maintaining its
1240 * oldest_nonremovable_xid.
1241 *
1242 * The slot is created even for a disabled subscription to
1243 * ensure that conflict-related information is available when
1244 * applying remote changes that occurred before the
1245 * subscription was enabled.
1246 */
1248
1249 if (sub->retentionactive)
1250 {
1251 /*
1252 * Can't advance xmin of the slot unless all the
1253 * subscriptions actively retaining dead tuples are
1254 * enabled. This is required to ensure that we don't
1255 * advance the xmin of CONFLICT_DETECTION_SLOT if one of
1256 * the subscriptions is not enabled. Otherwise, we won't
1257 * be able to detect conflicts reliably for such a
1258 * subscription even though it has set the
1259 * retain_dead_tuples option.
1260 */
1261 can_update_xmin &= sub->enabled;
1262
1263 /*
1264 * Initialize the slot once the subscription activates
1265 * retention.
1266 */
1269 }
1270 }
1271
1272 if (!sub->enabled)
1273 continue;
1274
1275 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1277 false);
1278
1279 if (w != NULL)
1280 {
1281 /*
1282 * Compute the minimum xmin required to protect dead tuples
1283 * required for conflict detection among all running apply
1284 * workers. This computation is performed while holding
1285 * LogicalRepWorkerLock to prevent accessing invalid worker
1286 * data, in scenarios where a worker might exit and reset its
1287 * state concurrently.
1288 */
1289 if (sub->retaindeadtuples &&
1290 sub->retentionactive &&
1291 can_update_xmin)
1293
1294 LWLockRelease(LogicalRepWorkerLock);
1295
1296 /* worker is running already */
1297 continue;
1298 }
1299
1300 LWLockRelease(LogicalRepWorkerLock);
1301
1302 /*
1303 * Can't advance xmin of the slot unless all the workers
1304 * corresponding to subscriptions actively retaining dead tuples
1305 * are running, disabling the further computation of the minimum
1306 * nonremovable xid.
1307 */
1308 if (sub->retaindeadtuples && sub->retentionactive)
1309 can_update_xmin = false;
1310
1311 /*
1312 * If the worker is eligible to start now, launch it. Otherwise,
1313 * adjust wait_time so that we'll wake up as soon as it can be
1314 * started.
1315 *
1316 * Each subscription's apply worker can only be restarted once per
1317 * wal_retrieve_retry_interval, so that errors do not cause us to
1318 * repeatedly restart the worker as fast as possible. In cases
1319 * where a restart is expected (e.g., subscription parameter
1320 * changes), another process should remove the last-start entry
1321 * for the subscription so that the worker can be restarted
1322 * without waiting for wal_retrieve_retry_interval to elapse.
1323 */
1324 last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1326 if (last_start == 0 ||
1328 {
1331 sub->dbid, sub->oid, sub->name,
1332 sub->owner, InvalidOid,
1334 sub->retaindeadtuples &&
1335 sub->retentionactive))
1336 {
1337 /*
1338 * We get here either if we failed to launch a worker
1339 * (perhaps for resource-exhaustion reasons) or if we
1340 * launched one but it immediately quit. Either way, it
1341 * seems appropriate to try again after
1342 * wal_retrieve_retry_interval.
1343 */
1344 wait_time = Min(wait_time,
1346 }
1347 }
1348 else
1349 {
1350 wait_time = Min(wait_time,
1351 wal_retrieve_retry_interval - elapsed);
1352 }
1353 }
1354
1355 /*
1356 * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
1357 * that requires us to retain dead tuples. Otherwise, if required,
1358 * advance the slot's xmin to protect dead tuples required for the
1359 * conflict detection.
1360 *
1361 * Additionally, if all apply workers for subscriptions with
1362 * retain_dead_tuples enabled have requested to stop retention, the
1363 * slot's xmin will be set to InvalidTransactionId allowing the
1364 * removal of dead tuples.
1365 */
1367 {
1368 if (!retain_dead_tuples)
1370 else if (can_update_xmin)
1372 }
1373
1374 /* Switch back to original memory context. */
1375 MemoryContextSwitchTo(oldctx);
1376 /* Clean the temporary memory. */
1377 MemoryContextDelete(subctx);
1378
1379 /* Wait for more work. */
1380 rc = WaitLatch(MyLatch,
1382 wait_time,
1383 WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1384
1385 if (rc & WL_LATCH_SET)
1386 {
1389 }
1390
1392 {
1393 ConfigReloadPending = false;
1395 }
1396 }
1397
1398 /* Not reachable */
1399}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1757
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
void BackgroundWorkerInitializeConnection(const char *dbname, const char *username, uint32 flags)
Definition: bgworker.c:853
void BackgroundWorkerUnblockSignals(void)
Definition: bgworker.c:927
#define Min(x, y)
Definition: c.h:1007
uint32 TransactionId
Definition: c.h:661
#define DSM_HANDLE_INVALID
Definition: dsm_impl.h:58
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1170
#define DEBUG1
Definition: elog.h:30
#define ereport(elevel,...)
Definition: elog.h:150
int MyProcPid
Definition: globals.c:47
struct Latch * MyLatch
Definition: globals.c:63
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
Assert(PointerIsAligned(start, uint64))
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
void ResetLatch(Latch *latch)
Definition: latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
#define DEFAULT_NAPTIME_PER_CYCLE
Definition: launcher.c:49
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
Definition: launcher.c:322
static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
Definition: launcher.c:1072
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition: launcher.c:256
static void update_conflict_slot_xmin(TransactionId new_xmin)
Definition: launcher.c:1459
static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
Definition: launcher.c:1407
static void logicalrep_launcher_onexit(int code, Datum arg)
Definition: launcher.c:846
void CreateConflictDetectionSlot(void)
Definition: launcher.c:1526
static void init_conflict_slot_xmin(void)
Definition: launcher.c:1495
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid)
Definition: launcher.c:1088
static LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:71
static bool acquire_conflict_slot_if_exists(void)
Definition: launcher.c:1445
static List * get_subscription_list(void)
Definition: launcher.c:117
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_SHARED
Definition: lwlock.h:113
MemoryContext TopMemoryContext
Definition: mcxt.c:166
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:469
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
#define lfirst(lc)
Definition: pg_list.h:172
#define die(msg)
#define pqsignal
Definition: port.h:531
uint64_t Datum
Definition: postgres.h:70
#define InvalidOid
Definition: postgres_ext.h:37
void ReplicationSlotDropAcquired(void)
Definition: slot.c:996
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
Definition: pg_list.h:54
TransactionId xmin
Definition: slot.h:96
ReplicationSlotPersistentData data
Definition: slot.h:192
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
#define SIGHUP
Definition: win32_port.h:158
@ WORKERTYPE_APPLY
int wal_retrieve_retry_interval
Definition: xlog.c:135

References acquire_conflict_slot_if_exists(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, ApplyLauncherGetWorkerStartTime(), ApplyLauncherSetWorkerStartTime(), Assert(), BackgroundWorkerInitializeConnection(), BackgroundWorkerUnblockSignals(), before_shmem_exit(), CHECK_FOR_INTERRUPTS, compute_min_nonremovable_xid(), ConfigReloadPending, CreateConflictDetectionSlot(), ReplicationSlot::data, Subscription::dbid, DEBUG1, DEFAULT_NAPTIME_PER_CYCLE, die, DSM_HANDLE_INVALID, Subscription::enabled, ereport, errmsg_internal(), get_subscription_list(), GetCurrentTimestamp(), init_conflict_slot_xmin(), InvalidOid, InvalidTransactionId, LogicalRepCtxStruct::launcher_pid, lfirst, logicalrep_launcher_onexit(), logicalrep_worker_find(), logicalrep_worker_launch(), LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockRelease(), MemoryContextDelete(), MemoryContextSwitchTo(), Min, MyLatch, MyProcPid, MyReplicationSlot, Subscription::name, now(), Subscription::oid, Subscription::owner, PGC_SIGHUP, pqsignal, ProcessConfigFile(), ReplicationSlotDropAcquired(), ResetLatch(), Subscription::retaindeadtuples, Subscription::retentionactive, SIGHUP, SignalHandlerForConfigReload(), TimestampDifferenceMilliseconds(), TopMemoryContext, TransactionIdIsValid, update_conflict_slot_xmin(), WaitLatch(), wal_retrieve_retry_interval, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_TIMEOUT, WORKERTYPE_APPLY, and ReplicationSlotPersistentData::xmin.

◆ ApplyLauncherRegister()

void ApplyLauncherRegister ( void  )

Definition at line 957 of file launcher.c.

958{
960
961 /*
962 * The logical replication launcher is disabled during binary upgrades, to
963 * prevent logical replication workers from running on the source cluster.
964 * That could cause replication origins to move forward after having been
965 * copied to the target cluster, potentially creating conflicts with the
966 * copied data files.
967 */
969 return;
970
971 memset(&bgw, 0, sizeof(bgw));
975 snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
976 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
978 "logical replication launcher");
980 "logical replication launcher");
981 bgw.bgw_restart_time = 5;
982 bgw.bgw_notify_pid = 0;
983 bgw.bgw_main_arg = (Datum) 0;
984
986}
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition: bgworker.c:940
@ BgWorkerStart_RecoveryFinished
Definition: bgworker.h:81
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:60
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:53
#define BGW_MAXLEN
Definition: bgworker.h:86
bool IsBinaryUpgrade
Definition: globals.c:121
int max_logical_replication_workers
Definition: launcher.c:52
#define MAXPGPATH
#define snprintf
Definition: port.h:239
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:97
Datum bgw_main_arg
Definition: bgworker.h:98
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:91
int bgw_restart_time
Definition: bgworker.h:95
char bgw_type[BGW_MAXLEN]
Definition: bgworker.h:92
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:94
pid_t bgw_notify_pid
Definition: bgworker.h:100
char bgw_library_name[MAXPGPATH]
Definition: bgworker.h:96

References BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, IsBinaryUpgrade, max_logical_replication_workers, MAXPGPATH, RegisterBackgroundWorker(), and snprintf.

Referenced by PostmasterMain().

◆ ApplyLauncherSetWorkerStartTime()

static void ApplyLauncherSetWorkerStartTime ( Oid  subid,
TimestampTz  start_time 
)
static

Definition at line 1072 of file launcher.c.

1073{
1075 bool found;
1076
1078
1079 entry = dshash_find_or_insert(last_start_times, &subid, &found);
1080 entry->last_start_time = start_time;
1082}
void * dshash_find_or_insert(dshash_table *hash_table, const void *key, bool *found)
Definition: dshash.c:433
static time_t start_time
Definition: pg_ctl.c:96

References dshash_find_or_insert(), dshash_release_lock(), LauncherLastStartTimesEntry::last_start_time, last_start_times, logicalrep_launcher_attach_dshmem(), and start_time.

Referenced by ApplyLauncherMain().

◆ ApplyLauncherShmemInit()

void ApplyLauncherShmemInit ( void  )

Definition at line 993 of file launcher.c.

994{
995 bool found;
996
998 ShmemInitStruct("Logical Replication Launcher Data",
1000 &found);
1001
1002 if (!found)
1003 {
1004 int slot;
1005
1007
1010
1011 /* Initialize memory and spin locks for each worker slot. */
1012 for (slot = 0; slot < max_logical_replication_workers; slot++)
1013 {
1014 LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
1015
1016 memset(worker, 0, sizeof(LogicalRepWorker));
1017 SpinLockInit(&worker->relmutex);
1018 }
1019 }
1020}
#define DSA_HANDLE_INVALID
Definition: dsa.h:139
#define DSHASH_HANDLE_INVALID
Definition: dshash.h:27
Size ApplyLauncherShmemSize(void)
Definition: launcher.c:938
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
#define SpinLockInit(lock)
Definition: spin.h:57
dsa_handle last_start_dsa
Definition: launcher.c:64
dshash_table_handle last_start_dsh
Definition: launcher.c:65
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:68

References ApplyLauncherShmemSize(), DSA_HANDLE_INVALID, DSHASH_HANDLE_INVALID, LogicalRepCtxStruct::last_start_dsa, LogicalRepCtxStruct::last_start_dsh, LogicalRepCtx, max_logical_replication_workers, LogicalRepWorker::relmutex, ShmemInitStruct(), SpinLockInit, and LogicalRepCtxStruct::workers.

Referenced by CreateOrAttachShmemStructs().

◆ ApplyLauncherShmemSize()

Size ApplyLauncherShmemSize ( void  )

Definition at line 938 of file launcher.c.

939{
940 Size size;
941
942 /*
943 * Need the fixed struct and the array of LogicalRepWorker.
944 */
945 size = sizeof(LogicalRepCtxStruct);
946 size = MAXALIGN(size);
948 sizeof(LogicalRepWorker)));
949 return size;
950}
#define MAXALIGN(LEN)
Definition: c.h:814
size_t Size
Definition: c.h:614
struct LogicalRepCtxStruct LogicalRepCtxStruct
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_logical_replication_workers, MAXALIGN, and mul_size().

Referenced by ApplyLauncherShmemInit(), and CalculateShmemSize().

◆ ApplyLauncherWakeup()

void ApplyLauncherWakeup ( void  )

Definition at line 1154 of file launcher.c.

1155{
1156 if (LogicalRepCtx->launcher_pid != 0)
1158}
#define kill(pid, sig)
Definition: win32_port.h:493
#define SIGUSR1
Definition: win32_port.h:170

References kill, LogicalRepCtxStruct::launcher_pid, LogicalRepCtx, and SIGUSR1.

Referenced by AtEOXact_ApplyLauncher(), logicalrep_worker_onexit(), update_retention_status(), and wait_for_local_flush().

◆ ApplyLauncherWakeupAtCommit()

void ApplyLauncherWakeupAtCommit ( void  )

Definition at line 1144 of file launcher.c.

1145{
1148}
static bool on_commit_launcher_wakeup
Definition: launcher.c:93

References on_commit_launcher_wakeup.

Referenced by AlterSubscription(), AlterSubscriptionOwner_internal(), and CreateSubscription().

◆ AtEOXact_ApplyLauncher()

void AtEOXact_ApplyLauncher ( bool  isCommit)

Definition at line 1125 of file launcher.c.

1126{
1127 if (isCommit)
1128 {
1131 }
1132
1134}
void ApplyLauncherWakeup(void)
Definition: launcher.c:1154

References ApplyLauncherWakeup(), and on_commit_launcher_wakeup.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ compute_min_nonremovable_xid()

static void compute_min_nonremovable_xid ( LogicalRepWorker * worker,
TransactionId * xmin 
)
static

Definition at line 1407 of file launcher.c.

1408{
1409 TransactionId nonremovable_xid;
1410
1411 Assert(worker != NULL);
1412
1413 /*
1414 * The replication slot for conflict detection must be created before the
1415 * worker starts.
1416 */
1418
1419 SpinLockAcquire(&worker->relmutex);
1420 nonremovable_xid = worker->oldest_nonremovable_xid;
1421 SpinLockRelease(&worker->relmutex);
1422
1423 /*
1424 * Return if the apply worker has stopped retention concurrently.
1425 *
1426 * Although this function is invoked only when retentionactive is true,
1427 * the apply worker might stop retention after the launcher fetches the
1428 * retentionactive flag.
1429 */
1430 if (!TransactionIdIsValid(nonremovable_xid))
1431 return;
1432
1433 if (!TransactionIdIsValid(*xmin) ||
1434 TransactionIdPrecedes(nonremovable_xid, *xmin))
1435 *xmin = nonremovable_xid;
1436}
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
TransactionId oldest_nonremovable_xid
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.h:263

References Assert(), MyReplicationSlot, LogicalRepWorker::oldest_nonremovable_xid, LogicalRepWorker::relmutex, SpinLockAcquire, SpinLockRelease, TransactionIdIsValid, and TransactionIdPrecedes().

Referenced by ApplyLauncherMain().

◆ CreateConflictDetectionSlot()

void CreateConflictDetectionSlot ( void  )

Definition at line 1526 of file launcher.c.

1527{
1528 /* Exit early, if the replication slot is already created and acquired */
1530 return;
1531
1532 ereport(LOG,
1533 errmsg("creating replication conflict detection slot"));
1534
1536 false, false);
1537
1539}
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define LOG
Definition: elog.h:31
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:384
@ RS_PERSISTENT
Definition: slot.h:45

References CONFLICT_DETECTION_SLOT, ereport, errmsg(), init_conflict_slot_xmin(), LOG, MyReplicationSlot, ReplicationSlotCreate(), and RS_PERSISTENT.

Referenced by ApplyLauncherMain(), and binary_upgrade_create_conflict_detection_slot().

◆ get_subscription_list()

static List * get_subscription_list ( void  )
static

Definition at line 117 of file launcher.c.

118{
119 List *res = NIL;
120 Relation rel;
121 TableScanDesc scan;
122 HeapTuple tup;
123 MemoryContext resultcxt;
124
125 /* This is the context that we will allocate our output data in */
126 resultcxt = CurrentMemoryContext;
127
128 /*
129 * Start a transaction so we can access pg_subscription.
130 */
132
133 rel = table_open(SubscriptionRelationId, AccessShareLock);
134 scan = table_beginscan_catalog(rel, 0, NULL);
135
137 {
139 Subscription *sub;
140 MemoryContext oldcxt;
141
142 /*
143 * Allocate our results in the caller's context, not the
144 * transaction's. We do this inside the loop, and restore the original
145 * context at the end, so that leaky things like heap_getnext() are
146 * not called in a potentially long-lived context.
147 */
148 oldcxt = MemoryContextSwitchTo(resultcxt);
149
150 sub = (Subscription *) palloc0(sizeof(Subscription));
151 sub->oid = subform->oid;
152 sub->dbid = subform->subdbid;
153 sub->owner = subform->subowner;
154 sub->enabled = subform->subenabled;
155 sub->name = pstrdup(NameStr(subform->subname));
156 sub->retaindeadtuples = subform->subretaindeadtuples;
157 sub->retentionactive = subform->subretentionactive;
158 /* We don't fill fields we are not interested in. */
159
160 res = lappend(res, sub);
161 MemoryContextSwitchTo(oldcxt);
162 }
163
164 table_endscan(scan);
166
168
169 return res;
170}
#define NameStr(name)
Definition: c.h:755
HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction)
Definition: heapam.c:1346
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
List * lappend(List *list, void *datum)
Definition: list.c:339
#define AccessShareLock
Definition: lockdefs.h:36
char * pstrdup(const char *in)
Definition: mcxt.c:1759
void * palloc0(Size size)
Definition: mcxt.c:1395
MemoryContext CurrentMemoryContext
Definition: mcxt.c:160
#define NIL
Definition: pg_list.h:68
FormData_pg_subscription * Form_pg_subscription
@ ForwardScanDirection
Definition: sdir.h:28
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
TableScanDesc table_beginscan_catalog(Relation relation, int nkeys, ScanKeyData *key)
Definition: tableam.c:113
static void table_endscan(TableScanDesc scan)
Definition: tableam.h:985
void StartTransactionCommand(void)
Definition: xact.c:3071
void CommitTransactionCommand(void)
Definition: xact.c:3169

References AccessShareLock, CommitTransactionCommand(), CurrentMemoryContext, Subscription::dbid, Subscription::enabled, ForwardScanDirection, GETSTRUCT(), heap_getnext(), HeapTupleIsValid, lappend(), MemoryContextSwitchTo(), Subscription::name, NameStr, NIL, Subscription::oid, Subscription::owner, palloc0(), pstrdup(), Subscription::retaindeadtuples, Subscription::retentionactive, StartTransactionCommand(), table_beginscan_catalog(), table_close(), table_endscan(), and table_open().

Referenced by ApplyLauncherMain().

◆ GetLeaderApplyWorkerPid()

pid_t GetLeaderApplyWorkerPid ( pid_t  pid)

Definition at line 1555 of file launcher.c.

1556{
1557 int leader_pid = InvalidPid;
1558 int i;
1559
1560 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1561
1562 for (i = 0; i < max_logical_replication_workers; i++)
1563 {
1565
1566 if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
1567 {
1568 leader_pid = w->leader_pid;
1569 break;
1570 }
1571 }
1572
1573 LWLockRelease(LogicalRepWorkerLock);
1574
1575 return leader_pid;
1576}
int i
Definition: isn.c:77
#define InvalidPid
Definition: miscadmin.h:32
int pid
Definition: proc.h:199
#define isParallelApplyWorker(worker)

References i, InvalidPid, isParallelApplyWorker, LogicalRepWorker::leader_pid, LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, PGPROC::pid, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.

Referenced by pg_stat_get_activity().

◆ init_conflict_slot_xmin()

static void init_conflict_slot_xmin ( void  )
static

Definition at line 1495 of file launcher.c.

1496{
1497 TransactionId xmin_horizon;
1498
1499 /* Replication slot must exist but shouldn't be initialized. */
1502
1503 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1504
1505 xmin_horizon = GetOldestSafeDecodingTransactionId(false);
1506
1508 MyReplicationSlot->effective_xmin = xmin_horizon;
1509 MyReplicationSlot->data.xmin = xmin_horizon;
1511
1513
1514 LWLockRelease(ProcArrayLock);
1515
1516 /* Write this slot to disk */
1519}
@ LW_EXCLUSIVE
Definition: lwlock.h:112
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
Definition: procarray.c:2907
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1138
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1177
void ReplicationSlotSave(void)
Definition: slot.c:1120
slock_t mutex
Definition: slot.h:165
TransactionId effective_xmin
Definition: slot.h:188

References Assert(), ReplicationSlot::data, ReplicationSlot::effective_xmin, GetOldestSafeDecodingTransactionId(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotSave(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsValid, and ReplicationSlotPersistentData::xmin.

Referenced by ApplyLauncherMain(), and CreateConflictDetectionSlot().

◆ IsLogicalLauncher()

bool IsLogicalLauncher ( void  )

◆ logicalrep_launcher_attach_dshmem()

static void logicalrep_launcher_attach_dshmem ( void  )
static

Definition at line 1028 of file launcher.c.

1029{
1030 MemoryContext oldcontext;
1031
1032 /* Quick exit if we already did this. */
1034 last_start_times != NULL)
1035 return;
1036
1037 /* Otherwise, use a lock to ensure only one process creates the table. */
1038 LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
1039
1040 /* Be sure any local memory allocated by DSA routines is persistent. */
1042
1044 {
1045 /* Initialize dynamic shared hash table for last-start times. */
1046 last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
1050
1051 /* Store handles in shared memory for other backends to use. */
1054 }
1055 else if (!last_start_times)
1056 {
1057 /* Attach to existing dynamic shared hash table. */
1062 }
1063
1064 MemoryContextSwitchTo(oldcontext);
1065 LWLockRelease(LogicalRepWorkerLock);
1066}
dsa_area * dsa_attach(dsa_handle handle)
Definition: dsa.c:510
void dsa_pin_mapping(dsa_area *area)
Definition: dsa.c:650
dsa_handle dsa_get_handle(dsa_area *area)
Definition: dsa.c:498
void dsa_pin(dsa_area *area)
Definition: dsa.c:990
#define dsa_create(tranche_id)
Definition: dsa.h:117
dshash_table_handle dshash_get_hash_table_handle(dshash_table *hash_table)
Definition: dshash.c:367
dshash_table * dshash_attach(dsa_area *area, const dshash_parameters *params, dshash_table_handle handle, void *arg)
Definition: dshash.c:270
dshash_table * dshash_create(dsa_area *area, const dshash_parameters *params, void *arg)
Definition: dshash.c:206
static dsa_area * last_start_times_dsa
Definition: launcher.c:90
static const dshash_parameters dsh_params
Definition: launcher.c:81

References dsa_attach(), dsa_create, dsa_get_handle(), dsa_pin(), dsa_pin_mapping(), dsh_params, dshash_attach(), dshash_create(), dshash_get_hash_table_handle(), DSHASH_HANDLE_INVALID, LogicalRepCtxStruct::last_start_dsa, LogicalRepCtxStruct::last_start_dsh, last_start_times, last_start_times_dsa, LogicalRepCtx, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MemoryContextSwitchTo(), and TopMemoryContext.

Referenced by ApplyLauncherForgetWorkerStartTime(), ApplyLauncherGetWorkerStartTime(), and ApplyLauncherSetWorkerStartTime().

◆ logicalrep_launcher_onexit()

static void logicalrep_launcher_onexit ( int  code,
Datum  arg 
)
static

Definition at line 846 of file launcher.c.

847{
849}

References LogicalRepCtxStruct::launcher_pid, and LogicalRepCtx.

Referenced by ApplyLauncherMain().

◆ logicalrep_pa_worker_count()

static int logicalrep_pa_worker_count ( Oid  subid)
static

Definition at line 911 of file launcher.c.

912{
913 int i;
914 int res = 0;
915
916 Assert(LWLockHeldByMe(LogicalRepWorkerLock));
917
918 /*
919 * Scan all attached parallel apply workers, only counting those which
920 * have the given subscription id.
921 */
922 for (i = 0; i < max_logical_replication_workers; i++)
923 {
925
926 if (isParallelApplyWorker(w) && w->subid == subid)
927 res++;
928 }
929
930 return res;
931}
bool LWLockHeldByMe(LWLock *lock)
Definition: lwlock.c:1977

References Assert(), i, isParallelApplyWorker, LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by logicalrep_worker_launch().

◆ logicalrep_pa_worker_stop()

void logicalrep_pa_worker_stop ( ParallelApplyWorkerInfo winfo)

Definition at line 666 of file launcher.c.

667{
668 int slot_no;
669 uint16 generation;
670 LogicalRepWorker *worker;
671
672 SpinLockAcquire(&winfo->shared->mutex);
673 generation = winfo->shared->logicalrep_worker_generation;
674 slot_no = winfo->shared->logicalrep_worker_slot_no;
675 SpinLockRelease(&winfo->shared->mutex);
676
677 Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
678
679 /*
680 * Detach from the error_mq_handle for the parallel apply worker before
681 * stopping it. This prevents the leader apply worker from trying to
682 * receive the message from the error queue that might already be detached
683 * by the parallel apply worker.
684 */
685 if (winfo->error_mq_handle)
686 {
688 winfo->error_mq_handle = NULL;
689 }
690
691 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
692
693 worker = &LogicalRepCtx->workers[slot_no];
695
696 /*
697 * Only stop the worker if the generation matches and the worker is alive.
698 */
699 if (worker->generation == generation && worker->proc)
701
702 LWLockRelease(LogicalRepWorkerLock);
703}
uint16_t uint16
Definition: c.h:541
static void logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
Definition: launcher.c:556
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:843
shm_mq_handle * error_mq_handle
ParallelApplyWorkerShared * shared
#define SIGUSR2
Definition: win32_port.h:171

References Assert(), ParallelApplyWorkerInfo::error_mq_handle, LogicalRepWorker::generation, isParallelApplyWorker, ParallelApplyWorkerShared::logicalrep_worker_generation, ParallelApplyWorkerShared::logicalrep_worker_slot_no, logicalrep_worker_stop_internal(), LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, ParallelApplyWorkerShared::mutex, LogicalRepWorker::proc, ParallelApplyWorkerInfo::shared, shm_mq_detach(), SIGUSR2, SpinLockAcquire, SpinLockRelease, and LogicalRepCtxStruct::workers.

Referenced by pa_free_worker().

◆ logicalrep_sync_worker_count()

int logicalrep_sync_worker_count ( Oid  subid)

Definition at line 887 of file launcher.c.

888{
889 int i;
890 int res = 0;
891
892 Assert(LWLockHeldByMe(LogicalRepWorkerLock));
893
894 /* Count the tablesync workers belonging to the given subscription id. */
895 for (i = 0; i < max_logical_replication_workers; i++)
896 {
898
899 if (isTablesyncWorker(w) && w->subid == subid)
900 res++;
901 }
902
903 return res;
904}
#define isTablesyncWorker(worker)

References Assert(), i, isTablesyncWorker, LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by logicalrep_worker_launch(), and ProcessSyncingTablesForApply().

◆ logicalrep_worker_attach()

void logicalrep_worker_attach ( int  slot)

Definition at line 744 of file launcher.c.

745{
746 /* Block concurrent access. */
747 LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
748
749 Assert(slot >= 0 && slot < max_logical_replication_workers);
751
753 {
754 LWLockRelease(LogicalRepWorkerLock);
756 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
757 errmsg("logical replication worker slot %d is empty, cannot attach",
758 slot)));
759 }
760
762 {
763 LWLockRelease(LogicalRepWorkerLock);
765 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
766 errmsg("logical replication worker slot %d is already used by "
767 "another worker, cannot attach", slot)));
768 }
769
772
773 LWLockRelease(LogicalRepWorkerLock);
774}
int errcode(int sqlerrcode)
Definition: elog.c:863
#define ERROR
Definition: elog.h:39
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
static void logicalrep_worker_onexit(int code, Datum arg)
Definition: launcher.c:857
PGPROC * MyProc
Definition: proc.c:66

References Assert(), before_shmem_exit(), ereport, errcode(), errmsg(), ERROR, LogicalRepWorker::in_use, logicalrep_worker_onexit(), LogicalRepCtx, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, MyLogicalRepWorker, MyProc, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ logicalrep_worker_cleanup()

static void logicalrep_worker_cleanup ( LogicalRepWorker worker)
static

◆ logicalrep_worker_detach()

static void logicalrep_worker_detach ( void  )
static

Definition at line 781 of file launcher.c.

782{
783 /* Stop the parallel apply workers. */
785 {
786 List *workers;
787 ListCell *lc;
788
789 /*
790 * Detach from the error_mq_handle for all parallel apply workers
791 * before terminating them. This prevents the leader apply worker from
792 * receiving the worker termination message and sending it to logs
793 * when the same is already done by the parallel worker.
794 */
796
797 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
798
799 workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
800 foreach(lc, workers)
801 {
803
806 }
807
808 LWLockRelease(LogicalRepWorkerLock);
809
810 list_free(workers);
811 }
812
813 /* Block concurrent access. */
814 LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
815
817
818 LWLockRelease(LogicalRepWorkerLock);
819}
void pa_detach_all_error_mq(void)
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition: launcher.c:291
static void logicalrep_worker_cleanup(LogicalRepWorker *worker)
Definition: launcher.c:825
void list_free(List *list)
Definition: list.c:1546
static bool am_leader_apply_worker(void)

References am_leader_apply_worker(), isParallelApplyWorker, lfirst, list_free(), logicalrep_worker_cleanup(), logicalrep_worker_stop_internal(), logicalrep_workers_find(), LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLogicalRepWorker, pa_detach_all_error_mq(), and LogicalRepWorker::subid.

Referenced by logicalrep_worker_onexit().

◆ logicalrep_worker_find()

LogicalRepWorker * logicalrep_worker_find ( LogicalRepWorkerType  wtype,
Oid  subid,
Oid  relid,
bool  only_running 
)

Definition at line 256 of file launcher.c.

258{
259 int i;
260 LogicalRepWorker *res = NULL;
261
262 /* relid must be valid if and only if this is a table sync worker */
263 Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
264 Assert(LWLockHeldByMe(LogicalRepWorkerLock));
265
266 /* Search for an attached worker that matches the specified criteria. */
267 for (i = 0; i < max_logical_replication_workers; i++)
268 {
270
271 /* Skip parallel apply workers. */
273 continue;
274
275 if (w->in_use && w->subid == subid && w->relid == relid &&
276 w->type == wtype && (!only_running || w->proc))
277 {
278 res = w;
279 break;
280 }
281 }
282
283 return res;
284}
#define OidIsValid(objectId)
Definition: c.h:778
@ WORKERTYPE_TABLESYNC

References Assert(), i, LogicalRepWorker::in_use, isParallelApplyWorker, LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, OidIsValid, LogicalRepWorker::proc, LogicalRepWorker::relid, LogicalRepWorker::subid, LogicalRepWorker::type, LogicalRepCtxStruct::workers, and WORKERTYPE_TABLESYNC.

Referenced by ApplyLauncherMain(), FindDeletedTupleInLocalRel(), logicalrep_worker_stop(), logicalrep_worker_wakeup(), ProcessSyncingTablesForApply(), wait_for_table_state_change(), and wait_for_worker_state_change().

◆ logicalrep_worker_launch()

bool logicalrep_worker_launch ( LogicalRepWorkerType  wtype,
Oid  dbid,
Oid  subid,
const char *  subname,
Oid  userid,
Oid  relid,
dsm_handle  subworker_dsm,
bool  retain_dead_tuples 
)

Definition at line 322 of file launcher.c.

326{
328 BackgroundWorkerHandle *bgw_handle;
329 uint16 generation;
330 int i;
331 int slot = 0;
332 LogicalRepWorker *worker = NULL;
333 int nsyncworkers;
334 int nparallelapplyworkers;
336 bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
337 bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
338
339 /*----------
340 * Sanity checks:
341 * - must be valid worker type
342 * - tablesync workers are only ones to have relid
343 * - parallel apply worker is the only kind of subworker
344 * - The replication slot used in conflict detection is created when
345 * retain_dead_tuples is enabled
346 */
347 Assert(wtype != WORKERTYPE_UNKNOWN);
348 Assert(is_tablesync_worker == OidIsValid(relid));
349 Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
350 Assert(!retain_dead_tuples || MyReplicationSlot);
351
353 (errmsg_internal("starting logical replication worker for subscription \"%s\"",
354 subname)));
355
356 /* Report this after the initial starting message for consistency. */
359 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
360 errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
361
362 /*
363 * We need to modify the shared memory under lock so that we have a
364 * consistent view.
365 */
366 LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
367
368retry:
369 /* Find unused worker slot. */
370 for (i = 0; i < max_logical_replication_workers; i++)
371 {
373
374 if (!w->in_use)
375 {
376 worker = w;
377 slot = i;
378 break;
379 }
380 }
381
382 nsyncworkers = logicalrep_sync_worker_count(subid);
383
385
386 /*
387 * If we didn't find a free slot, try to do garbage collection. The
388 * reason we do this is because if some worker failed to start up and its
389 * parent has crashed while waiting, the in_use state was never cleared.
390 */
391 if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
392 {
393 bool did_cleanup = false;
394
395 for (i = 0; i < max_logical_replication_workers; i++)
396 {
398
399 /*
400 * If the worker was marked in use but didn't manage to attach in
401 * time, clean it up.
402 */
403 if (w->in_use && !w->proc &&
406 {
408 "logical replication worker for subscription %u took too long to start; canceled",
409 w->subid);
410
412 did_cleanup = true;
413 }
414 }
415
416 if (did_cleanup)
417 goto retry;
418 }
419
420 /*
421 * We don't allow launching more sync workers once we have reached the
422 * sync worker limit per subscription. So, just return silently, as we
423 * might get here because of an otherwise harmless race condition.
424 */
425 if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
426 {
427 LWLockRelease(LogicalRepWorkerLock);
428 return false;
429 }
430
431 nparallelapplyworkers = logicalrep_pa_worker_count(subid);
432
433 /*
434 * Return false if the number of parallel apply workers reached the limit
435 * per subscription.
436 */
437 if (is_parallel_apply_worker &&
438 nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
439 {
440 LWLockRelease(LogicalRepWorkerLock);
441 return false;
442 }
443
444 /*
445 * However, if there are no more free worker slots, inform the user about
446 * it before exiting.
447 */
448 if (worker == NULL)
449 {
450 LWLockRelease(LogicalRepWorkerLock);
452 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
453 errmsg("out of logical replication worker slots"),
454 errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
455 return false;
456 }
457
458 /* Prepare the worker slot. */
459 worker->type = wtype;
460 worker->launch_time = now;
461 worker->in_use = true;
462 worker->generation++;
463 worker->proc = NULL;
464 worker->dbid = dbid;
465 worker->userid = userid;
466 worker->subid = subid;
467 worker->relid = relid;
468 worker->relstate = SUBREL_STATE_UNKNOWN;
470 worker->stream_fileset = NULL;
471 worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
472 worker->parallel_apply = is_parallel_apply_worker;
473 worker->oldest_nonremovable_xid = retain_dead_tuples
476 worker->last_lsn = InvalidXLogRecPtr;
481
482 /* Before releasing lock, remember generation for future identification. */
483 generation = worker->generation;
484
485 LWLockRelease(LogicalRepWorkerLock);
486
487 /* Register the new dynamic worker. */
488 memset(&bgw, 0, sizeof(bgw));
492 snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
493
494 switch (worker->type)
495 {
496 case WORKERTYPE_APPLY:
497 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
499 "logical replication apply worker for subscription %u",
500 subid);
501 snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
502 break;
503
505 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
507 "logical replication parallel apply worker for subscription %u",
508 subid);
509 snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
510
511 memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
512 break;
513
515 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
517 "logical replication tablesync worker for subscription %u sync %u",
518 subid,
519 relid);
520 snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
521 break;
522
524 /* Should never happen. */
525 elog(ERROR, "unknown worker type");
526 }
527
530 bgw.bgw_main_arg = Int32GetDatum(slot);
531
532 if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
533 {
534 /* Failed to start worker, so clean up the worker slot. */
535 LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
536 Assert(generation == worker->generation);
538 LWLockRelease(LogicalRepWorkerLock);
539
541 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
542 errmsg("out of background worker slots"),
543 errhint("You might need to increase \"%s\".", "max_worker_processes")));
544 return false;
545 }
546
547 /* Now wait until it attaches. */
548 return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
549}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1781
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:1046
#define BGW_NEVER_RESTART
Definition: bgworker.h:85
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:159
uint32 dsm_handle
Definition: dsm_impl.h:55
int errhint(const char *fmt,...)
Definition: elog.c:1330
#define WARNING
Definition: elog.h:36
#define elog(elevel,...)
Definition: elog.h:226
static int logicalrep_pa_worker_count(Oid subid)
Definition: launcher.c:911
int max_sync_workers_per_subscription
Definition: launcher.c:53
static bool WaitForReplicationWorkerAttach(LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle)
Definition: launcher.c:181
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:887
int max_parallel_apply_workers_per_subscription
Definition: launcher.c:54
int max_active_replication_origins
Definition: origin.c:104
NameData subname
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:222
char bgw_extra[BGW_EXTRALEN]
Definition: bgworker.h:99
XLogRecPtr relstate_lsn
TimestampTz last_recv_time
TimestampTz launch_time
TimestampTz reply_time
FileSet * stream_fileset
XLogRecPtr reply_lsn
TimestampTz last_send_time
int wal_receiver_timeout
Definition: walreceiver.c:89
@ WORKERTYPE_PARALLEL_APPLY
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), BackgroundWorker::bgw_extra, BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, ReplicationSlot::data, LogicalRepWorker::dbid, DEBUG1, DSM_HANDLE_INVALID, elog, ereport, errcode(), errhint(), errmsg(), errmsg_internal(), ERROR, LogicalRepWorker::generation, GetCurrentTimestamp(), i, LogicalRepWorker::in_use, Int32GetDatum(), InvalidPid, InvalidTransactionId, InvalidXLogRecPtr, LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::launch_time, LogicalRepWorker::leader_pid, logicalrep_pa_worker_count(), logicalrep_sync_worker_count(), logicalrep_worker_cleanup(), LogicalRepCtx, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, max_logical_replication_workers, max_parallel_apply_workers_per_subscription, max_sync_workers_per_subscription, MAXPGPATH, MyProcPid, MyReplicationSlot, now(), OidIsValid, LogicalRepWorker::oldest_nonremovable_xid, LogicalRepWorker::parallel_apply, LogicalRepWorker::proc, RegisterDynamicBackgroundWorker(), LogicalRepWorker::relid, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, LogicalRepWorker::reply_lsn, LogicalRepWorker::reply_time, snprintf, LogicalRepWorker::stream_fileset, LogicalRepWorker::subid, subname, TIMESTAMP_NOBEGIN, TimestampDifferenceExceeds(), LogicalRepWorker::type, LogicalRepWorker::userid, WaitForReplicationWorkerAttach(), wal_receiver_timeout, WARNING, LogicalRepCtxStruct::workers, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_TABLESYNC, WORKERTYPE_UNKNOWN, and ReplicationSlotPersistentData::xmin.

Referenced by ApplyLauncherMain(), pa_launch_parallel_worker(), and ProcessSyncingTablesForApply().

◆ logicalrep_worker_onexit()

static void logicalrep_worker_onexit ( int  code,
Datum  arg 
)
static

Definition at line 857 of file launcher.c.

858{
859 /* Disconnect gracefully from the remote side. */
862
864
865 /* Cleanup fileset used for streaming transactions. */
868
869 /*
870 * Session level locks may be acquired outside of a transaction in
871 * parallel apply mode and will not be released when the worker
872 * terminates, so manually release all locks before the worker exits.
873 *
874 * The locks will be acquired once the worker is initialized.
875 */
878
880}
bool InitializingApplyWorker
Definition: worker.c:499
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:477
void FileSetDeleteAll(FileSet *fileset)
Definition: fileset.c:150
static void logicalrep_worker_detach(void)
Definition: launcher.c:781
void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
Definition: lock.c:2307
#define DEFAULT_LOCKMETHOD
Definition: lock.h:127
#define walrcv_disconnect(conn)
Definition: walreceiver.h:467

References ApplyLauncherWakeup(), DEFAULT_LOCKMETHOD, FileSetDeleteAll(), InitializingApplyWorker, LockReleaseAll(), logicalrep_worker_detach(), LogRepWorkerWalRcvConn, MyLogicalRepWorker, LogicalRepWorker::stream_fileset, and walrcv_disconnect.

Referenced by logicalrep_worker_attach().

◆ logicalrep_worker_stop()

void logicalrep_worker_stop ( LogicalRepWorkerType  wtype,
Oid  subid,
Oid  relid 
)

Definition at line 639 of file launcher.c.

640{
641 LogicalRepWorker *worker;
642
643 /* relid must be valid if and only if this is a table sync worker */
644 Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
645
646 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
647
648 worker = logicalrep_worker_find(wtype, subid, relid, false);
649
650 if (worker)
651 {
653 logicalrep_worker_stop_internal(worker, SIGTERM);
654 }
655
656 LWLockRelease(LogicalRepWorkerLock);
657}

References Assert(), isParallelApplyWorker, logicalrep_worker_find(), logicalrep_worker_stop_internal(), LW_SHARED, LWLockAcquire(), LWLockRelease(), OidIsValid, and WORKERTYPE_TABLESYNC.

Referenced by AlterSubscription_refresh(), and DropSubscription().

◆ logicalrep_worker_stop_internal()

static void logicalrep_worker_stop_internal ( LogicalRepWorker worker,
int  signo 
)
static

Definition at line 556 of file launcher.c.

557{
558 uint16 generation;
559
560 Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
561
562 /*
563 * Remember our worker's generation so that we can later check whether
564 * the slot still holds the same worker.
565 */
566 generation = worker->generation;
567
568 /*
569 * If we found a worker but it does not have proc set then it is still
570 * starting up; wait for it to finish starting and then kill it.
571 */
572 while (worker->in_use && !worker->proc)
573 {
574 int rc;
575
576 LWLockRelease(LogicalRepWorkerLock);
577
578 /* Wait a bit --- we don't expect to have to wait long. */
579 rc = WaitLatch(MyLatch,
581 10L, WAIT_EVENT_BGWORKER_STARTUP);
582
583 if (rc & WL_LATCH_SET)
584 {
587 }
588
589 /* Recheck worker status. */
590 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
591
592 /*
593 * Check whether the worker slot is no longer used, which would mean
594 * that the worker has exited, or whether the worker generation is
595 * different, meaning that a different worker has taken the slot.
596 */
597 if (!worker->in_use || worker->generation != generation)
598 return;
599
600 /* Worker has assigned proc, so it has started. */
601 if (worker->proc)
602 break;
603 }
604
605 /* Now terminate the worker ... */
606 kill(worker->proc->pid, signo);
607
608 /* ... and wait for it to die. */
609 for (;;)
610 {
611 int rc;
612
613 /* is it gone? */
614 if (!worker->proc || worker->generation != generation)
615 break;
616
617 LWLockRelease(LogicalRepWorkerLock);
618
619 /* Wait a bit --- we don't expect to have to wait long. */
620 rc = WaitLatch(MyLatch,
622 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
623
624 if (rc & WL_LATCH_SET)
625 {
628 }
629
630 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
631 }
632}

References Assert(), CHECK_FOR_INTERRUPTS, LogicalRepWorker::generation, LogicalRepWorker::in_use, kill, LW_SHARED, LWLockAcquire(), LWLockHeldByMeInMode(), LWLockRelease(), MyLatch, PGPROC::pid, LogicalRepWorker::proc, ResetLatch(), WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by logicalrep_pa_worker_stop(), logicalrep_worker_detach(), and logicalrep_worker_stop().

◆ logicalrep_worker_wakeup()

void logicalrep_worker_wakeup ( LogicalRepWorkerType  wtype,
Oid  subid,
Oid  relid 
)

Definition at line 710 of file launcher.c.

711{
712 LogicalRepWorker *worker;
713
714 /* relid must be valid if and only if this is a table sync worker */
715 Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
716
717 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
718
719 worker = logicalrep_worker_find(wtype, subid, relid, true);
720
721 if (worker)
723
724 LWLockRelease(LogicalRepWorkerLock);
725}
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:733

References Assert(), logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), LWLockRelease(), OidIsValid, and WORKERTYPE_TABLESYNC.

Referenced by apply_handle_stream_start(), and FinishSyncWorker().

◆ logicalrep_worker_wakeup_ptr()

void logicalrep_worker_wakeup_ptr ( LogicalRepWorker worker)

Definition at line 733 of file launcher.c.

734{
735 Assert(LWLockHeldByMe(LogicalRepWorkerLock));
736
737 SetLatch(&worker->proc->procLatch);
738}
void SetLatch(Latch *latch)
Definition: latch.c:290
Latch procLatch
Definition: proc.h:186

References Assert(), LWLockHeldByMe(), LogicalRepWorker::proc, PGPROC::procLatch, and SetLatch().

Referenced by AtEOXact_LogicalRepWorkers(), logicalrep_worker_wakeup(), ProcessSyncingTablesForApply(), and wait_for_worker_state_change().

◆ logicalrep_workers_find()

List * logicalrep_workers_find ( Oid  subid,
bool  only_running,
bool  acquire_lock 
)

Definition at line 291 of file launcher.c.

292{
293 int i;
294 List *res = NIL;
295
296 if (acquire_lock)
297 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
298
299 Assert(LWLockHeldByMe(LogicalRepWorkerLock));
300
301 /* Collect all in-use workers for the given subscription id. */
302 for (i = 0; i < max_logical_replication_workers; i++)
303 {
305
306 if (w->in_use && w->subid == subid && (!only_running || w->proc))
307 res = lappend(res, w);
308 }
309
310 if (acquire_lock)
311 LWLockRelease(LogicalRepWorkerLock);
312
313 return res;
314}

References Assert(), i, LogicalRepWorker::in_use, lappend(), LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockHeldByMe(), LWLockRelease(), max_logical_replication_workers, NIL, LogicalRepWorker::proc, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by AlterSubscription(), AtEOXact_LogicalRepWorkers(), DropSubscription(), and logicalrep_worker_detach().

◆ pg_stat_get_subscription()

Datum pg_stat_get_subscription ( PG_FUNCTION_ARGS  )

Definition at line 1582 of file launcher.c.

1583{
1584#define PG_STAT_GET_SUBSCRIPTION_COLS 10
1585 Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1586 int i;
1587 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1588
1589 InitMaterializedSRF(fcinfo, 0);
1590
1591 /* Make sure we get a consistent view of the workers. */
1592 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1593
1594 for (i = 0; i < max_logical_replication_workers; i++)
1595 {
1596 /* for each row */
1598 bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1599 int worker_pid;
1600 LogicalRepWorker worker;
1601
1602 memcpy(&worker, &LogicalRepCtx->workers[i],
1603 sizeof(LogicalRepWorker));
1604 if (!worker.proc || !IsBackendPid(worker.proc->pid))
1605 continue;
1606
1607 if (OidIsValid(subid) && worker.subid != subid)
1608 continue;
1609
1610 worker_pid = worker.proc->pid;
1611
1612 values[0] = ObjectIdGetDatum(worker.subid);
1613 if (isTablesyncWorker(&worker))
1614 values[1] = ObjectIdGetDatum(worker.relid);
1615 else
1616 nulls[1] = true;
1617 values[2] = Int32GetDatum(worker_pid);
1618
1619 if (isParallelApplyWorker(&worker))
1620 values[3] = Int32GetDatum(worker.leader_pid);
1621 else
1622 nulls[3] = true;
1623
1624 if (XLogRecPtrIsInvalid(worker.last_lsn))
1625 nulls[4] = true;
1626 else
1627 values[4] = LSNGetDatum(worker.last_lsn);
1628 if (worker.last_send_time == 0)
1629 nulls[5] = true;
1630 else
1632 if (worker.last_recv_time == 0)
1633 nulls[6] = true;
1634 else
1636 if (XLogRecPtrIsInvalid(worker.reply_lsn))
1637 nulls[7] = true;
1638 else
1639 values[7] = LSNGetDatum(worker.reply_lsn);
1640 if (worker.reply_time == 0)
1641 nulls[8] = true;
1642 else
1644
1645 switch (worker.type)
1646 {
1647 case WORKERTYPE_APPLY:
1648 values[9] = CStringGetTextDatum("apply");
1649 break;
1651 values[9] = CStringGetTextDatum("parallel apply");
1652 break;
1654 values[9] = CStringGetTextDatum("table synchronization");
1655 break;
1656 case WORKERTYPE_UNKNOWN:
1657 /* Should never happen. */
1658 elog(ERROR, "unknown worker type");
1659 }
1660
1661 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1662 values, nulls);
1663
1664 /*
1665 * If only a single subscription was requested, and we found it,
1666 * break.
1667 */
1668 if (OidIsValid(subid))
1669 break;
1670 }
1671
1672 LWLockRelease(LogicalRepWorkerLock);
1673
1674 return (Datum) 0;
1675}
static Datum values[MAXATTR]
Definition: bootstrap.c:153
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define PG_GETARG_OID(n)
Definition: fmgr.h:275
#define PG_ARGISNULL(n)
Definition: fmgr.h:209
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
#define PG_STAT_GET_SUBSCRIPTION_COLS
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:31
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
unsigned int Oid
Definition: postgres_ext.h:32
bool IsBackendPid(int pid)
Definition: procarray.c:3253
TupleDesc setDesc
Definition: execnodes.h:364
Tuplestorestate * setResult
Definition: execnodes.h:363
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:784
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References CStringGetTextDatum, elog, ERROR, i, InitMaterializedSRF(), Int32GetDatum(), InvalidOid, IsBackendPid(), isParallelApplyWorker, isTablesyncWorker, LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::leader_pid, LogicalRepCtx, LSNGetDatum(), LW_SHARED, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, ObjectIdGetDatum(), OidIsValid, PG_ARGISNULL, PG_GETARG_OID, PG_STAT_GET_SUBSCRIPTION_COLS, PGPROC::pid, LogicalRepWorker::proc, LogicalRepWorker::relid, LogicalRepWorker::reply_lsn, LogicalRepWorker::reply_time, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, LogicalRepWorker::subid, TimestampTzGetDatum(), tuplestore_putvalues(), LogicalRepWorker::type, values, LogicalRepCtxStruct::workers, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_TABLESYNC, WORKERTYPE_UNKNOWN, and XLogRecPtrIsInvalid.

◆ update_conflict_slot_xmin()

static void update_conflict_slot_xmin ( TransactionId  new_xmin)
static

Definition at line 1459 of file launcher.c.

/*
 * Listing of update_conflict_slot_xmin(new_xmin).
 *
 * The statements visible here store new_xmin into
 * MyReplicationSlot->data.xmin and then log the stored value at DEBUG1.
 * There is an early "return" guarded by a condition that is not shown.
 *
 * NOTE(review): the listing numbers skip 1461, 1463, 1466, 1469-1470, 1472
 * and 1476-1477, so several statements are absent from this extract (for
 * example the remainder of the Assert expression and the condition of the
 * early return). Consult launcher.c itself before relying on this listing.
 */
1460{
1462 Assert(!TransactionIdIsValid(new_xmin) ||
1464
1465 /* Return if the xmin value of the slot cannot be updated */
1467 return;
1468
/* Store the caller-supplied xmin in the slot's data.xmin field. */
1471 MyReplicationSlot->data.xmin = new_xmin;
1473
/* Debug trace of the value now held in data.xmin. */
1474 elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
1475
1478
1479 /*
1480 * Like PhysicalConfirmReceivedLocation(), do not save slot information
1481 * each time. This is acceptable because all concurrent transactions on
1482 * the publisher that require the data preceding the slot's xmin should
1483 * have already been applied and flushed on the subscriber before the xmin
1484 * is advanced. So, even if the slot's xmin regresses after a restart, it
1485 * will be advanced again in the next cycle. Therefore, no data required
1486 * for conflict detection will be prematurely removed.
1487 */
1488 return;
1489}
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.h:282
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43

References Assert(), ReplicationSlot::data, DEBUG1, ReplicationSlot::effective_xmin, elog, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdEquals, TransactionIdIsValid, TransactionIdPrecedesOrEquals(), and ReplicationSlotPersistentData::xmin.

Referenced by ApplyLauncherMain().

◆ WaitForReplicationWorkerAttach()

static bool WaitForReplicationWorkerAttach ( LogicalRepWorker worker,
uint16  generation,
BackgroundWorkerHandle handle 
)
static

Definition at line 181 of file launcher.c.

/*
 * Body of WaitForReplicationWorkerAttach(worker, generation, handle).
 *
 * Loops until one of two things is observed:
 *   - the worker slot is no longer in use, or its proc field is set; the
 *     function then returns the value of worker->in_use seen at that moment;
 *   - GetBackgroundWorkerPid() reports BGWH_STOPPED; the function then
 *     returns false.
 * Between checks it waits on MyLatch through WaitLatch().
 *
 * NOTE(review): listing lines 194, 216, 226, 231-232 and 242 are absent from
 * this extract. That includes the statement guarded by the generation check,
 * part of the WaitLatch() argument list, the statements at the start of the
 * WL_LATCH_SET branch, and the statement guarded by "if (dropped_latch)".
 * Consult launcher.c itself for the complete code.
 */
184{
185 bool result = false;
186 bool dropped_latch = false;
187
188 for (;;)
189 {
190 BgwHandleStatus status;
191 pid_t pid;
192 int rc;
193
195
/* The slot fields are only read here, so the lock is taken in shared mode. */
196 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
197
198 /* Worker either died or has started. Return false if died. */
199 if (!worker->in_use || worker->proc)
200 {
/* in_use is false when the slot was released, true when proc was set. */
201 result = worker->in_use;
202 LWLockRelease(LogicalRepWorkerLock);
203 break;
204 }
205
206 LWLockRelease(LogicalRepWorkerLock);
207
208 /* Check if worker has died before attaching, and clean up after it. */
209 status = GetBackgroundWorkerPid(handle, &pid);
210
211 if (status == BGWH_STOPPED)
212 {
/*
 * Exclusive mode is used on this path. The statement guarded by the
 * generation comparison (listing line 216) is missing from this extract.
 */
213 LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
214 /* Ensure that this was indeed the worker we waited for. */
215 if (generation == worker->generation)
217 LWLockRelease(LogicalRepWorkerLock);
218 break; /* result is already false */
219 }
220
221 /*
222 * We need timeout because we generally don't get notified via latch
223 * about the worker attach. But we don't expect to have to wait long.
224 */
/*
 * NOTE(review): the wake-event flags argument (listing line 226) is missing
 * here; 10L is presumably the timeout in milliseconds -- confirm against
 * WaitLatch().
 */
225 rc = WaitLatch(MyLatch,
227 10L, WAIT_EVENT_BGWORKER_STARTUP);
228
229 if (rc & WL_LATCH_SET)
230 {
/* Remember the latch event so it can be restored after the loop. */
233 dropped_latch = true;
234 }
235 }
236
237 /*
238 * If we had to clear a latch event in order to wait, be sure to restore
239 * it before exiting. Otherwise caller may miss events.
240 */
241 if (dropped_latch)
243
244 return result;
245}
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1158
BgwHandleStatus
Definition: bgworker.h:104
@ BGWH_STOPPED
Definition: bgworker.h:107

References BGWH_STOPPED, CHECK_FOR_INTERRUPTS, LogicalRepWorker::generation, GetBackgroundWorkerPid(), LogicalRepWorker::in_use, logicalrep_worker_cleanup(), LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, LogicalRepWorker::proc, ResetLatch(), SetLatch(), WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by logicalrep_worker_launch().

Variable Documentation

◆ dsh_params

const dshash_parameters dsh_params
static
Initial value:
= {
sizeof(Oid),
LWTRANCHE_LAUNCHER_HASH
}
void dshash_memcpy(void *dest, const void *src, size_t size, void *arg)
Definition: dshash.c:590
dshash_hash dshash_memhash(const void *v, size_t size, void *arg)
Definition: dshash.c:581
int dshash_memcmp(const void *a, const void *b, size_t size, void *arg)
Definition: dshash.c:572

Definition at line 81 of file launcher.c.

Referenced by logicalrep_launcher_attach_dshmem().

◆ last_start_times

◆ last_start_times_dsa

dsa_area* last_start_times_dsa = NULL
static

Definition at line 90 of file launcher.c.

Referenced by logicalrep_launcher_attach_dshmem().

◆ LogicalRepCtx

◆ max_logical_replication_workers

◆ max_parallel_apply_workers_per_subscription

int max_parallel_apply_workers_per_subscription = 2

Definition at line 54 of file launcher.c.

Referenced by logicalrep_worker_launch(), and pa_free_worker().

◆ max_sync_workers_per_subscription

int max_sync_workers_per_subscription = 2

Definition at line 53 of file launcher.c.

Referenced by logicalrep_worker_launch(), and ProcessSyncingTablesForApply().

◆ MyLogicalRepWorker

◆ on_commit_launcher_wakeup

bool on_commit_launcher_wakeup = false
static

Definition at line 93 of file launcher.c.

Referenced by ApplyLauncherWakeupAtCommit(), and AtEOXact_ApplyLauncher().