Commit a58213a

danolivo authored and kelvich committed
[PGPRO-3146] The logic for restarting the receivers and the resolver has been rewritten.
The postmaster no longer restarts the resolver and receivers when they exit. Instead, on each cycle of MtmMonitor we check the state of these processes and re-launch any that have stopped.
1 parent 5ae8081 commit a58213a
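
In short, the receiver and resolver workers are now registered with bgw_restart_time = BGW_NEVER_RESTART, and MtmMonitor polls each worker handle once per loop iteration with GetBackgroundWorkerPid(), re-launching any handle that reports BGWH_STOPPED. The sketch below only illustrates that polling pattern with the standard PostgreSQL background-worker API; launch_worker() and relaunch_if_stopped() are hypothetical helpers, not functions from the multimaster sources (the actual code is in the src/state.c hunk further down).

/*
 * Illustrative sketch only: supervising workers that are registered with
 * bgw_restart_time = BGW_NEVER_RESTART.  The helper names are hypothetical;
 * the real logic lives in MtmMonitor() in src/state.c.
 */
#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"

static BackgroundWorkerHandle *
launch_worker(const char *name, pid_t notify_pid)
{
    BackgroundWorker        worker;
    BackgroundWorkerHandle *handle;

    MemSet(&worker, 0, sizeof(worker));
    worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
    worker.bgw_start_time = BgWorkerStart_ConsistentState;
    worker.bgw_restart_time = BGW_NEVER_RESTART;    /* postmaster never restarts it */
    worker.bgw_notify_pid = notify_pid;             /* monitor is signalled on state changes */
    snprintf(worker.bgw_name, BGW_MAXLEN, "%s", name);
    snprintf(worker.bgw_library_name, BGW_MAXLEN, "multimaster");
    snprintf(worker.bgw_function_name, BGW_MAXLEN, "%s", name);

    if (!RegisterDynamicBackgroundWorker(&worker, &handle))
        elog(ERROR, "could not register background worker \"%s\"", name);
    return handle;
}

/* One supervision pass inside the monitor's main loop. */
static BackgroundWorkerHandle *
relaunch_if_stopped(BackgroundWorkerHandle *handle, const char *name)
{
    pid_t   pid;

    if (GetBackgroundWorkerPid(handle, &pid) == BGWH_STOPPED)
    {
        elog(LOG, "worker \"%s\" stopped, re-launching", name);
        handle = launch_worker(name, MyProcPid);
    }
    return handle;
}

In the actual change, MtmMonitor keeps the handles returned at registration time and performs this check once per loop iteration, alongside its other periodic work.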

File tree: 4 files changed (+66, -5 lines)

src/pglogical_receiver.c

Lines changed: 3 additions & 2 deletions

@@ -547,6 +547,7 @@ pglogical_receiver_main(Datum main_arg)
     snprintf(worker_proc, BGW_MAXLEN, "mtm-logrep-receiver-%d-%d",
              receiver_mtm_cfg->my_node_id, nodeId);
     BgwPoolStart(&Mtm->pools[nodeId-1], worker_proc, db_id, user_id);
+    mtm_log(MtmReceiverStart, "Receiver %s has started.", worker_proc);
 
     /*
      * This is the main loop of logical replication.
@@ -968,7 +969,7 @@ pglogical_receiver_main(Datum main_arg)
      */
     BgwPoolCancel(&Mtm->pools[nodeId - 1]);
     MtmSleep(RECEIVER_SUSPEND_TIMEOUT);
-
+    mtm_log(MtmApplyError, "Receiver %s catch an error and will die", worker_proc);
     /* and die */
     PG_RE_THROW();
 }
@@ -989,7 +990,7 @@ MtmStartReceiver(int nodeId, Oid db_id, Oid user_id, pid_t monitor_pid)
     MemSet(&worker, 0, sizeof(BackgroundWorker));
     worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
     worker.bgw_start_time = BgWorkerStart_ConsistentState;
-    worker.bgw_restart_time = 1;
+    worker.bgw_restart_time = BGW_NEVER_RESTART;
     worker.bgw_main_arg = Int32GetDatum(nodeId);
     worker.bgw_notify_pid = monitor_pid;
 

src/resolver.c

Lines changed: 2 additions & 1 deletion

@@ -135,7 +135,7 @@ ResolverStart(Oid db_id, Oid user_id)
     MemSet(&worker, 0, sizeof(BackgroundWorker));
     worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
     worker.bgw_start_time = BgWorkerStart_ConsistentState;
-    worker.bgw_restart_time = 1;
+    worker.bgw_restart_time = BGW_NEVER_RESTART;
 
     memcpy(worker.bgw_extra, &db_id, sizeof(Oid));
     memcpy(worker.bgw_extra + sizeof(Oid), &user_id, sizeof(Oid));
@@ -561,6 +561,7 @@ ResolverMain(Datum main_arg)
     LWLockAcquire(resolver_state->lock, LW_EXCLUSIVE);
     resolver_state->pid = MyProcPid;
     LWLockRelease(resolver_state->lock);
+    mtm_log(ResolverTraceTxMsg, "Resolver started");
 
     for(;;)
     {

src/state.c

Lines changed: 27 additions & 1 deletion

@@ -1193,7 +1193,7 @@ MtmMonitorStart(Oid db_id, Oid user_id)
     MemSet(&worker, 0, sizeof(BackgroundWorker));
     worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
     worker.bgw_start_time = BgWorkerStart_ConsistentState;
-    worker.bgw_restart_time = BGW_NEVER_RESTART; /* or we can start several receivers */
+    worker.bgw_restart_time = 1;
     worker.bgw_main_arg = Int32GetDatum(0);
 
     memcpy(worker.bgw_extra, &db_id, sizeof(Oid));
@@ -1658,10 +1658,13 @@ MtmMonitor(Datum arg)
     /* Launch resolver */
     Assert(resolver == NULL);
     resolver = ResolverStart(db_id, user_id);
+    mtm_log(MtmStateMessage, "MtmMonitor started");
 
     for (;;)
     {
         int rc;
+        int i;
+        pid_t pid;
 
         CHECK_FOR_INTERRUPTS();
 
@@ -1712,6 +1715,29 @@
             config_valid = true;
         }
 
+        /*
+         * Check and restart resolver and receivers if its stopped by any error.
+         */
+        if (GetBackgroundWorkerPid(resolver, &pid) == BGWH_STOPPED)
+        {
+            mtm_log(MtmStateMessage, "Restart resolver");
+            resolver = ResolverStart(db_id, user_id);
+        }
+
+        for (i = 0; i < MTM_MAX_NODES; i++)
+        {
+            if (receivers[i] == NULL)
+                continue;
+
+            if (GetBackgroundWorkerPid(receivers[i], &pid) == BGWH_STOPPED)
+            {
+                mtm_log(MtmStateMessage, "Restart receiver for the node%d", i + 1);
+                /* Receiver has finished by some kind of mistake. Start it. */
+                receivers[i] = MtmStartReceiver(i+1, MyDatabaseId,
+                                                GetUserId(), MyProcPid);
+            }
+        }
+
         // XXX: add tx start/stop to clear mcxt?
         check_status_requests(mtm_cfg);
 

t/006_bugfixes.pl renamed to t/007_bugfixes.pl

Lines changed: 34 additions & 1 deletion

@@ -3,7 +3,7 @@
 use PostgresNode;
 use Cluster;
 use TestLib;
-use Test::More tests => 2;
+use Test::More tests => 3;
 
 my $cluster = new Cluster(3);
 $cluster->init();
@@ -90,5 +90,38 @@
 is( (($hash0 eq $hash1) and ($hash1 eq $hash2)) , 1,
     "Check that hash is the same after query");
 
+# ##############################################################################
+#
+# Check the PGPRO-3146 bug. Hard crash of backend causes restart of all postgres
+# processes. Multimaster node must be survived after the crash and included into
+# the multimaster after recovery.
+#
+# ##############################################################################
+
+# Set GUC restart_after_crash in 'on' value
+$cluster->stop();
+foreach (0..$#{$cluster->{nodes}})
+{
+    $cluster->{nodes}->[$_]->append_conf('postgresql.conf', q{restart_after_crash = on});
+}
+$cluster->start();
+$cluster->await_nodes( (0,1,2) );
+
+# Simulate payload
+$cluster->pgbench(0, ('-i', '-n', -s => '1') );
+my $pgb1 = $cluster->pgbench_async(0, ('-n', -T => '15', -j=>'5', -c => '5') );
+sleep(5);
+
+my $pid0 = $cluster->safe_psql(0, "SELECT pid FROM pg_stat_activity
+    WHERE backend_type LIKE 'client backend'
+    AND query LIKE 'UPDATE%' LIMIT 1;");
+
+# Simulate hard crash
+note("Simulate hard crash of a backend by SIGKILL to $pid0");
+kill -9, $pid0;
+
+$cluster->await_nodes( (0,1,2) );
+is($cluster->is_data_identic( (0,1,2) ), 1, "check consistency after crash");
+
 $cluster->stop();
 
