Skip to content

Commit 90e171f

Browse files
danolivokelvich
authored andcommitted
Replace BgwPool SpinLock with LWLock.
1 parent 0736b7c commit 90e171f

File tree

5 files changed

+69
-62
lines changed

5 files changed

+69
-62
lines changed

src/bgwpool.c

Lines changed: 50 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
#include "postmaster/postmaster.h"
66
#include "postmaster/bgworker.h"
77
#include "storage/dsm.h"
8-
#include "storage/s_lock.h"
9-
#include "storage/spin.h"
8+
#include "storage/lwlock.h"
109
#include "storage/proc.h"
1110
#include "storage/pg_sema.h"
1211
#include "storage/shmem.h"
@@ -42,7 +41,9 @@ void BgwPoolDynamicWorkerMainLoop(Datum arg);
4241
void
4342
BgwPoolInit(BgwPool* pool)
4443
{
45-
SpinLockInit(&pool->lock);
44+
LWLockInitialize(&pool->lock, LWLockNewTrancheId());
45+
LWLockRegisterTranche(pool->lock.tranche, "BGWPOOL_LWLOCK");
46+
4647
pool->nWorkers = 0;
4748
pool->shutdown = false;
4849
pool->producerBlocked = false;
@@ -157,12 +158,12 @@ BgwPoolMainLoop(BgwPool* poolDesc)
157158
}
158159

159160
// XXX: change to LWLock
160-
SpinLockAcquire(&poolDesc->lock);
161+
LWLockAcquire(&poolDesc->lock, LW_EXCLUSIVE);
161162

162163
/* Worker caught the shutdown signal - release locks and return. */
163164
if (poolDesc->shutdown)
164165
{
165-
SpinLockRelease(&poolDesc->lock);
166+
LWLockRelease(&poolDesc->lock);
166167
break;
167168
}
168169

@@ -177,7 +178,7 @@ BgwPoolMainLoop(BgwPool* poolDesc)
177178
* remain in opinion, that worker waked up and doing its work.
178179
*/
179180
ConditionVariablePrepareToSleep(&poolDesc->available_cv);
180-
SpinLockRelease(&poolDesc->lock);
181+
LWLockRelease(&poolDesc->lock);
181182

182183
ConditionVariableSleep(&poolDesc->available_cv, PG_WAIT_EXTENSION);
183184
continue;
@@ -186,9 +187,9 @@ BgwPoolMainLoop(BgwPool* poolDesc)
186187
/* Wait for end of the node joining operation */
187188
while (poolDesc->n_holders > 0 && !poolDesc->shutdown)
188189
{
189-
SpinLockRelease(&poolDesc->lock);
190+
LWLockRelease(&poolDesc->lock);
190191
ConditionVariableSleep(&Mtm->receiver_barrier_cv, PG_WAIT_EXTENSION);
191-
SpinLockAcquire(&poolDesc->lock);
192+
LWLockAcquire(&poolDesc->lock, LW_EXCLUSIVE);
192193
}
193194

194195
size = *(int *) &queue[poolDesc->head];
@@ -230,14 +231,14 @@ BgwPoolMainLoop(BgwPool* poolDesc)
230231
ConditionVariableBroadcast(&poolDesc->overflow_cv);
231232
}
232233

233-
SpinLockRelease(&poolDesc->lock);
234+
LWLockRelease(&poolDesc->lock);
234235

235236
MtmExecutor(work, size, &ctx);
236237
pfree(work);
237238

238-
SpinLockAcquire(&poolDesc->lock);
239+
LWLockAcquire(&poolDesc->lock, LW_EXCLUSIVE);
239240
poolDesc->active -= 1;
240-
SpinLockRelease(&poolDesc->lock);
241+
LWLockRelease(&poolDesc->lock);
241242

