diff options
Diffstat (limited to 'src/backend/storage/ipc/sinvaladt.c')
-rw-r--r-- | src/backend/storage/ipc/sinvaladt.c | 200 |
1 files changed, 62 insertions, 138 deletions
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c index 748a792a854..f624bfc7d78 100644 --- a/src/backend/storage/ipc/sinvaladt.c +++ b/src/backend/storage/ipc/sinvaladt.c @@ -139,7 +139,6 @@ typedef struct ProcState { /* procPid is zero in an inactive ProcState array entry. */ pid_t procPid; /* PID of backend, for signaling */ - PGPROC *proc; /* PGPROC of backend */ /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */ int nextMsgNum; /* next message number to read */ bool resetState; /* backend needs to reset its state */ @@ -172,8 +171,6 @@ typedef struct SISeg int minMsgNum; /* oldest message still needed */ int maxMsgNum; /* next message number to be assigned */ int nextThreshold; /* # of messages to call SICleanupQueue */ - int lastBackend; /* index of last active procState entry, +1 */ - int maxBackends; /* size of procState array */ slock_t msgnumLock; /* spinlock protecting maxMsgNum */ @@ -183,11 +180,29 @@ typedef struct SISeg SharedInvalidationMessage buffer[MAXNUMMESSAGES]; /* - * Per-backend invalidation state info (has MaxBackends entries). + * Per-backend invalidation state info. + * + * 'procState' has NumProcStateSlots entries, and is indexed by pgprocno. + * 'numProcs' is the number of slots currently in use, and 'pgprocnos' is + * a dense array of their indexes, to speed up scanning all in-use slots. + * + * 'pgprocnos' is largely redundant with ProcArrayStruct->pgprocnos, but + * having our separate copy avoids contention on ProcArrayLock, and allows + * us to track only the processes that participate in shared cache + * invalidations. */ + int numProcs; + int *pgprocnos; ProcState procState[FLEXIBLE_ARRAY_MEMBER]; } SISeg; +/* + * We reserve a slot for each possible BackendId, plus one for each + * possible auxiliary process type. (This scheme assumes there is not + * more than one of any auxiliary process type at a time.) + */ +#define NumProcStateSlots (MaxBackends + NUM_AUXILIARY_PROCS) + static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */ @@ -205,16 +220,8 @@ SInvalShmemSize(void) Size size; size = offsetof(SISeg, procState); - - /* - * In Hot Standby mode, the startup process requests a procState array - * slot using InitRecoveryTransactionEnvironment(). Even though - * MaxBackends doesn't account for the startup process, it is guaranteed - * to get a free slot. This is because the autovacuum launcher and worker - * processes, which are included in MaxBackends, are not started in Hot - * Standby mode. - */ - size = add_size(size, mul_size(sizeof(ProcState), MaxBackends)); + size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots)); /* procState */ + size = add_size(size, mul_size(sizeof(int), NumProcStateSlots)); /* pgprocnos */ return size; } @@ -239,23 +246,22 @@ CreateSharedInvalidationState(void) shmInvalBuffer->minMsgNum = 0; shmInvalBuffer->maxMsgNum = 0; shmInvalBuffer->nextThreshold = CLEANUP_MIN; - shmInvalBuffer->lastBackend = 0; - shmInvalBuffer->maxBackends = MaxBackends; SpinLockInit(&shmInvalBuffer->msgnumLock); /* The buffer[] array is initially all unused, so we need not fill it */ /* Mark all backends inactive, and initialize nextLXID */ - for (i = 0; i < shmInvalBuffer->maxBackends; i++) + for (i = 0; i < NumProcStateSlots; i++) { shmInvalBuffer->procState[i].procPid = 0; /* inactive */ - shmInvalBuffer->procState[i].proc = NULL; shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */ shmInvalBuffer->procState[i].resetState = false; shmInvalBuffer->procState[i].signaled = false; shmInvalBuffer->procState[i].hasMessages = false; shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId; } + shmInvalBuffer->numProcs = 0; + shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i]; } /* @@ -265,59 +271,41 @@ CreateSharedInvalidationState(void) void SharedInvalBackendInit(bool sendOnly) { - int index; - ProcState *stateP = NULL; + ProcState *stateP; + pid_t oldPid; SISeg *segP = shmInvalBuffer; + int pgprocno; + + if (MyBackendId <= 0) + elog(ERROR, "MyBackendId not set"); + if (MyBackendId > NumProcStateSlots) + elog(PANIC, "unexpected MyBackendId %d in SharedInvalBackendInit (max %d)", + MyBackendId, NumProcStateSlots); + pgprocno = MyBackendId - 1; + stateP = &segP->procState[pgprocno]; /* * This can run in parallel with read operations, but not with write - * operations, since SIInsertDataEntries relies on lastBackend to set - * hasMessages appropriately. + * operations, since SIInsertDataEntries relies on the pgprocnos array to + * set hasMessages appropriately. */ LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); - /* Look for a free entry in the procState array */ - for (index = 0; index < segP->lastBackend; index++) - { - if (segP->procState[index].procPid == 0) /* inactive slot? */ - { - stateP = &segP->procState[index]; - break; - } - } - - if (stateP == NULL) + oldPid = stateP->procPid; + if (oldPid != 0) { - if (segP->lastBackend < segP->maxBackends) - { - stateP = &segP->procState[segP->lastBackend]; - Assert(stateP->procPid == 0); - segP->lastBackend++; - } - else - { - /* - * out of procState slots: MaxBackends exceeded -- report normally - */ - MyBackendId = InvalidBackendId; - LWLockRelease(SInvalWriteLock); - ereport(FATAL, - (errcode(ERRCODE_TOO_MANY_CONNECTIONS), - errmsg("sorry, too many clients already"))); - } + LWLockRelease(SInvalWriteLock); + elog(ERROR, "sinval slot for backend %d is already in use by process %d", + MyBackendId, (int) oldPid); } - MyBackendId = (stateP - &segP->procState[0]) + 1; - - /* Advertise assigned backend ID in MyProc */ - MyProc->backendId = MyBackendId; + shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = pgprocno; /* Fetch next local transaction ID into local memory */ nextLocalTransactionId = stateP->nextLXID; /* mark myself active, with all extant messages already read */ stateP->procPid = MyProcPid; - stateP->proc = MyProc; stateP->nextMsgNum = segP->maxMsgNum; stateP->resetState = false; stateP->signaled = false; @@ -328,8 +316,6 @@ SharedInvalBackendInit(bool sendOnly) /* register exit routine to mark my entry inactive at exit */ on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP)); - - elog(DEBUG4, "my backend ID is %d", MyBackendId); } /* @@ -345,96 +331,36 @@ CleanupInvalidationState(int status, Datum arg) { SISeg *segP = (SISeg *) DatumGetPointer(arg); ProcState *stateP; + int pgprocno = MyBackendId - 1; int i; Assert(PointerIsValid(segP)); LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); - stateP = &segP->procState[MyBackendId - 1]; + stateP = &segP->procState[pgprocno]; /* Update next local transaction ID for next holder of this backendID */ stateP->nextLXID = nextLocalTransactionId; /* Mark myself inactive */ stateP->procPid = 0; - stateP->proc = NULL; stateP->nextMsgNum = 0; stateP->resetState = false; stateP->signaled = false; - /* Recompute index of last active backend */ - for (i = segP->lastBackend; i > 0; i--) + for (i = segP->numProcs - 1; i >= 0; i--) { - if (segP->procState[i - 1].procPid != 0) - break; - } - segP->lastBackend = i; - - LWLockRelease(SInvalWriteLock); -} - -/* - * BackendIdGetProc - * Get the PGPROC structure for a backend, given the backend ID. - * The result may be out of date arbitrarily quickly, so the caller - * must be careful about how this information is used. NULL is - * returned if the backend is not active. - */ -PGPROC * -BackendIdGetProc(int backendID) -{ - PGPROC *result = NULL; - SISeg *segP = shmInvalBuffer; - - /* Need to lock out additions/removals of backends */ - LWLockAcquire(SInvalWriteLock, LW_SHARED); - - if (backendID > 0 && backendID <= segP->lastBackend) - { - ProcState *stateP = &segP->procState[backendID - 1]; - - result = stateP->proc; - } - - LWLockRelease(SInvalWriteLock); - - return result; -} - -/* - * BackendIdGetTransactionIds - * Get the xid, xmin, nsubxid and overflow status of the backend. The - * result may be out of date arbitrarily quickly, so the caller must be - * careful about how this information is used. - */ -void -BackendIdGetTransactionIds(int backendID, TransactionId *xid, - TransactionId *xmin, int *nsubxid, bool *overflowed) -{ - SISeg *segP = shmInvalBuffer; - - *xid = InvalidTransactionId; - *xmin = InvalidTransactionId; - *nsubxid = 0; - *overflowed = false; - - /* Need to lock out additions/removals of backends */ - LWLockAcquire(SInvalWriteLock, LW_SHARED); - - if (backendID > 0 && backendID <= segP->lastBackend) - { - ProcState *stateP = &segP->procState[backendID - 1]; - PGPROC *proc = stateP->proc; - - if (proc != NULL) + if (segP->pgprocnos[i] == pgprocno) { - *xid = proc->xid; - *xmin = proc->xmin; - *nsubxid = proc->subxidStatus.count; - *overflowed = proc->subxidStatus.overflowed; + if (i != segP->numProcs - 1) + segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1]; + break; } } + if (i < 0) + elog(PANIC, "could not find entry in sinval array"); + segP->numProcs--; LWLockRelease(SInvalWriteLock); } @@ -507,9 +433,9 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n) * these (unlocked) changes will be committed to memory before we exit * the function. */ - for (i = 0; i < segP->lastBackend; i++) + for (i = 0; i < segP->numProcs; i++) { - ProcState *stateP = &segP->procState[i]; + ProcState *stateP = &segP->procState[segP->pgprocnos[i]]; stateP->hasMessages = true; } @@ -677,13 +603,14 @@ SICleanupQueue(bool callerHasWriteLock, int minFree) minsig = min - SIG_THRESHOLD; lowbound = min - MAXNUMMESSAGES + minFree; - for (i = 0; i < segP->lastBackend; i++) + for (i = 0; i < segP->numProcs; i++) { - ProcState *stateP = &segP->procState[i]; + ProcState *stateP = &segP->procState[segP->pgprocnos[i]]; int n = stateP->nextMsgNum; - /* Ignore if inactive or already in reset state */ - if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly) + /* Ignore if already in reset state */ + Assert(stateP->procPid != 0); + if (stateP->resetState || stateP->sendOnly) continue; /* @@ -719,11 +646,8 @@ SICleanupQueue(bool callerHasWriteLock, int minFree) { segP->minMsgNum -= MSGNUMWRAPAROUND; segP->maxMsgNum -= MSGNUMWRAPAROUND; - for (i = 0; i < segP->lastBackend; i++) - { - /* we don't bother skipping inactive entries here */ - segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND; - } + for (i = 0; i < segP->numProcs; i++) + segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND; } /* |