Skip to content

Commit c5ffd02

Browse files
danolivokelvich
authored andcommitted
Improve Receiver code readability
1 parent 90e171f commit c5ffd02

File tree

1 file changed

+37
-48
lines changed

1 file changed

+37
-48
lines changed

src/pglogical_receiver.c

Lines changed: 37 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
#include "postgres.h"
2222
#include "fmgr.h"
2323
#include "miscadmin.h"
24-
// #include "common/pg_socket.h"
2524
#include "pqexpbuffer.h"
2625
#include "access/xact.h"
2726
#include "access/clog.h"
@@ -67,8 +66,8 @@ bool MtmIsReceiver;
6766

6867
typedef struct MtmFlushPosition
6968
{
70-
dlist_node node;
71-
int node_id;
69+
dlist_node node;
70+
int node_id;
7271
XLogRecPtr local_end;
7372
XLogRecPtr remote_end;
7473
} MtmFlushPosition;
@@ -81,7 +80,7 @@ char const* const MtmReplicationModeName[] =
8180

8281
static dlist_head MtmLsnMapping = DLIST_STATIC_INIT(MtmLsnMapping);
8382

84-
MtmConfig *receiver_mtm_cfg;
83+
MtmConfig *receiver_mtm_cfg;
8584
bool receiver_mtm_cfg_valid;
8685

