Skip to content

Commit 88afbdb

Browse files
author
Andrei Krichinin
committed
PG_VERSION_NUM test macro
1 parent 59786ba commit 88afbdb

File tree

5 files changed

+81
-0
lines changed

5 files changed

+81
-0
lines changed

src/dmq.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1119,7 +1119,11 @@ dmq_handle_message(StringInfo msg, DmqReceiverSlot *my_slot,
11191119
}
11201120
else
11211121
{
1122+
#if PG_VERSION_NUM < 150000
1123+
res = shm_mq_send(mq_handles[sub.procno], body_len, body, false);
1124+
#else
11221125
res = shm_mq_send(mq_handles[sub.procno], body_len, body, false, true);
1126+
#endif
11231127
if (res == SHM_MQ_DETACHED)
11241128
mtm_log(COMMERROR, "[DMQ] queue %d is detached, dropping message (stream=%s)",
11251129
sub.procno, stream_name);
@@ -1623,7 +1627,11 @@ dmq_push(DmqDestinationId dest_id, char *stream_name, char *msg)
16231627
buf.len, buf.len, buf.data);
16241628

16251629
/* XXX: use sendv instead */
1630+
#if PG_VERSION_NUM < 150000
1631+
res = shm_mq_send(dmq_local.mq_outh, buf.len, buf.data, false);
1632+
#else
16261633
res = shm_mq_send(dmq_local.mq_outh, buf.len, buf.data, false, true);
1634+
#endif
16271635
pfree(buf.data);
16281636
if (res != SHM_MQ_SUCCESS)
16291637
mtm_log(ERROR, "[DMQ] dmq_push: can't send to queue");
@@ -1648,7 +1656,11 @@ dmq_push_buffer(DmqDestinationId dest_id, char *stream_name, const void *payload
16481656
buf.len, buf.len, buf.data);
16491657

16501658
/* XXX: use sendv instead */
1659+
#if PG_VERSION_NUM < 150000
1660+
res = shm_mq_send(dmq_local.mq_outh, buf.len, buf.data, false);
1661+
#else
16511662
res = shm_mq_send(dmq_local.mq_outh, buf.len, buf.data, false, true);
1663+
#endif
16521664
pfree(buf.data);
16531665
if (res != SHM_MQ_SUCCESS)
16541666
mtm_log(ERROR, "[DMQ] dmq_push: can't send to queue, status = %d", res);

src/multimaster.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,7 +1097,9 @@ mtm_after_node_create(PG_FUNCTION_ARGS)
10971097
bool conninfo_isnull;
10981098
int n_nodes;
10991099
int rc;
1100+
#if PG_VERSION_NUM >= 150000
11001101
ParseState *pstate;
1102+
#endif
11011103

11021104
Assert(CALLED_AS_TRIGGER(fcinfo));
11031105
Assert(TRIGGER_FIRED_FOR_ROW(trigdata->tg_event));
@@ -1138,7 +1140,9 @@ mtm_after_node_create(PG_FUNCTION_ARGS)
11381140

11391141
mtm_log(NodeMgmt, "mtm_after_node_create %d", node_id);
11401142

1143+
#if PG_VERSION_NUM >= 150000
11411144
pstate = make_parsestate(NULL);
1145+
#endif
11421146