242243
ConditionVariableBroadcast(&poolDesc->syncpoint_cv);
243244
}
@@ -287,15 +288,15 @@ static void BgwStartExtraWorker(BgwPool* pool)
287288
* After return from routine work and ctx buffers can be reused safely.
288289
*/
289290
void
290-
BgwPoolExecute(BgwPool* pool, void* work, int size, MtmReceiverContext *ctx)
291+
BgwPoolExecute(BgwPool* poolDesc, void* work, int size, MtmReceiverContext *ctx)
291292
{
292293
int payload = INTALIGN(sizeof(MtmReceiverContext));
293294

294-
Assert(pool != NULL);
295+
Assert(poolDesc != NULL);
295296
Assert(queue != NULL);
296297

297298
// XXX: align with spill size and assert that
298-
if (MSGLEN > pool->size)
299+
if (MSGLEN > poolDesc->size)
299300
{
300301
/*
301302
* Size of work is larger than size of shared buffer:
@@ -305,8 +306,8 @@ BgwPoolExecute(BgwPool* pool, void* work, int size, MtmReceiverContext *ctx)
305306
return;
306307
}
307308

308-
SpinLockAcquire(&pool->lock);
309-
while (!pool->shutdown)
309+
LWLockAcquire(&poolDesc->lock, LW_EXCLUSIVE);
310+
while (!poolDesc->shutdown)
310311
{
311312
/*
312313
* If queue is not wrapped through the end of buffer (head <= tail) we can
@@ -316,87 +317,89 @@ BgwPoolExecute(BgwPool* pool, void* work, int size, MtmReceiverContext *ctx)
316317
* If queue is wrapped through the end of buffer (tail < head) we can fit
317318
* message only between head and tail.
318319
*/
319-
if ((pool->head <= pool->tail &&
320-
(pool->size - pool->tail >= MSGLEN || pool->head >= size + payload)) ||
321-
(pool->head > pool->tail && pool->head - pool->tail >= MSGLEN))
320+
if ((poolDesc->head <= poolDesc->tail &&
321+
(poolDesc->size - poolDesc->tail >= MSGLEN ||
322+
poolDesc->head >= size + payload)) ||
323+
(poolDesc->head > poolDesc->tail &&
324+
poolDesc->head - poolDesc->tail >= MSGLEN))
322325
{
323-
pool->pending += 1;
326+
poolDesc->pending += 1;
324327

325-
if (pool->active + pool->pending > pool->nWorkers)
326-
BgwStartExtraWorker(pool);
328+
if (poolDesc->active + poolDesc->pending > poolDesc->nWorkers)
329+
BgwStartExtraWorker(poolDesc);
327330

328331
/*
329332
* We always have free space for size at tail, as everything is
330333
* int-aligned and when pool->tail becomes equal to pool->size it
331334
* is switched to zero.
332335
*/
333-
*(int *) &queue[pool->tail] = size;
336+
*(int *) &queue[poolDesc->tail] = size;
334337

335-
if (pool->size - pool->tail >= MSGLEN)
338+
if (poolDesc->size - poolDesc->tail >= MSGLEN)
336339
{
337-
memcpy(&queue[pool->tail + sizeof(int)], ctx, payload);
338-
memcpy(&queue[pool->tail + sizeof(int) + payload], work, size);
339-
pool->tail += MSGLEN;
340+
memcpy(&queue[poolDesc->tail + sizeof(int)], ctx, payload);
341+
memcpy(&queue[poolDesc->tail + sizeof(int) + payload], work, size);
342+
poolDesc->tail += MSGLEN;
340343
}
341344
else
342345
{
343346
/* Message can't fit into the end of queue. */
344347
memcpy(queue, ctx, payload);
345348
memcpy(&queue[payload], work, size);
346-
pool->tail = MSGLEN - sizeof(int);
349+
poolDesc->tail = MSGLEN - sizeof(int);
347350
}
348351

349-
if (pool->tail == pool->size)
350-
pool->tail = 0;
352+
if (poolDesc->tail == poolDesc->size)
353+
poolDesc->tail = 0;
351354

352-
ConditionVariableBroadcast(&pool->available_cv);
355+
ConditionVariableBroadcast(&poolDesc->available_cv);
353356
break;
354357
}
355358
else
356359
{
357-
pool->producerBlocked = true;
360+
poolDesc->producerBlocked = true;
358361
/* It is critical that the sleep preparation will stay here */
359-
ConditionVariablePrepareToSleep(&pool->overflow_cv);
360-
SpinLockRelease(&pool->lock);
361-
ConditionVariableSleep(&pool->overflow_cv, PG_WAIT_EXTENSION);
362-
SpinLockAcquire(&pool->lock);
362+
ConditionVariablePrepareToSleep(&poolDesc->overflow_cv);
363+
LWLockRelease(&poolDesc->lock);
364+
ConditionVariableSleep(&poolDesc->overflow_cv, PG_WAIT_EXTENSION);
365+
LWLockAcquire(&poolDesc->lock, LW_EXCLUSIVE);
363366
}
364367
}
365-
SpinLockRelease(&pool->lock);
368+
LWLockRelease(&poolDesc->lock);
366369
}
367370