8786
/* Signal handling */
@@ -308,15 +307,15 @@ MtmExecute(void* work, int size, MtmReceiverContext *receiver_ctx, bool no_pool)
308307
static bool
309308
MtmFilterTransaction(char *record, int size, Syncpoint *spvector, HTAB *filter_map)
310309
{
311-
StringInfoData s;
312-
uint8 event;
310+
StringInfoData s;
311+
uint8 event;
313312
XLogRecPtr origin_lsn;
314313
XLogRecPtr end_lsn;
315314
XLogRecPtr tx_lsn;
316-
int replication_node;
317-
int origin_node;
318-
char const* gid = "";
319-
char msgtype PG_USED_FOR_ASSERTS_ONLY;
315+
int replication_node;
316+
int origin_node;
317+
char const* gid = "";
318+
char msgtype PG_USED_FOR_ASSERTS_ONLY;
320319

321320
s.data = record;
322321
s.len = size;
@@ -377,12 +376,10 @@ MtmFilterTransaction(char *record, int size, Syncpoint *spvector, HTAB *filter_m
377376

378377
hash_search(filter_map, &entry, HASH_FIND, &found);
379378

380-
{
381-
mtm_log(MtmReceiverFilter,
382-
"Filter (map) transaction %s from node %d event=%x (restrt=%"INT64_MODIFIER"x, tx=%d/%"INT64_MODIFIER"x) -> %d",
383-
gid, replication_node, event,
384-
spvector[origin_node-1].origin_lsn, origin_node, tx_lsn, found);
385-
}
379+
mtm_log(MtmReceiverFilter,
380+
"Filter (map) transaction %s from node %d event=%x (restrt=%"INT64_MODIFIER"x, tx=%d/%"INT64_MODIFIER"x) -> %d",
381+
gid, replication_node, event,
382+
spvector[origin_node-1].origin_lsn, origin_node, tx_lsn, found);
386383

387384
return found;
388385
}
@@ -422,10 +419,10 @@ MtmEndSession(int nodeId, bool unlock)
422419
static PGconn *
423420
receiver_connect(char *conninfo)
424421
{
425-
PGconn *conn;
426-
ConnStatusType status;
427-
const char *keys[] = {"dbname", "replication", NULL};
428-
const char *vals[] = {conninfo, "database", NULL};
422+
PGconn *conn;
423+
ConnStatusType status;
424+
const char *keys[] = {"dbname", "replication", NULL};
425+
const char *vals[] = {conninfo, "database", NULL};
429426

430427
conn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
431428
status = PQstatus(conn);
@@ -447,9 +444,9 @@ receiver_connect(char *conninfo)
447444
void
448445
MtmReceiverCreateSlot(char *conninfo, int my_node_id)
449446
{
450-
StringInfoData cmd;
451-
PGresult *res;
452-
PGconn *conn = receiver_connect(conninfo);
447+
StringInfoData cmd;
448+
PGresult *res;
449+
PGconn *conn = receiver_connect(conninfo);
453450

454451
if (!conn)
455452
mtm_log(ERROR, "Could not connect to '%s'", conninfo);
@@ -846,15 +843,11 @@ pglogical_receiver_main(Datum main_arg)
846843
}
847844
if (stmt[0] == 'Z' || (stmt[0] == 'M' && (stmt[1] == 'L' || stmt[1] == 'P' || stmt[1] == 'C' || stmt[1] == 'S' ))) {
848845
if (stmt[0] == 'M' && stmt[1] == 'C')
849-
{
850846
/* concurrent DDL should be executed by parallel workers */
851847
MtmExecute(stmt, msg_len, &receiver_ctx, false);
852-
}
853848
else
854-
{
855849
/* all other messages should be processed by receiver itself */
856850
MtmExecute(stmt, msg_len, &receiver_ctx, true);
857-
}
858851
}
859852
else
860853
{
@@ -877,7 +870,8 @@ pglogical_receiver_main(Datum main_arg)
877870
else
878871
MtmExecute(buf.data, buf.used, &receiver_ctx, false);
879872

880-
} else if (spill_file >= 0)
873+
}
874+
else if (spill_file >= 0)
881875
{
882876
MtmCloseSpillFile(spill_file);
883877
resetStringInfo(&spill_info);
@@ -899,16 +893,16 @@ pglogical_receiver_main(Datum main_arg)
899893
* not more than the specified timeout, so that we can send a
900894
* response back to the client.
901895
*/
902-
int r;
903-
fd_set input_mask;
904-
int64 message_target = 0;
905-
int64 fsync_target = 0;
906-
struct timeval timeout;
907-
struct timeval *timeoutptr = NULL;
908-
int64 targettime;
909-
long secs;
910-
int usecs;
911-
int64 now;
896+
int r;
897+
fd_set input_mask;
898+
int64 message_target = 0;
899+
int64 fsync_target = 0;
900+
struct timeval timeout;
901+
struct timeval *timeoutptr = NULL;
902+
int64 targettime;
903+
long secs;
904+
int usecs;
905+
int64 now;
912906

913907
FD_ZERO(&input_mask);
914908
FD_SET(PQsocket(conn), &input_mask);
@@ -919,10 +913,7 @@ pglogical_receiver_main(Datum main_arg)
919913
if (fsync_target > 0 && fsync_target < targettime)
920914
targettime = fsync_target;
921915
now = feGetCurrentTimestamp();
922-
feTimestampDifference(now,
923-
targettime,
924-
&secs,
925-
&usecs);
916+
feTimestampDifference(now, targettime, &secs, &usecs);
926917
if (secs <= 0)
927918
timeout.tv_sec = 1; /* Always sleep at least 1 sec */
928919
else
@@ -940,14 +931,13 @@ pglogical_receiver_main(Datum main_arg)
940931
sendFeedback(conn, now, nodeId);
941932
}
942933
else if (r < 0 && errno == EINTR)
943-
{
944934
/*
945935
* Got a timeout or signal. Continue the loop and either
946936
* deliver a status packet to the server or just go back into
947937
* blocking.
948938
*/
949939
continue;
950-
}
940+
951941
else if (r < 0)
952942
{
953943
ereport(LOG, (MTM_ERRMSG("%s: Incorrect status received.",
@@ -1003,17 +993,16 @@ pglogical_receiver_main(Datum main_arg)
1003993
BgwPoolCancel(&Mtm->pools[nodeId - 1]);
1004994
MtmSleep(RECEIVER_SUSPEND_TIMEOUT);
1005995
}
1006-
// ByteBufferFree(&buf);
1007-
/* Never reach that point */
1008996

997+
/* Never reach that point */
1009998
proc_exit(2);
1010999
}
10111000

10121001
BackgroundWorkerHandle *
10131002
MtmStartReceiver(int nodeId, Oid db_id, Oid user_id, pid_t monitor_pid)
10141003
{
1015-
BackgroundWorker worker;
1016-
BackgroundWorkerHandle *handle;
1004+
BackgroundWorker worker;
1005+
BackgroundWorkerHandle *handle;
10171006

10181007
MemSet(&worker, 0, sizeof(BackgroundWorker));
10191008
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;

0 commit comments

Comments
 (0)