11431147
if (is_self)
11441148
{
@@ -1148,11 +1152,19 @@ mtm_after_node_create(PG_FUNCTION_ARGS)
11481152
*/
11491153
pub_stmt->pubname = MULTIMASTER_NAME;
11501154
pub_stmt->for_all_tables = false;
1155+
#if PG_VERSION_NUM < 150000
1156+
pub_stmt->tables = NIL;
1157+
#else
11511158
pub_stmt->pubobjects = NIL;
1159+
#endif
11521160
pub_stmt->options = list_make1(
11531161
makeDefElem("publish", (Node *) makeString(pstrdup("insert, truncate")), -1)
11541162
);
1163+
#if PG_VERSION_NUM < 150000
1164+
CreatePublication(pub_stmt);
1165+
#else
11551166
CreatePublication(pstate, pub_stmt);
1167+
#endif
11561168

11571169
/* liftoff */
11581170
MtmMonitorStart(MyDatabaseId, GetUserId());
@@ -1191,7 +1203,11 @@ mtm_after_node_create(PG_FUNCTION_ARGS)
11911203
client_min_messages = ERROR;
11921204
log_min_messages = ERROR;
11931205

1206+
#if PG_VERSION_NUM < 150000
1207+
CreateSubscription(cs_stmt, true);
1208+
#else
11941209
CreateSubscription(pstate, cs_stmt, true);
1210+
#endif
11951211

11961212
/* restore log_level's */
11971213
client_min_messages = saved_client_min_messages;
@@ -1209,7 +1225,9 @@ mtm_after_node_create(PG_FUNCTION_ARGS)
12091225
origin_name = psprintf(MULTIMASTER_SLOT_PATTERN, node_id);
12101226
replorigin_create(origin_name);
12111227
}
1228+
#if PG_VERSION_NUM >= 150000
12121229
free_parsestate(pstate);
1230+
#endif
12131231

12141232
PG_RETURN_VOID();
12151233
}