368371
/*
369372
* Initiate shutdown process of workers: set shutdown sign and wake up all
370373
* workers.
371374
*/
372-
void PoolStateShutdown(BgwPool* pool)
375+
void PoolStateShutdown(BgwPool* poolDesc)
373376
{
374-
SpinLockAcquire(&pool->lock);
375-
pool->shutdown = true;
376-
ConditionVariableBroadcast(&pool->available_cv);
377-
SpinLockRelease(&pool->lock);
377+
LWLockAcquire(&poolDesc->lock, LW_EXCLUSIVE);
378+
poolDesc->shutdown = true;
379+
ConditionVariableBroadcast(&poolDesc->available_cv);
380+
LWLockRelease(&poolDesc->lock);
378381
}
379382

380383
/*
381384
* Tell our lads to cancel currently active transactions.
382385
*/
383386
void
384-
BgwPoolCancel(BgwPool* pool)
387+
BgwPoolCancel(BgwPool* poolDesc)
385388
{
386389
int i;
387390

388-
SpinLockAcquire(&pool->lock);
389-
for (i = 0; i < pool->nWorkers; i++)
391+
LWLockAcquire(&poolDesc->lock, LW_EXCLUSIVE);
392+
for (i = 0; i < poolDesc->nWorkers; i++)
390393
{
391394
BgwHandleStatus status;
392395
pid_t pid;
393396

394-
status = GetBackgroundWorkerPid(pool->bgwhandles[i], &pid);
397+
status = GetBackgroundWorkerPid(poolDesc->bgwhandles[i], &pid);
395398
if (status == BGWH_STARTED)
396399
{
397400
Assert(pid > 0);
398401
kill(pid, SIGINT);
399402
}
400403
}
401-
SpinLockRelease(&pool->lock);
404+
LWLockRelease(&poolDesc->lock);
402405
}

src/commit.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "access/twophase.h"
1414
#include "access/transam.h"
1515
#include "storage/proc.h"
16+
#include "storage/spin.h"
1617
#include "utils/guc.h"
1718
#include "utils/syscache.h"
1819
#include "utils/snapmgr.h"

src/include/bgwpool.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
#ifndef __BGWPOOL_H__
22
#define __BGWPOOL_H__
33

4-
#include "storage/s_lock.h"
5-
#include "storage/spin.h"
4+
#include "storage/lwlock.h"
65
#include "storage/pg_sema.h"
76
#include "postmaster/bgworker.h"
87
#include "storage/condition_variable.h"
@@ -21,7 +20,7 @@
2120
*/
2221
typedef struct
2322
{
24-
volatile slock_t lock;
23+
LWLock lock;
2524
ConditionVariable syncpoint_cv;
2625
int n_holders;
2726

src/multimaster.c

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -504,9 +504,9 @@ MtmAllApplyWorkersFinished()
504504
if (i == Mtm->my_node_id - 1)
505505
continue;
506506

507-
SpinLockAcquire(&Mtm->pools[i].lock);
507+
LWLockAcquire(&Mtm->pools[i].lock, LW_EXCLUSIVE);
508508
ntasks = Mtm->pools[i].active + Mtm->pools[i].pending;
509-
SpinLockRelease(&Mtm->pools[i].lock);
509+
LWLockRelease(&Mtm->pools[i].lock);
510510

511511
mtm_log(MtmApplyBgwFinish, "MtmAllApplyWorkersFinished %d tasks not finished", ntasks);
512512

@@ -938,9 +938,9 @@ mtm_join_node(PG_FUNCTION_ARGS)
938938
if (node_id == cfg->my_node_id)
939939
continue;
940940

941-
SpinLockAcquire(&Mtm->pools[node_id - 1].lock);
941+
LWLockAcquire(&Mtm->pools[node_id - 1].lock, LW_EXCLUSIVE);
942942
Mtm->pools[node_id-1].n_holders += 1;
943-
SpinLockRelease(&Mtm->pools[node_id - 1].lock);
943+
LWLockRelease(&Mtm->pools[node_id - 1].lock);
944944
}
945945

