diff options
Diffstat (limited to 'src/backend/replication/slot.c')
-rw-r--r-- | src/backend/replication/slot.c | 91 |
1 files changed, 47 insertions, 44 deletions
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 76e55736605..ee0c7c07a97 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -72,7 +72,7 @@ typedef struct ReplicationSlotOnDisk sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 1 /* version for new files */ +#define SLOT_VERSION 1 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -81,7 +81,8 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; ReplicationSlot *MyReplicationSlot = NULL; /* GUCs */ -int max_replication_slots = 0; /* the maximum number of replication slots */ +int max_replication_slots = 0; /* the maximum number of replication + * slots */ static void ReplicationSlotDropAcquired(void); @@ -180,8 +181,8 @@ ReplicationSlotValidateName(const char *name, int elevel) { ereport(elevel, (errcode(ERRCODE_INVALID_NAME), - errmsg("replication slot name \"%s\" contains invalid character", - name), + errmsg("replication slot name \"%s\" contains invalid character", + name), errhint("Replication slot names may only contain letters, numbers and the underscore character."))); return false; } @@ -194,7 +195,7 @@ ReplicationSlotValidateName(const char *name, int elevel) * * name: Name of the slot * db_specific: logical decoding is db specific; if the slot is going to - * be used for that pass true, otherwise false. + * be used for that pass true, otherwise false. */ void ReplicationSlotCreate(const char *name, bool db_specific, @@ -208,18 +209,18 @@ ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotValidateName(name, ERROR); /* - * If some other backend ran this code currently with us, we'd likely - * both allocate the same slot, and that would be bad. We'd also be - * at risk of missing a name collision. Also, we don't want to try to - * create a new slot while somebody's busy cleaning up an old one, because - * we might both be monkeying with the same directory. + * If some other backend ran this code currently with us, we'd likely both + * allocate the same slot, and that would be bad. We'd also be at risk of + * missing a name collision. Also, we don't want to try to create a new + * slot while somebody's busy cleaning up an old one, because we might + * both be monkeying with the same directory. */ LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE); /* - * Check for name collision, and identify an allocatable slot. We need - * to hold ReplicationSlotControlLock in shared mode for this, so that - * nobody else can change the in_use flags while we're looking at them. + * Check for name collision, and identify an allocatable slot. We need to + * hold ReplicationSlotControlLock in shared mode for this, so that nobody + * else can change the in_use flags while we're looking at them. */ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) @@ -243,10 +244,10 @@ ReplicationSlotCreate(const char *name, bool db_specific, errhint("Free one or increase max_replication_slots."))); /* - * Since this slot is not in use, nobody should be looking at any - * part of it other than the in_use field unless they're trying to allocate - * it. And since we hold ReplicationSlotAllocationLock, nobody except us - * can be doing that. So it's safe to initialize the slot. + * Since this slot is not in use, nobody should be looking at any part of + * it other than the in_use field unless they're trying to allocate it. + * And since we hold ReplicationSlotAllocationLock, nobody except us can + * be doing that. So it's safe to initialize the slot. */ Assert(!slot->in_use); Assert(!slot->active); @@ -366,6 +367,7 @@ ReplicationSlotRelease(void) { /* Mark slot inactive. We're not freeing it, just disconnecting. */ volatile ReplicationSlot *vslot = slot; + SpinLockAcquire(&slot->mutex); vslot->active = false; SpinLockRelease(&slot->mutex); @@ -444,7 +446,7 @@ ReplicationSlotDropAcquired(void) else { volatile ReplicationSlot *vslot = slot; - bool fail_softly = slot->data.persistency == RS_EPHEMERAL; + bool fail_softly = slot->data.persistency == RS_EPHEMERAL; SpinLockAcquire(&slot->mutex); vslot->active = false; @@ -571,8 +573,8 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) for (i = 0; i < max_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; - TransactionId effective_xmin; - TransactionId effective_catalog_xmin; + TransactionId effective_xmin; + TransactionId effective_catalog_xmin; if (!s->in_use) continue; @@ -612,7 +614,7 @@ void ReplicationSlotsComputeRequiredLSN(void) { int i; - XLogRecPtr min_required = InvalidXLogRecPtr; + XLogRecPtr min_required = InvalidXLogRecPtr; Assert(ReplicationSlotCtl != NULL); @@ -620,7 +622,7 @@ ReplicationSlotsComputeRequiredLSN(void) for (i = 0; i < max_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; - XLogRecPtr restart_lsn; + XLogRecPtr restart_lsn; if (!s->in_use) continue; @@ -669,7 +671,7 @@ ReplicationSlotsComputeLogicalRestartLSN(void) for (i = 0; i < max_replication_slots; i++) { volatile ReplicationSlot *s; - XLogRecPtr restart_lsn; + XLogRecPtr restart_lsn; s = &ReplicationSlotCtl->replication_slots[i]; @@ -772,8 +774,8 @@ CheckSlotRequirements(void) static bool string_endswith(const char *str, const char *end) { - size_t slen = strlen(str); - size_t elen = strlen(end); + size_t slen = strlen(str); + size_t elen = strlen(end); /* can't be a postfix if longer */ if (elen > slen) @@ -802,8 +804,8 @@ CheckPointReplicationSlots(void) * Prevent any slot from being created/dropped while we're active. As we * explicitly do *not* want to block iterating over replication_slots or * acquiring a slot we cannot take the control lock - but that's OK, - * because holding ReplicationSlotAllocationLock is strictly stronger, - * and enough to guarantee that nobody can change the in_use bits on us. + * because holding ReplicationSlotAllocationLock is strictly stronger, and + * enough to guarantee that nobody can change the in_use bits on us. */ LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED); @@ -839,7 +841,7 @@ StartupReplicationSlots(XLogRecPtr checkPointRedo) replication_dir = AllocateDir("pg_replslot"); while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL) { - struct stat statbuf; + struct stat statbuf; char path[MAXPGPATH]; if (strcmp(replication_de->d_name, ".") == 0 || @@ -892,7 +894,7 @@ CreateSlotOnDisk(ReplicationSlot *slot) { char tmppath[MAXPGPATH]; char path[MAXPGPATH]; - struct stat st; + struct stat st; /* * No need to take out the io_in_progress_lock, nobody else can see this @@ -904,11 +906,10 @@ CreateSlotOnDisk(ReplicationSlot *slot) sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name)); /* - * It's just barely possible that some previous effort to create or - * drop a slot with this name left a temp directory lying around. - * If that seems to be the case, try to remove it. If the rmtree() - * fails, we'll error out at the mkdir() below, so we don't bother - * checking success. + * It's just barely possible that some previous effort to create or drop a + * slot with this name left a temp directory lying around. If that seems + * to be the case, try to remove it. If the rmtree() fails, we'll error + * out at the mkdir() below, so we don't bother checking success. */ if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode)) rmtree(tmppath, true); @@ -922,7 +923,7 @@ CreateSlotOnDisk(ReplicationSlot *slot) fsync_fname(tmppath, true); /* Write the actual state file. */ - slot->dirty = true; /* signal that we really need to write */ + slot->dirty = true; /* signal that we really need to write */ SaveSlotToPath(slot, tmppath, ERROR); /* Rename the directory into place. */ @@ -1003,12 +1004,13 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) SpinLockRelease(&slot->mutex); COMP_CRC32(cp.checksum, - (char *)(&cp) + ReplicationSlotOnDiskConstantSize, + (char *) (&cp) + ReplicationSlotOnDiskConstantSize, ReplicationSlotOnDiskDynamicSize); if ((write(fd, &cp, sizeof(cp))) != sizeof(cp)) { - int save_errno = errno; + int save_errno = errno; + CloseTransientFile(fd); errno = save_errno; ereport(elevel, @@ -1021,7 +1023,8 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) /* fsync the temporary file */ if (pg_fsync(fd) != 0) { - int save_errno = errno; + int save_errno = errno; + CloseTransientFile(fd); errno = save_errno; ereport(elevel, @@ -1150,19 +1153,19 @@ RestoreSlotFromDisk(const char *name) if (cp.version != SLOT_VERSION) ereport(PANIC, (errcode_for_file_access(), - errmsg("replication slot file \"%s\" has unsupported version %u", - path, cp.version))); + errmsg("replication slot file \"%s\" has unsupported version %u", + path, cp.version))); /* boundary check on length */ if (cp.length != ReplicationSlotOnDiskDynamicSize) ereport(PANIC, (errcode_for_file_access(), - errmsg("replication slot file \"%s\" has corrupted length %u", - path, cp.length))); + errmsg("replication slot file \"%s\" has corrupted length %u", + path, cp.length))); /* Now that we know the size, read the entire file */ readBytes = read(fd, - (char *)&cp + ReplicationSlotOnDiskConstantSize, + (char *) &cp + ReplicationSlotOnDiskConstantSize, cp.length); if (readBytes != cp.length) { @@ -1181,7 +1184,7 @@ RestoreSlotFromDisk(const char *name) /* now verify the CRC32 */ INIT_CRC32(checksum); COMP_CRC32(checksum, - (char *)&cp + ReplicationSlotOnDiskConstantSize, + (char *) &cp + ReplicationSlotOnDiskConstantSize, ReplicationSlotOnDiskDynamicSize); if (!EQ_CRC32(checksum, cp.checksum)) |