Skip to content

Commit f966bb9

Browse files
danolivokelvich
authored andcommitted
Some code changes after regression tests
1 parent ee573a3 commit f966bb9

File tree

6 files changed

+23
-22
lines changed

6 files changed

+23
-22
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ submake-regress:
3434
$(MAKE) -C $(top_builddir)/src/test/regress tablespace-setup
3535

3636
ifndef MTM_ALL
37-
PROVE_TESTS=t/0000_precommit.pl t/000_regress.pl t/000_truncate.pl t/000_deadlock.pl t/001_basic_recovery.pl
37+
#PROVE_TESTS=t/0000_precommit.pl t/000_regress.pl t/000_truncate.pl t/000_deadlock.pl t/001_basic_recovery.pl
3838
endif
3939
check: temp-install submake-regress
4040
$(prove_check)

src/bgwpool.c

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ BgwPoolMainLoop(BgwPool* poolDesc)
160160
ProcessConfigFile(PGC_SIGHUP);
161161
}
162162

163+
CHECK_FOR_INTERRUPTS();
164+
AcceptInvalidationMessages();
165+
163166
// XXX: change to LWLock
164167
LWLockAcquire(&poolDesc->lock, LW_EXCLUSIVE);
165168

@@ -184,14 +187,17 @@ BgwPoolMainLoop(BgwPool* poolDesc)
184187
LWLockRelease(&poolDesc->lock);
185188

186189
ConditionVariableSleep(&poolDesc->available_cv, PG_WAIT_EXTENSION);
190+
ConditionVariableCancelSleep();
187191
continue;
188192
}
189193

190194
/* Wait for end of the node joining operation */
191195
while (poolDesc->n_holders > 0 && !poolDesc->shutdown)
192196
{
197+
ConditionVariablePrepareToSleep(&Mtm->receiver_barrier_cv);
193198
LWLockRelease(&poolDesc->lock);
194199
ConditionVariableSleep(&Mtm->receiver_barrier_cv, PG_WAIT_EXTENSION);
200+
ConditionVariableCancelSleep();
195201
LWLockAcquire(&poolDesc->lock, LW_EXCLUSIVE);
196202
}
197203

@@ -259,7 +265,7 @@ static void BgwStartExtraWorker(BgwPool* pool)
259265
{
260266
BackgroundWorker worker;
261267
BackgroundWorkerHandle* handle;
262-
MemoryContext oldcontext;
268+
pid_t pid;
263269

264270
if (pool->nWorkers >= MtmMaxWorkers)
265271
return;
@@ -268,21 +274,20 @@ static void BgwStartExtraWorker(BgwPool* pool)
268274
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
269275
worker.bgw_start_time = BgWorkerStart_ConsistentState;
270276
worker.bgw_restart_time = BGW_NEVER_RESTART;
277+
worker.bgw_notify_pid = MyProcPid;
271278
worker.bgw_main_arg = PointerGetDatum(pool);
272279
sprintf(worker.bgw_library_name, "multimaster");
273280
sprintf(worker.bgw_function_name, "BgwPoolDynamicWorkerMainLoop");
274281
snprintf(worker.bgw_name, BGW_MAXLEN, "%s-dynworker-%d", pool->poolName, (int) pool->nWorkers + 1);
275282

276283
pool->lastDynamicWorkerStartTime = GetCurrentTimestamp();
277284

278-
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
279-
280285
if (RegisterDynamicBackgroundWorker(&worker, &handle))
281286
pool->bgwhandles[pool->nWorkers++] = handle;
282287
else
283288
elog(WARNING, "Failed to start dynamic background worker");
284289

285-
MemoryContextSwitchTo(oldcontext);
290+
WaitForBackgroundWorkerStartup(handle, &pid);
286291
}
287292