946946
/* Await for workers finish and create syncpoints */
@@ -989,9 +989,10 @@ mtm_join_node(PG_FUNCTION_ARGS)
989989
if (node_id == cfg->my_node_id)
990990
continue;
991991

992-
SpinLockAcquire(&Mtm->pools[node_id - 1].lock);
992+
993+
LWLockAcquire(&Mtm->pools[node_id - 1].lock, LW_EXCLUSIVE);
993994
Mtm->pools[node_id-1].n_holders -= 1;
994-
SpinLockRelease(&Mtm->pools[node_id - 1].lock);
995+
LWLockRelease(&Mtm->pools[node_id - 1].lock);
995996
}
996997
ConditionVariableBroadcast(&Mtm->receiver_barrier_cv);
997998
PG_RE_THROW();
@@ -1006,9 +1007,9 @@ mtm_join_node(PG_FUNCTION_ARGS)
10061007
if (node_id == cfg->my_node_id)
10071008
continue;
10081009

1009-
SpinLockAcquire(&Mtm->pools[node_id - 1].lock);
1010+
LWLockAcquire(&Mtm->pools[node_id - 1].lock, LW_EXCLUSIVE);
10101011
Mtm->pools[node_id-1].n_holders -= 1;
1011-
SpinLockRelease(&Mtm->pools[node_id - 1].lock);
1012+
LWLockRelease(&Mtm->pools[node_id - 1].lock);
10121013
}
10131014
ConditionVariableBroadcast(&Mtm->receiver_barrier_cv);
10141015

src/pglogical_apply.c

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -520,26 +520,29 @@ process_syncpoint(MtmReceiverContext *rctx, const char *msg, XLogRecPtr received
520520
* allow previous transactions to proceed. This way we will not delay
521521
* application of transaction bodies, just prepare record itself.
522522
*/
523+
LWLockAcquire(&Mtm->pools[rctx->node_id-1].lock, LW_EXCLUSIVE);
523524
for(;;)
524525
{
525526
int ntasks;
526527

527528
if (Mtm->pools[rctx->node_id-1].nWorkers <= 0)
529+
{
530+
LWLockRelease(&Mtm->pools[rctx->node_id-1].lock);
528531
break;
532+
}
529533

530-
SpinLockAcquire(&Mtm->pools[rctx->node_id-1].lock);
531534
ntasks = Mtm->pools[rctx->node_id-1].active +
532535
Mtm->pools[rctx->node_id-1].pending;
533536
ConditionVariablePrepareToSleep(&Mtm->pools[rctx->node_id-1].syncpoint_cv);
534-
SpinLockRelease(&Mtm->pools[rctx->node_id-1].lock);
537+
LWLockRelease(&Mtm->pools[rctx->node_id-1].lock);
535538

536539
Assert(ntasks >= 0);
537540
if (ntasks == 0)
538541
break;
539-
elog(LOG, "BEFORE syncpoint_cv");
542+
540543
ConditionVariableSleep(&Mtm->pools[rctx->node_id-1].syncpoint_cv,
541544
PG_WAIT_EXTENSION);
542-
elog(LOG, "AFTER syncpoint_cv");
545+
LWLockAcquire(&Mtm->pools[rctx->node_id-1].lock, LW_EXCLUSIVE);
543546
}
544547

545548
/*

0 commit comments

Comments
 (0)