Skip to content

Commit 4642cb5

Browse files
danolivokelvich
authored andcommitted
In the join state we pause the receiver and wait for the workers to do all pending work. After that we suspend workers until the end of critical operations
1 parent 83e7117 commit 4642cb5

File tree

1 file changed

+18
-2
lines changed

1 file changed

+18
-2
lines changed

src/bgwpool.c

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ BgwPoolStart(BgwPool* poolDesc, char *poolName, Oid db_id, Oid user_id)
6060
poolDesc->user_id = user_id;
6161

6262
poolDesc->nWorkers = 0;
63+
poolDesc->n_holders = 0;
6364
poolDesc->producerBlocked = false;
6465
poolDesc->head = 0;
6566
poolDesc->tail = 0;
@@ -180,8 +181,11 @@ BgwPoolMainLoop(BgwPool* poolDesc)
180181
continue;
181182
}
182183

183-
/* Wait for end of the node joining operation */
184-
if (poolDesc->n_holders > 0)
184+
/*
185+
* If we are in a join state, we need to apply all the pending data and
186+
* go into sleep mode until the end of the join operation.
187+
*/
188+
if (poolDesc->n_holders > 0 && poolDesc->pending == 0)
185189
{
186190
ConditionVariablePrepareToSleep(&Mtm->receiver_barrier_cv);
187191
LWLockRelease(&poolDesc->lock);
@@ -299,6 +303,18 @@ BgwPoolExecute(BgwPool* poolDesc, void* work, int size, MtmReceiverContext *ctx)
299303
Assert(MSGLEN(size) <= poolDesc->size);
300304

301305
LWLockAcquire(&poolDesc->lock, LW_EXCLUSIVE);
306+
307+
/* Wait for end of the node joining operation */
308+
while (poolDesc->n_holders > 0 && !ProcDiePending)
309+
{
310+
ConditionVariablePrepareToSleep(&Mtm->receiver_barrier_cv);
311+
LWLockRelease(&poolDesc->lock);
312+
if (!ProcDiePending)
313+
ConditionVariableSleep(&Mtm->receiver_barrier_cv, PG_WAIT_EXTENSION);
314+
ConditionVariableCancelSleep();
315+
LWLockAcquire(&poolDesc->lock, LW_EXCLUSIVE);
316+
}
317+
302318
while (!ProcDiePending)
303319
{
304320
/*

0 commit comments

Comments
 (0)