288293
/*
@@ -365,6 +370,7 @@ BgwPoolExecute(BgwPool* poolDesc, void* work, int size, MtmReceiverContext *ctx)
365370
ConditionVariablePrepareToSleep(&poolDesc->overflow_cv);
366371
LWLockRelease(&poolDesc->lock);
367372
ConditionVariableSleep(&poolDesc->overflow_cv, PG_WAIT_EXTENSION);
373+
ConditionVariableCancelSleep();
368374
LWLockAcquire(&poolDesc->lock, LW_EXCLUSIVE);
369375
}
370376
}

src/dmq.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include "storage/ipc.h"
4646
#include "tcop/tcopprot.h"
4747
#include "utils/dynahash.h"
48+
#include "utils/inval.h"
4849
#include "utils/ps_status.h"
4950

5051
#define DMQ_MQ_SIZE ((Size) 65536)
@@ -992,7 +993,6 @@ dmq_receiver_at_exit(int status, Datum receiver)
992993
dmq_receiver_stop_hook(sender_name);
993994
}
994995

995-
996996
Datum
997997
dmq_receiver_loop(PG_FUNCTION_ARGS)
998998
{
@@ -1152,6 +1152,7 @@ dmq_receiver_loop(PG_FUNCTION_ARGS)
11521152

11531153
// XXX: is it enough?
11541154
CHECK_FOR_INTERRUPTS();
1155+
AcceptInvalidationMessages();
11551156

11561157
if (dmq_now() - last_message_at > recv_timeout)
11571158
{

src/multimaster.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ _PG_init(void)
325325
"Maximal size of transaction after which transaction is written to the disk",
326326
NULL,
327327
&MtmTransSpillThreshold,
328-
25 * 1024, /* 100Mb */
328+
100 * 1024, /* 100Mb */
329329
0,
330330
MaxAllocSize/1024,
331331
PGC_SIGHUP,

src/pglogical_apply.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1107,10 +1107,10 @@ process_remote_insert(StringInfo s, Relation rel)
11071107
TupleDesc tupDesc = RelationGetDescr(rel);
11081108
HeapTuple tup;
11091109

1110+
PushActiveSnapshot(GetTransactionSnapshot());
11101111
estate = create_rel_estate(rel);
11111112
newslot = ExecInitExtraTupleSlot(estate, tupDesc);
11121113
oldslot = ExecInitExtraTupleSlot(estate, tupDesc);
1113-
PushActiveSnapshot(GetTransactionSnapshot());
11141114

11151115
ExecOpenIndices(estate->es_result_relation_info, false);
11161116
relinfo = estate->es_result_relation_info;

src/pglogical_receiver.c

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -677,10 +677,12 @@ pglogical_receiver_main(Datum main_arg)
677677
{
678678
int rc, hdr_len;
679679

680-
if (ProcDiePending && Mtm->pools[nodeId-1].nWorkers > 0)
681-
PoolStateShutdown(&Mtm->pools[nodeId-1]);
682-
683-
CHECK_FOR_INTERRUPTS();
680+
if (ProcDiePending)
681+
{
682+
if (Mtm->pools[nodeId-1].nWorkers > 0)
683+
PoolStateShutdown(&Mtm->pools[nodeId-1]);
684+
proc_exit(0);
685+
}
684686

685687
/* Wait necessary amount of time */
686688
rc = WaitLatchOrSocket(MyLatch,
@@ -707,16 +709,8 @@ pglogical_receiver_main(Datum main_arg)
707709
proc_exit(1);
708710
}
709711

710-
if (ProcDiePending)
711-
{
712-
dsm_handle handle = Mtm->pools[nodeId-1].dsmhandler;
713-
dsm_segment *seg = dsm_find_mapping(handle);
714-
dsm_detach(seg);
715-
716-
if (Mtm->pools[nodeId-1].nWorkers > 0)
717-
PoolStateShutdown(&Mtm->pools[nodeId-1]);
718-
return;
719-
}
712+
CHECK_FOR_INTERRUPTS();
713+
AcceptInvalidationMessages();
720714

721715
if (count != MtmGetRecoveryCount())
722716
{

0 commit comments

Comments
 (0)