src/pglogical_proto.c

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,11 @@ pglogical_write_prepare(StringInfo out, PGLogicalOutputData *data,
468468
/* send fixed fields */
469469
pq_sendint64(out, lsn);
470470
pq_sendint64(out, txn->end_lsn);
471+
#if PG_VERSION_NUM < 150000
472+
pq_sendint64(out, txn->commit_time);
473+
#else
471474
pq_sendint64(out, txn->xact_time.commit_time);
475+
#endif
472476

473477
send_node_id(out, txn, hooks_data);
474478
pq_sendint64(out, txn->origin_lsn);
@@ -499,7 +503,11 @@ pglogical_write_commit_prepared(StringInfo out, PGLogicalOutputData *data,
499503
/* send fixed fields */
500504
pq_sendint64(out, lsn);
501505
pq_sendint64(out, txn->end_lsn);
506+
#if PG_VERSION_NUM < 150000
507+
pq_sendint64(out, txn->commit_time);
508+
#else
502509
pq_sendint64(out, txn->xact_time.commit_time);
510+
#endif
503511

504512
send_node_id(out, txn, hooks_data);
505513
pq_sendint64(out, txn->origin_lsn);
@@ -532,7 +540,11 @@ pglogical_write_abort_prepared(StringInfo out, PGLogicalOutputData *data,
532540
/* send fixed fields */
533541
pq_sendint64(out, lsn);
534542
pq_sendint64(out, txn->end_lsn);
543+
#if PG_VERSION_NUM < 150000
544+
pq_sendint64(out, txn->commit_time);
545+
#else
535546
pq_sendint64(out, txn->xact_time.commit_time);
547+
#endif
536548

537549
send_node_id(out, txn, hooks_data);
538550
pq_sendint64(out, txn->origin_lsn);
@@ -560,7 +572,11 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
560572
/* send fixed fields */
561573
pq_sendint64(out, lsn);
562574
pq_sendint64(out, txn->end_lsn);
575+
#if PG_VERSION_NUM < 150000
576+
pq_sendint64(out, txn->commit_time);
577+
#else
563578
pq_sendint64(out, txn->xact_time.commit_time);
579+
#endif
564580

565581
send_node_id(out, txn, hooks_data);
566582
pq_sendint64(out, txn->origin_lsn);
@@ -583,7 +599,11 @@ pglogical_write_abort(StringInfo out, PGLogicalOutputData *data,
583599
/* send fixed fields */
584600
pq_sendint64(out, lsn);
585601
pq_sendint64(out, txn->end_lsn);
602+
#if PG_VERSION_NUM < 150000
603+
pq_sendint64(out, txn->commit_time);
604+
#else
586605
pq_sendint64(out, txn->xact_time.commit_time);
606+
#endif
587607

588608
send_node_id(out, txn, hooks_data);
589609
pq_sendint64(out, txn->origin_lsn);

src/state.c

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
#include <sys/types.h>
1313
#include <unistd.h>
1414

15+
#if PG_VERSION_NUM >= 150000
1516
#include "common/pg_prng.h"
17+
#endif
1618
#include "access/twophase.h"
1719
#include "access/xlogutils.h"
1820
#include "access/xlog_internal.h"
@@ -1678,7 +1680,11 @@ CampaignerMain(Datum main_arg)
16781680
MemoryContext campaigner_ctx = AllocSetContextCreate(TopMemoryContext,
16791681
"CampaignerContext",
16801682
ALLOCSET_DEFAULT_SIZES);
1683+
#if PG_VERSION_NUM < 150000
1684+
static unsigned short drandom_seed[3] = {0, 0, 0};
1685+
#else
16811686
static pg_prng_state drandom_seed = {0, 0};
1687+
#endif
16821688
TimestampTz last_campaign_at = 0;
16831689
int rc = WL_TIMEOUT;
16841690

@@ -1720,7 +1726,13 @@ CampaignerMain(Datum main_arg)
17201726

17211727
/* Mix the PID with the most predictable bits of the timestamp */
17221728
iseed = (uint64) now ^ ((uint64) MyProcPid << 32);
1729+
#if PG_VERSION_NUM < 150000
1730+
drandom_seed[0] = (unsigned short) iseed;
1731+
drandom_seed[1] = (unsigned short) (iseed >> 16);
1732+
drandom_seed[2] = (unsigned short) (iseed >> 32);
1733+
#else
17231734
pg_prng_seed(&drandom_seed, iseed);
1735+
#endif
17241736
}
17251737

17261738
/*
@@ -1800,10 +1812,17 @@ CampaignerMain(Datum main_arg)
18001812
* here nodes will mostly propose the same set of candidates,
18011813
* supporting each other)
18021814
*/
1815+
#if PG_VERSION_NUM < 150000
1816+
rc = WaitLatch(MyLatch,
1817+
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1818+
campaign_retry_interval * pg_erand48(drandom_seed),
1819+
PG_WAIT_EXTENSION);
1820+
#else
18031821
rc = WaitLatch(MyLatch,
18041822
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
18051823
campaign_retry_interval * pg_prng_double(&drandom_seed),
18061824
PG_WAIT_EXTENSION);
1825+
#endif
18071826

18081827
if (rc & WL_LATCH_SET)
18091828
ResetLatch(MyLatch);
@@ -4239,7 +4258,11 @@ GetLoggedPreparedXactState(HTAB *txset)
42394258
XLogRecPtr start_lsn;
42404259
XLogRecPtr lsn;
42414260
TimeLineID timeline;
4261+
#if PG_VERSION_NUM < 150000
4262+
XLogRecPtr end_wal_lsn = GetFlushRecPtr();
4263+
#else
42424264
XLogRecPtr end_wal_lsn = GetFlushRecPtr(NULL);
4265+
#endif
42434266
XLogRecPtr end_lsn = end_wal_lsn;
42444267
int n_trans = hash_get_num_entries(txset);
42454268

src/syncpoint.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,11 @@ RecoveryFilterLoad(int filter_node_id, Syncpoint *spvector, MtmConfig *mtm_cfg)
477477

478478
/* ensure we will scan everything written up to this point, just in case */
479479
XLogFlush(GetXLogWriteRecPtr());
480+
#if PG_VERSION_NUM < 150000
481+
current_last_lsn = GetFlushRecPtr();
482+
#else
480483
current_last_lsn = GetFlushRecPtr(NULL);
484+
#endif
481485

482486
Assert(current_last_lsn != InvalidXLogRecPtr);
483487

@@ -490,7 +494,11 @@ RecoveryFilterLoad(int filter_node_id, Syncpoint *spvector, MtmConfig *mtm_cfg)
490494
}
491495

492496
/* create hash */
497+
#if PG_VERSION_NUM < 150000
498+
estimate_size = (GetFlushRecPtr() - start_lsn) / 100;
499+
#else
493500
estimate_size = (GetFlushRecPtr(NULL) - start_lsn) / 100;
501+
#endif
494502
estimate_size = Min(Max(estimate_size, 1000), 100000);
495503
MemSet(&hash_ctl, 0, sizeof(hash_ctl));
496504
hash_ctl.keysize = hash_ctl.entrysize = sizeof(FilterEntry);

0 commit comments

Comments
 (0)