PostgreSQL Source Code git master
subscriptioncmds.c File Reference
#include "postgres.h"
#include "access/commit_ts.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_authid_d.h"
#include "catalog/pg_database_d.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/worker_internal.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/syscache.h"
Include dependency graph for subscriptioncmds.c:

Go to the source code of this file.

Data Structures

struct  SubOpts
 
struct  PublicationRelKind
 

Macros

#define SUBOPT_CONNECT   0x00000001
 
#define SUBOPT_ENABLED   0x00000002
 
#define SUBOPT_CREATE_SLOT   0x00000004
 
#define SUBOPT_SLOT_NAME   0x00000008
 
#define SUBOPT_COPY_DATA   0x00000010
 
#define SUBOPT_SYNCHRONOUS_COMMIT   0x00000020
 
#define SUBOPT_REFRESH   0x00000040
 
#define SUBOPT_BINARY   0x00000080
 
#define SUBOPT_STREAMING   0x00000100
 
#define SUBOPT_TWOPHASE_COMMIT   0x00000200
 
#define SUBOPT_DISABLE_ON_ERR   0x00000400
 
#define SUBOPT_PASSWORD_REQUIRED   0x00000800
 
#define SUBOPT_RUN_AS_OWNER   0x00001000
 
#define SUBOPT_FAILOVER   0x00002000
 
#define SUBOPT_RETAIN_DEAD_TUPLES   0x00004000
 
#define SUBOPT_MAX_RETENTION_DURATION   0x00008000
 
#define SUBOPT_LSN   0x00010000
 
#define SUBOPT_ORIGIN   0x00020000
 
#define IsSet(val, bits)   (((val) & (bits)) == (bits))
 

Typedefs

typedef struct SubOpts SubOpts
 
typedef struct PublicationRelKind PublicationRelKind
 

Functions

static List * fetch_relation_list (WalReceiverConn *wrconn, List *publications)
 
static void check_publications_origin_tables (WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
 
static void check_publications_origin_sequences (WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
 
static void check_pub_dead_tuple_retention (WalReceiverConn *wrconn)
 
static void check_duplicates_in_publist (List *publist, Datum *datums)
 
static List * merge_publications (List *oldpublist, List *newpublist, bool addpub, const char *subname)
 
static void ReportSlotConnectionError (List *rstates, Oid subid, char *slotname, char *err)
 
static void CheckAlterSubOption (Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
 
static void parse_subscription_options (ParseState *pstate, List *stmt_options, bits32 supported_opts, SubOpts *opts)
 
static void check_publications (WalReceiverConn *wrconn, List *publications)
 
static Datum publicationListToArray (List *publist)
 
ObjectAddress CreateSubscription (ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel)
 
static void AlterSubscription_refresh (Subscription *sub, bool copy_data, List *validate_publications)
 
static void AlterSubscription_refresh_seq (Subscription *sub)
 
ObjectAddress AlterSubscription (ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
 
void DropSubscription (DropSubscriptionStmt *stmt, bool isTopLevel)
 
void ReplicationSlotDropAtPubNode (WalReceiverConn *wrconn, char *slotname, bool missing_ok)
 
static void AlterSubscriptionOwner_internal (Relation rel, HeapTuple tup, Oid newOwnerId)
 
ObjectAddress AlterSubscriptionOwner (const char *name, Oid newOwnerId)
 
void AlterSubscriptionOwner_oid (Oid subid, Oid newOwnerId)
 
void CheckSubDeadTupleRetention (bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
 
static bool list_member_rangevar (const List *list, RangeVar *rv)
 
char defGetStreamingMode (DefElem *def)
 

Macro Definition Documentation

◆ IsSet

#define IsSet (   val,
  bits 
)    (((val) & (bits)) == (bits))

Definition at line 80 of file subscriptioncmds.c.

◆ SUBOPT_BINARY

#define SUBOPT_BINARY   0x00000080

Definition at line 67 of file subscriptioncmds.c.

◆ SUBOPT_CONNECT

#define SUBOPT_CONNECT   0x00000001

Definition at line 60 of file subscriptioncmds.c.

◆ SUBOPT_COPY_DATA

#define SUBOPT_COPY_DATA   0x00000010

Definition at line 64 of file subscriptioncmds.c.

◆ SUBOPT_CREATE_SLOT

#define SUBOPT_CREATE_SLOT   0x00000004

Definition at line 62 of file subscriptioncmds.c.

◆ SUBOPT_DISABLE_ON_ERR

#define SUBOPT_DISABLE_ON_ERR   0x00000400

Definition at line 70 of file subscriptioncmds.c.

◆ SUBOPT_ENABLED

#define SUBOPT_ENABLED   0x00000002

Definition at line 61 of file subscriptioncmds.c.

◆ SUBOPT_FAILOVER

#define SUBOPT_FAILOVER   0x00002000

Definition at line 73 of file subscriptioncmds.c.

◆ SUBOPT_LSN

#define SUBOPT_LSN   0x00010000

Definition at line 76 of file subscriptioncmds.c.

◆ SUBOPT_MAX_RETENTION_DURATION

#define SUBOPT_MAX_RETENTION_DURATION   0x00008000

Definition at line 75 of file subscriptioncmds.c.

◆ SUBOPT_ORIGIN

#define SUBOPT_ORIGIN   0x00020000

Definition at line 77 of file subscriptioncmds.c.

◆ SUBOPT_PASSWORD_REQUIRED

#define SUBOPT_PASSWORD_REQUIRED   0x00000800

Definition at line 71 of file subscriptioncmds.c.

◆ SUBOPT_REFRESH

#define SUBOPT_REFRESH   0x00000040

Definition at line 66 of file subscriptioncmds.c.

◆ SUBOPT_RETAIN_DEAD_TUPLES

#define SUBOPT_RETAIN_DEAD_TUPLES   0x00004000

Definition at line 74 of file subscriptioncmds.c.

◆ SUBOPT_RUN_AS_OWNER

#define SUBOPT_RUN_AS_OWNER   0x00001000

Definition at line 72 of file subscriptioncmds.c.

◆ SUBOPT_SLOT_NAME

#define SUBOPT_SLOT_NAME   0x00000008

Definition at line 63 of file subscriptioncmds.c.

◆ SUBOPT_STREAMING

#define SUBOPT_STREAMING   0x00000100

Definition at line 68 of file subscriptioncmds.c.

◆ SUBOPT_SYNCHRONOUS_COMMIT

#define SUBOPT_SYNCHRONOUS_COMMIT   0x00000020

Definition at line 65 of file subscriptioncmds.c.

◆ SUBOPT_TWOPHASE_COMMIT

#define SUBOPT_TWOPHASE_COMMIT   0x00000200

Definition at line 69 of file subscriptioncmds.c.

Typedef Documentation

◆ PublicationRelKind

typedef struct PublicationRelKind PublicationRelKind

◆ SubOpts

typedef struct SubOpts SubOpts

Function Documentation

◆ AlterSubscription()

ObjectAddress AlterSubscription ( ParseState * pstate,
AlterSubscriptionStmt * stmt,
bool  isTopLevel 
)

Definition at line 1331 of file subscriptioncmds.c.

1333{
1334 Relation rel;
1335 ObjectAddress myself;
1336 bool nulls[Natts_pg_subscription];
1337 bool replaces[Natts_pg_subscription];
1338 Datum values[Natts_pg_subscription];
1339 HeapTuple tup;
1340 Oid subid;
1341 bool update_tuple = false;
1342 bool update_failover = false;
1343 bool update_two_phase = false;
1344 bool check_pub_rdt = false;
1345 bool retain_dead_tuples;
1346 int max_retention;
1347 bool retention_active;
1348 char *origin;
1349 Subscription *sub;
1351 bits32 supported_opts;
1352 SubOpts opts = {0};
1353
1354 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1355
1356 /* Fetch the existing tuple. */
1357 tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
1358 CStringGetDatum(stmt->subname));
1359
1360 if (!HeapTupleIsValid(tup))
1361 ereport(ERROR,
1362 (errcode(ERRCODE_UNDEFINED_OBJECT),
1363 errmsg("subscription \"%s\" does not exist",
1364 stmt->subname)));
1365
1366 form = (Form_pg_subscription) GETSTRUCT(tup);
1367 subid = form->oid;
1368
1369 /* must be owner */
1370 if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1372 stmt->subname);
1373
1374 sub = GetSubscription(subid, false);
1375
1376 retain_dead_tuples = sub->retaindeadtuples;
1377 origin = sub->origin;
1378 max_retention = sub->maxretention;
1379 retention_active = sub->retentionactive;
1380
1381 /*
1382 * Don't allow non-superuser modification of a subscription with
1383 * password_required=false.
1384 */
1385 if (!sub->passwordrequired && !superuser())
1386 ereport(ERROR,
1387 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1388 errmsg("password_required=false is superuser-only"),
1389 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1390
1391 /* Lock the subscription so nobody else can do anything with it. */
1392 LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1393
1394 /* Form a new tuple. */
1395 memset(values, 0, sizeof(values));
1396 memset(nulls, false, sizeof(nulls));
1397 memset(replaces, false, sizeof(replaces));
1398
1399 switch (stmt->kind)
1400 {
1402 {
1403 supported_opts = (SUBOPT_SLOT_NAME |
1412
1413 parse_subscription_options(pstate, stmt->options,
1414 supported_opts, &opts);
1415
1416 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1417 {
1418 /*
1419 * The subscription must be disabled to allow slot_name as
1420 * 'none', otherwise, the apply worker will repeatedly try
1421 * to stream the data using that slot_name which neither
1422 * exists on the publisher nor the user will be allowed to
1423 * create it.
1424 */
1425 if (sub->enabled && !opts.slot_name)
1426 ereport(ERROR,
1427 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1428 errmsg("cannot set %s for enabled subscription",
1429 "slot_name = NONE")));
1430
1431 if (opts.slot_name)
1432 values[Anum_pg_subscription_subslotname - 1] =
1434 else
1435 nulls[Anum_pg_subscription_subslotname - 1] = true;
1436 replaces[Anum_pg_subscription_subslotname - 1] = true;
1437 }
1438
1439 if (opts.synchronous_commit)
1440 {
1441 values[Anum_pg_subscription_subsynccommit - 1] =
1442 CStringGetTextDatum(opts.synchronous_commit);
1443 replaces[Anum_pg_subscription_subsynccommit - 1] = true;
1444 }
1445
1446 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1447 {
1448 values[Anum_pg_subscription_subbinary - 1] =
1449 BoolGetDatum(opts.binary);
1450 replaces[Anum_pg_subscription_subbinary - 1] = true;
1451 }
1452
1453 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1454 {
1455 values[Anum_pg_subscription_substream - 1] =
1456 CharGetDatum(opts.streaming);
1457 replaces[Anum_pg_subscription_substream - 1] = true;
1458 }
1459
1460 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1461 {
1462 values[Anum_pg_subscription_subdisableonerr - 1]
1463 = BoolGetDatum(opts.disableonerr);
1464 replaces[Anum_pg_subscription_subdisableonerr - 1]
1465 = true;
1466 }
1467
1468 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1469 {
1470 /* Non-superuser may not disable password_required. */
1471 if (!opts.passwordrequired && !superuser())
1472 ereport(ERROR,
1473 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1474 errmsg("password_required=false is superuser-only"),
1475 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1476
1477 values[Anum_pg_subscription_subpasswordrequired - 1]
1478 = BoolGetDatum(opts.passwordrequired);
1479 replaces[Anum_pg_subscription_subpasswordrequired - 1]
1480 = true;
1481 }
1482
1483 if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1484 {
1485 values[Anum_pg_subscription_subrunasowner - 1] =
1486 BoolGetDatum(opts.runasowner);
1487 replaces[Anum_pg_subscription_subrunasowner - 1] = true;
1488 }
1489
1490 if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1491 {
1492 /*
1493 * We need to update both the slot and the subscription
1494 * for the two_phase option. We can enable the two_phase
1495 * option for a slot only once the initial data
1496 * synchronization is done. This is to avoid missing some
1497 * data as explained in comments atop worker.c.
1498 */
1499 update_two_phase = !opts.twophase;
1500
1501 CheckAlterSubOption(sub, "two_phase", update_two_phase,
1502 isTopLevel);
1503
1504 /*
1505 * Modifying the two_phase slot option requires a slot
1506 * lookup by slot name, so changing the slot name at the
1507 * same time is not allowed.
1508 */
1509 if (update_two_phase &&
1510 IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1511 ereport(ERROR,
1512 (errcode(ERRCODE_SYNTAX_ERROR),
1513 errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1514
1515 /*
1516 * Note that workers may still survive even if the
1517 * subscription has been disabled.
1518 *
1519 * Ensure workers have already been exited to avoid
1520 * getting prepared transactions while we are disabling
1521 * the two_phase option. Otherwise, the changes of an
1522 * already prepared transaction can be replicated again
1523 * along with its corresponding commit, leading to
1524 * duplicate data or errors.
1525 */
1526 if (logicalrep_workers_find(subid, true, true))
1527 ereport(ERROR,
1528 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1529 errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1530 errhint("Try again after some time.")));
1531
1532 /*
1533 * two_phase cannot be disabled if there are any
1534 * uncommitted prepared transactions present otherwise it
1535 * can lead to duplicate data or errors as explained in
1536 * the comment above.
1537 */
1538 if (update_two_phase &&
1539 sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
1540 LookupGXactBySubid(subid))
1541 ereport(ERROR,
1542 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1543 errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1544 errhint("Resolve these transactions and try again.")));
1545
1546 /* Change system catalog accordingly */
1547 values[Anum_pg_subscription_subtwophasestate - 1] =
1548 CharGetDatum(opts.twophase ?
1549 LOGICALREP_TWOPHASE_STATE_PENDING :
1550 LOGICALREP_TWOPHASE_STATE_DISABLED);
1551 replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1552 }
1553
1554 if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1555 {
1556 /*
1557 * Similar to the two_phase case above, we need to update
1558 * the failover option for both the slot and the
1559 * subscription.
1560 */
1561 update_failover = true;
1562
1563 CheckAlterSubOption(sub, "failover", update_failover,
1564 isTopLevel);
1565
1566 values[Anum_pg_subscription_subfailover - 1] =
1567 BoolGetDatum(opts.failover);
1568 replaces[Anum_pg_subscription_subfailover - 1] = true;
1569 }
1570
1571 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1572 {
1573 values[Anum_pg_subscription_subretaindeadtuples - 1] =
1574 BoolGetDatum(opts.retaindeadtuples);
1575 replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
1576
1577 /*
1578 * Update the retention status only if there's a change in
1579 * the retain_dead_tuples option value.
1580 *
1581 * Automatically marking retention as active when
1582 * retain_dead_tuples is enabled may not always be ideal,
1583 * especially if retention was previously stopped and the
1584 * user toggles retain_dead_tuples without adjusting the
1585 * publisher workload. However, this behavior provides a
1586 * convenient way for users to manually refresh the
1587 * retention status. Since retention will be stopped again
1588 * unless the publisher workload is reduced, this approach
1589 * is acceptable for now.
1590 */
1591 if (opts.retaindeadtuples != sub->retaindeadtuples)
1592 {
1593 values[Anum_pg_subscription_subretentionactive - 1] =
1594 BoolGetDatum(opts.retaindeadtuples);
1595 replaces[Anum_pg_subscription_subretentionactive - 1] = true;
1596
1597 retention_active = opts.retaindeadtuples;
1598 }
1599
1600 CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1601
1602 /*
1603 * Workers may continue running even after the
1604 * subscription has been disabled.
1605 *
1606 * To prevent race conditions (as described in
1607 * CheckAlterSubOption()), ensure that all worker
1608 * processes have already exited before proceeding.
1609 */
1610 if (logicalrep_workers_find(subid, true, true))
1611 ereport(ERROR,
1612 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1613 errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1614 errhint("Try again after some time.")));
1615
1616 /*
1617 * Notify the launcher to manage the replication slot for
1618 * conflict detection. This ensures that replication slot
1619 * is efficiently handled (created, updated, or dropped)
1620 * in response to any configuration changes.
1621 */
1623
1624 check_pub_rdt = opts.retaindeadtuples;
1625 retain_dead_tuples = opts.retaindeadtuples;
1626 }
1627
1628 if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1629 {
1630 values[Anum_pg_subscription_submaxretention - 1] =
1631 Int32GetDatum(opts.maxretention);
1632 replaces[Anum_pg_subscription_submaxretention - 1] = true;
1633
1634 max_retention = opts.maxretention;
1635 }
1636
1637 /*
1638 * Ensure that system configuration parameters are set
1639 * appropriately to support retain_dead_tuples and
1640 * max_retention_duration.
1641 */
1642 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
1643 IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1645 retain_dead_tuples,
1646 retention_active,
1647 (max_retention > 0));
1648
1649 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1650 {
1651 values[Anum_pg_subscription_suborigin - 1] =
1652 CStringGetTextDatum(opts.origin);
1653 replaces[Anum_pg_subscription_suborigin - 1] = true;
1654
1655 /*
1656 * Check if changes from different origins may be received
1657 * from the publisher when the origin is changed to ANY
1658 * and retain_dead_tuples is enabled.
1659 */
1660 check_pub_rdt = retain_dead_tuples &&
1661 pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
1662
1663 origin = opts.origin;
1664 }
1665
1666 update_tuple = true;
1667 break;
1668 }
1669
1671 {
1672 parse_subscription_options(pstate, stmt->options,
1674 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1675
1676 if (!sub->slotname && opts.enabled)
1677 ereport(ERROR,
1678 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1679 errmsg("cannot enable subscription that does not have a slot name")));
1680
1681 /*
1682 * Check track_commit_timestamp only when enabling the
1683 * subscription in case it was disabled after creation. See
1684 * comments atop CheckSubDeadTupleRetention() for details.
1685 */
1686 CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
1688 sub->retentionactive, false);
1689
1690 values[Anum_pg_subscription_subenabled - 1] =
1691 BoolGetDatum(opts.enabled);
1692 replaces[Anum_pg_subscription_subenabled - 1] = true;
1693
1694 if (opts.enabled)
1696
1697 update_tuple = true;
1698
1699 /*
1700 * The subscription might be initially created with
1701 * connect=false and retain_dead_tuples=true, meaning the
1702 * remote server's status may not be checked. Ensure this
1703 * check is conducted now.
1704 */
1705 check_pub_rdt = sub->retaindeadtuples && opts.enabled;
1706 break;
1707 }
1708
1710 /* Load the library providing us libpq calls. */
1711 load_file("libpqwalreceiver", false);
1712 /* Check the connection info string. */
1713 walrcv_check_conninfo(stmt->conninfo,
1714 sub->passwordrequired && !sub->ownersuperuser);
1715
1716 values[Anum_pg_subscription_subconninfo - 1] =
1717 CStringGetTextDatum(stmt->conninfo);
1718 replaces[Anum_pg_subscription_subconninfo - 1] = true;
1719 update_tuple = true;
1720
1721 /*
1722 * Since the remote server configuration might have changed,
1723 * perform a check to ensure it permits enabling
1724 * retain_dead_tuples.
1725 */
1726 check_pub_rdt = sub->retaindeadtuples;
1727 break;
1728
1730 {
1731 supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
1732 parse_subscription_options(pstate, stmt->options,
1733 supported_opts, &opts);
1734
1735 values[Anum_pg_subscription_subpublications - 1] =
1736 publicationListToArray(stmt->publication);
1737 replaces[Anum_pg_subscription_subpublications - 1] = true;
1738
1739 update_tuple = true;
1740
1741 /* Refresh if user asked us to. */
1742 if (opts.refresh)
1743 {
1744 if (!sub->enabled)
1745 ereport(ERROR,
1746 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1747 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1748 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1749
1750 /*
1751 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1752 * why this is not allowed.
1753 */
1754 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1755 ereport(ERROR,
1756 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1757 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1758 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1759
1760 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1761
1762 /* Make sure refresh sees the new list of publications. */
1763 sub->publications = stmt->publication;
1764
1765 AlterSubscription_refresh(sub, opts.copy_data,
1766 stmt->publication);
1767 }
1768
1769 break;
1770 }
1771
1774 {
1775 List *publist;
1776 bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
1777
1778 supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
1779 parse_subscription_options(pstate, stmt->options,
1780 supported_opts, &opts);
1781
1782 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1783 values[Anum_pg_subscription_subpublications - 1] =
1784 publicationListToArray(publist);
1785 replaces[Anum_pg_subscription_subpublications - 1] = true;
1786
1787 update_tuple = true;
1788
1789 /* Refresh if user asked us to. */
1790 if (opts.refresh)
1791 {
1792 /* We only need to validate user specified publications. */
1793 List *validate_publications = (isadd) ? stmt->publication : NULL;
1794
1795 if (!sub->enabled)
1796 ereport(ERROR,
1797 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1798 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1799 /* translator: %s is an SQL ALTER command */
1800 errhint("Use %s instead.",
1801 isadd ?
1802 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1803 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1804
1805 /*
1806 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1807 * why this is not allowed.
1808 */
1809 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1810 ereport(ERROR,
1811 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1812 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1813 /* translator: %s is an SQL ALTER command */
1814 errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1815 isadd ?
1816 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1817 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1818
1819 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1820
1821 /* Refresh the new list of publications. */
1822 sub->publications = publist;
1823
1824 AlterSubscription_refresh(sub, opts.copy_data,
1825 validate_publications);
1826 }
1827
1828 break;
1829 }
1830
1832 {
1833 if (!sub->enabled)
1834 ereport(ERROR,
1835 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1836 errmsg("%s is not allowed for disabled subscriptions",
1837 "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
1838
1839 parse_subscription_options(pstate, stmt->options,
1841
1842 /*
1843 * The subscription option "two_phase" requires that
1844 * replication has passed the initial table synchronization
1845 * phase before the two_phase becomes properly enabled.
1846 *
1847 * But, having reached this two-phase commit "enabled" state
1848 * we must not allow any subsequent table initialization to
1849 * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
1850 * disallowed when the user had requested two_phase = on mode.
1851 *
1852 * The exception to this restriction is when copy_data =
1853 * false, because when copy_data is false the tablesync will
1854 * start already in READY state and will exit directly without
1855 * doing anything.
1856 *
1857 * For more details see comments atop worker.c.
1858 */
1859 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1860 ereport(ERROR,
1861 (errcode(ERRCODE_SYNTAX_ERROR),
1862 errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
1863 errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1864
1865 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
1866
1867 AlterSubscription_refresh(sub, opts.copy_data, NULL);
1868
1869 break;
1870 }
1871
1873 {
1874 if (!sub->enabled)
1875 ereport(ERROR,
1876 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1877 errmsg("%s is not allowed for disabled subscriptions",
1878 "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
1879
1881
1882 break;
1883 }
1884
1886 {
1887 parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
1888
1889 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
1890 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
1891
1892 /*
1893 * If the user sets subskiplsn, we do a sanity check to make
1894 * sure that the specified LSN is a probable value.
1895 */
1896 if (!XLogRecPtrIsInvalid(opts.lsn))
1897 {
1898 RepOriginId originid;
1899 char originname[NAMEDATALEN];
1900 XLogRecPtr remote_lsn;
1901
1903 originname, sizeof(originname));
1904 originid = replorigin_by_name(originname, false);
1905 remote_lsn = replorigin_get_progress(originid, false);
1906
1907 /* Check the given LSN is at least a future LSN */
1908 if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
1909 ereport(ERROR,
1910 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1911 errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
1912 LSN_FORMAT_ARGS(opts.lsn),
1913 LSN_FORMAT_ARGS(remote_lsn))));
1914 }
1915
1916 values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
1917 replaces[Anum_pg_subscription_subskiplsn - 1] = true;
1918
1919 update_tuple = true;
1920 break;
1921 }
1922
1923 default:
1924 elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
1925 stmt->kind);
1926 }
1927
1928 /* Update the catalog if needed. */
1929 if (update_tuple)
1930 {
1931 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1932 replaces);
1933
1934 CatalogTupleUpdate(rel, &tup->t_self, tup);
1935
1936 heap_freetuple(tup);
1937 }
1938
1939 /*
1940 * Try to acquire the connection necessary either for modifying the slot
1941 * or for checking if the remote server permits enabling
1942 * retain_dead_tuples.
1943 *
1944 * This has to be at the end because otherwise if there is an error while
1945 * doing the database operations we won't be able to rollback altered
1946 * slot.
1947 */
1948 if (update_failover || update_two_phase || check_pub_rdt)
1949 {
1950 bool must_use_password;
1951 char *err;
1953
1954 /* Load the library providing us libpq calls. */
1955 load_file("libpqwalreceiver", false);
1956
1957 /*
1958 * Try to connect to the publisher, using the new connection string if
1959 * available.
1960 */
1961 must_use_password = sub->passwordrequired && !sub->ownersuperuser;
1962 wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
1963 true, true, must_use_password, sub->name,
1964 &err);
1965 if (!wrconn)
1966 ereport(ERROR,
1967 (errcode(ERRCODE_CONNECTION_FAILURE),
1968 errmsg("subscription \"%s\" could not connect to the publisher: %s",
1969 sub->name, err)));
1970
1971 PG_TRY();
1972 {
1973 if (retain_dead_tuples)
1975
1977 retain_dead_tuples, origin, NULL, 0,
1978 sub->name);
1979
1980 if (update_failover || update_two_phase)
1982 update_failover ? &opts.failover : NULL,
1983 update_two_phase ? &opts.twophase : NULL);
1984 }
1985 PG_FINALLY();
1986 {
1988 }
1989 PG_END_TRY();
1990 }
1991
1993
1994 ObjectAddressSet(myself, SubscriptionRelationId, subid);
1995
1996 InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
1997
1998 /* Wake up related replication workers to handle this change quickly. */
2000
2001 return myself;
2002}
@ ACLCHECK_NOT_OWNER
Definition: acl.h:185
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2652
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition: aclchk.c:4088
void LogicalRepWorkersWakeupAtCommit(Oid subid)
Definition: worker.c:6219
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:641
static Datum values[MAXATTR]
Definition: bootstrap.c:153
#define CStringGetTextDatum(s)
Definition: builtins.h:97
uint32 bits32
Definition: c.h:551
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:149
int errhint(const char *fmt,...)
Definition: elog.c:1330
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define PG_TRY(...)
Definition: elog.h:372
#define WARNING
Definition: elog.h:36
#define PG_END_TRY(...)
Definition: elog.h:397
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define NOTICE
Definition: elog.h:35
#define PG_FINALLY(...)
Definition: elog.h:389
#define ereport(elevel,...)
Definition: elog.h:150
void err(int eval, const char *fmt,...)
Definition: err.c:43
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:682
Oid MyDatabaseId
Definition: globals.c:94
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
#define stmt
Definition: indent_codes.h:59
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition: indexing.c:313
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:81
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition: launcher.c:291
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:1144
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1088
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
Oid GetUserId(void)
Definition: miscinit.c:469
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:197
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:226
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:1037
@ ALTER_SUBSCRIPTION_REFRESH_PUBLICATION
Definition: parsenodes.h:4364
@ ALTER_SUBSCRIPTION_ENABLED
Definition: parsenodes.h:4366
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
Definition: parsenodes.h:4363
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
Definition: parsenodes.h:4361
@ ALTER_SUBSCRIPTION_REFRESH_SEQUENCES
Definition: parsenodes.h:4365
@ ALTER_SUBSCRIPTION_SKIP
Definition: parsenodes.h:4367
@ ALTER_SUBSCRIPTION_OPTIONS
Definition: parsenodes.h:4359
@ ALTER_SUBSCRIPTION_CONNECTION
Definition: parsenodes.h:4360
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
Definition: parsenodes.h:4362
@ OBJECT_SUBSCRIPTION
Definition: parsenodes.h:2363
static AmcheckOptions opts
Definition: pg_amcheck.c:112
#define NAMEDATALEN
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:31
Subscription * GetSubscription(Oid subid, bool missing_ok)
FormData_pg_subscription * Form_pg_subscription
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
static Datum BoolGetDatum(bool X)
Definition: postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
uint64_t Datum
Definition: postgres.h:70
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:360
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:222
static Datum CharGetDatum(char X)
Definition: postgres.h:132
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
#define RelationGetDescr(relation)
Definition: rel.h:541
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:54
#define SUBOPT_STREAMING
#define SUBOPT_PASSWORD_REQUIRED
#define SUBOPT_SYNCHRONOUS_COMMIT
#define SUBOPT_ENABLED
static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
#define SUBOPT_RETAIN_DEAD_TUPLES
#define SUBOPT_ORIGIN
static Datum publicationListToArray(List *publist)
#define SUBOPT_FAILOVER
static void parse_subscription_options(ParseState *pstate, List *stmt_options, bits32 supported_opts, SubOpts *opts)
#define SUBOPT_RUN_AS_OWNER
#define SUBOPT_SLOT_NAME
#define SUBOPT_COPY_DATA
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
#define SUBOPT_DISABLE_ON_ERR
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
static void AlterSubscription_refresh_seq(Subscription *sub)
#define SUBOPT_LSN
#define SUBOPT_MAX_RETENTION_DURATION
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
static void check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
#define SUBOPT_BINARY
#define IsSet(val, bits)
#define SUBOPT_REFRESH
static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
bool superuser(void)
Definition: superuser.c:46
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:93
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
bool LookupGXactBySubid(Oid subid)
Definition: twophase.c:2797
static WalReceiverConn * wrconn
Definition: walreceiver.c:93
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
#define walrcv_check_conninfo(conninfo, must_use_password)
Definition: walreceiver.h:437
#define walrcv_alter_slot(conn, slotname, failover, two_phase)
Definition: walreceiver.h:461
#define walrcv_disconnect(conn)
Definition: walreceiver.h:467
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3660
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:46
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint16 RepOriginId
Definition: xlogdefs.h:68
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_REFRESH_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH_SEQUENCES, ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_SKIP, AlterSubscription_refresh(), AlterSubscription_refresh_seq(), ApplyLauncherWakeupAtCommit(), Assert(), BoolGetDatum(), CatalogTupleUpdate(), CharGetDatum(), check_pub_dead_tuple_retention(), check_publications_origin_tables(), CheckAlterSubOption(), CheckSubDeadTupleRetention(), Subscription::conninfo, CStringGetDatum(), CStringGetTextDatum, DirectFunctionCall1, elog, Subscription::enabled, ereport, err(), errcode(), errhint(), errmsg(), ERROR, GETSTRUCT(), GetSubscription(), GetUserId(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, if(), Int32GetDatum(), InvalidOid, InvokeObjectPostAlterHook, IsSet, load_file(), LockSharedObject(), logicalrep_workers_find(), LogicalRepWorkersWakeupAtCommit(), LookupGXactBySubid(), LSN_FORMAT_ARGS, LSNGetDatum(), Subscription::maxretention, merge_publications(), MyDatabaseId, Subscription::name, NAMEDATALEN, namein(), NOTICE, object_ownercheck(), OBJECT_SUBSCRIPTION, ObjectAddressSet, ObjectIdGetDatum(), opts, Subscription::origin, Subscription::ownersuperuser, parse_subscription_options(), Subscription::passwordrequired, PG_END_TRY, PG_FINALLY, pg_strcasecmp(), PG_TRY, PreventInTransactionBlock(), publicationListToArray(), Subscription::publications, RelationGetDescr, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_get_progress(), Subscription::retaindeadtuples, Subscription::retentionactive, RowExclusiveLock, SearchSysCacheCopy2, Subscription::slotname, stmt, SUBOPT_BINARY, SUBOPT_COPY_DATA, SUBOPT_DISABLE_ON_ERR, SUBOPT_ENABLED, SUBOPT_FAILOVER, SUBOPT_LSN, SUBOPT_MAX_RETENTION_DURATION, SUBOPT_ORIGIN, SUBOPT_PASSWORD_REQUIRED, 
SUBOPT_REFRESH, SUBOPT_RETAIN_DEAD_TUPLES, SUBOPT_RUN_AS_OWNER, SUBOPT_SLOT_NAME, SUBOPT_STREAMING, SUBOPT_SYNCHRONOUS_COMMIT, SUBOPT_TWOPHASE_COMMIT, superuser(), HeapTupleData::t_self, table_close(), table_open(), Subscription::twophasestate, values, walrcv_alter_slot, walrcv_check_conninfo, walrcv_connect, walrcv_disconnect, WARNING, wrconn, and XLogRecPtrIsInvalid.

Referenced by ProcessUtilitySlow().

◆ AlterSubscription_refresh()

static void AlterSubscription_refresh ( Subscription sub,
bool  copy_data,
List validate_publications 
)
static

Definition at line 917 of file subscriptioncmds.c.

919{
920 char *err;
921 List *pubrels = NIL;
922 Oid *pubrel_local_oids;
923 List *subrel_states;
924 List *sub_remove_rels = NIL;
925 Oid *subrel_local_oids;
926 Oid *subseq_local_oids;
927 int subrel_count;
928 ListCell *lc;
929 int off;
930 int tbl_count = 0;
931 int seq_count = 0;
932 Relation rel = NULL;
933 typedef struct SubRemoveRels
934 {
935 Oid relid;
936 char state;
937 } SubRemoveRels;
938
939 WalReceiverConn *wrconn;
940 bool must_use_password;
941
942 /* Load the library providing us libpq calls. */
943 load_file("libpqwalreceiver", false);
944
945 /* Try to connect to the publisher. */
946 must_use_password = sub->passwordrequired && !sub->ownersuperuser;
947 wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
948 sub->name, &err);
949 if (!wrconn)
950 ereport(ERROR,
951 (errcode(ERRCODE_CONNECTION_FAILURE),
952 errmsg("subscription \"%s\" could not connect to the publisher: %s",
953 sub->name, err)));
954
955 PG_TRY();
956 {
957 if (validate_publications)
958 check_publications(wrconn, validate_publications);
959
960 /* Get the relation list from publisher. */
961 pubrels = fetch_relation_list(wrconn, sub->publications);
962
963 /* Get local relation list. */
964 subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
965 subrel_count = list_length(subrel_states);
966
967 /*
968 * Build qsorted arrays of local table oids and sequence oids for
969 * faster lookup. This can potentially contain all tables and
970 * sequences in the database so speed of lookup is important.
971 *
972 * We do not yet know the exact count of tables and sequences, so we
973 * allocate separate arrays for table OIDs and sequence OIDs based on
974 * the total number of relations (subrel_count).
975 */
976 subrel_local_oids = palloc(subrel_count * sizeof(Oid));
977 subseq_local_oids = palloc(subrel_count * sizeof(Oid));
978 foreach(lc, subrel_states)
979 {
980 SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
981
982 if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
983 subseq_local_oids[seq_count++] = relstate->relid;
984 else
985 subrel_local_oids[tbl_count++] = relstate->relid;
986 }
987
988 qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp);
989 check_publications_origin_tables(wrconn, sub->publications, copy_data,
990 sub->retaindeadtuples, sub->origin,
991 subrel_local_oids, tbl_count,
992 sub->name);
993
994 qsort(subseq_local_oids, seq_count, sizeof(Oid), oid_cmp);
995 check_publications_origin_sequences(wrconn, sub->publications,
996 copy_data, sub->origin,
997 subseq_local_oids, seq_count,
998 sub->name);
999
1000 /*
1001 * Walk over the remote relations and try to match them to locally
1002 * known relations. If the relation is not known locally create a new
1003 * state for it.
1004 *
1005 * Also builds array of local oids of remote relations for the next
1006 * step.
1007 */
1008 off = 0;
1009 pubrel_local_oids = palloc(list_length(pubrels) * sizeof(Oid));
1010
1011 foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
1012 {
1013 RangeVar *rv = pubrelinfo->rv;
1014 Oid relid;
1015 char relkind;
1016
1017 relid = RangeVarGetRelid(rv, AccessShareLock, false);
1018 relkind = get_rel_relkind(relid);
1019
1020 /* Check for supported relkind. */
1021 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
1022 rv->schemaname, rv->relname);
1023
1024 pubrel_local_oids[off++] = relid;
1025
1026 if (!bsearch(&relid, subrel_local_oids,
1027 tbl_count, sizeof(Oid), oid_cmp) &&
1028 !bsearch(&relid, subseq_local_oids,
1029 seq_count, sizeof(Oid), oid_cmp))
1030 {
1031 AddSubscriptionRelState(sub->oid, relid,
1032 copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
1033 InvalidXLogRecPtr, true);
1034 ereport(DEBUG1,
1035 errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
1036 relkind == RELKIND_SEQUENCE ? "sequence" : "table",
1037 rv->schemaname, rv->relname, sub->name));
1038 }
1039 }
1040
1041 /*
1042 * Next remove state for tables we should not care about anymore using
1043 * the data we collected above
1044 */
1045 qsort(pubrel_local_oids, list_length(pubrels), sizeof(Oid), oid_cmp);
1046
1047 for (off = 0; off < tbl_count; off++)
1048 {
1049 Oid relid = subrel_local_oids[off];
1050
1051 if (!bsearch(&relid, pubrel_local_oids,
1052 list_length(pubrels), sizeof(Oid), oid_cmp))
1053 {
1054 char state;
1055 XLogRecPtr statelsn;
1056 SubRemoveRels *remove_rel = palloc(sizeof(SubRemoveRels));
1057
1058 /*
1059 * Lock pg_subscription_rel with AccessExclusiveLock to
1060 * prevent any race conditions with the apply worker
1061 * re-launching workers at the same time this code is trying
1062 * to remove those tables.
1063 *
1064 * Even if new worker for this particular rel is restarted it
1065 * won't be able to make any progress as we hold exclusive
1066 * lock on pg_subscription_rel till the transaction end. It
1067 * will simply exit as there is no corresponding rel entry.
1068 *
1069 * This locking also ensures that the state of rels won't
1070 * change till we are done with this refresh operation.
1071 */
1072 if (!rel)
1073 rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
1074
1075 /* Last known rel state. */
1076 state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
1077
1078 RemoveSubscriptionRel(sub->oid, relid);
1079
1080 remove_rel->relid = relid;
1081 remove_rel->state = state;
1082
1083 sub_remove_rels = lappend(sub_remove_rels, remove_rel);
1084
1085 logicalrep_worker_stop(WORKERTYPE_TABLESYNC, sub->oid, relid);
1086
1087 /*
1088 * For READY state, we would have already dropped the
1089 * tablesync origin.
1090 */
1091 if (state != SUBREL_STATE_READY)
1092 {
1093 char originname[NAMEDATALEN];
1094
1095 /*
1096 * Drop the tablesync's origin tracking if exists.
1097 *
1098 * It is possible that the origin is not yet created for
1099 * tablesync worker, this can happen for the states before
1100 * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
1101 * apply worker can also concurrently try to drop the
1102 * origin and by this time the origin might be already
1103 * removed. For these reasons, passing missing_ok = true.
1104 */
1105 ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
1106 sizeof(originname));
1107 replorigin_drop_by_name(originname, true, false);
1108 }
1109
1110 ereport(DEBUG1,
1111 (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1112 get_namespace_name(get_rel_namespace(relid)),
1113 get_rel_name(relid),
1114 sub->name)));
1115 }
1116 }
1117
1118 /*
1119 * Drop the tablesync slots associated with removed tables. This has
1120 * to be at the end because otherwise if there is an error while doing
1121 * the database operations we won't be able to rollback dropped slots.
1122 */
1123 foreach_ptr(SubRemoveRels, rel, sub_remove_rels)
1124 {
1125 if (rel->state != SUBREL_STATE_READY &&
1126 rel->state != SUBREL_STATE_SYNCDONE)
1127 {
1128 char syncslotname[NAMEDATALEN] = {0};
1129
1130 /*
1131 * For READY/SYNCDONE states we know the tablesync slot has
1132 * already been dropped by the tablesync worker.
1133 *
1134 * For other states, there is no certainty, maybe the slot
1135 * does not exist yet. Also, if we fail after removing some of
1136 * the slots, next time, it will again try to drop already
1137 * dropped slots and fail. For these reasons, we allow
1138 * missing_ok = true for the drop.
1139 */
1140 ReplicationSlotNameForTablesync(sub->oid, rel->relid,
1141 syncslotname, sizeof(syncslotname));
1142 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1143 }
1144 }
1145
1146 /*
1147 * Next remove state for sequences we should not care about anymore
1148 * using the data we collected above
1149 */
1150 for (off = 0; off < seq_count; off++)
1151 {
1152 Oid relid = subseq_local_oids[off];
1153
1154 if (!bsearch(&relid, pubrel_local_oids,
1155 list_length(pubrels), sizeof(Oid), oid_cmp))
1156 {
1157 /*
1158 * This locking ensures that the state of rels won't change
1159 * till we are done with this refresh operation.
1160 */
1161 if (!rel)
1162 rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
1163
1164 RemoveSubscriptionRel(sub->oid, relid);
1165
1166 ereport(DEBUG1,
1167 errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
1168 get_namespace_name(get_rel_namespace(relid)),
1169 get_rel_name(relid),
1170 sub->name));
1171 }
1172 }
1173 }
1174 PG_FINALLY();
1175 {
1176 walrcv_disconnect(wrconn);
1177 }
1178 PG_END_TRY();
1179
1180 if (rel)
1181 table_close(rel, NoLock);
1182}
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1170
#define DEBUG1
Definition: elog.h:30
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition: launcher.c:639
List * lappend(List *list, void *datum)
Definition: list.c:339
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
char * get_rel_name(Oid relid)
Definition: lsyscache.c:2095
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:2170
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:2119
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3533
void * palloc(Size size)
Definition: mcxt.c:1365
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:98
int oid_cmp(const void *p1, const void *p2)
Definition: oid.c:258
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:439
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
void RemoveSubscriptionRel(Oid subid, Oid relid)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
#define qsort(a, b, c, d)
Definition: port.h:479
char * relname
Definition: primnodes.h:83
char * schemaname
Definition: primnodes.h:80
Definition: regguts.h:323
static void check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static void check_publications(WalReceiverConn *wrconn, List *publications)
static List * fetch_relation_list(WalReceiverConn *wrconn, List *publications)
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1223
@ WORKERTYPE_TABLESYNC
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References AccessExclusiveLock, AccessShareLock, AddSubscriptionRelState(), check_publications(), check_publications_origin_sequences(), check_publications_origin_tables(), CheckSubscriptionRelkind(), Subscription::conninfo, DEBUG1, ereport, err(), errcode(), errmsg(), errmsg_internal(), ERROR, fetch_relation_list(), foreach_ptr, get_namespace_name(), get_rel_name(), get_rel_namespace(), get_rel_relkind(), GetSubscriptionRelations(), GetSubscriptionRelState(), InvalidXLogRecPtr, lappend(), lfirst, list_length(), load_file(), logicalrep_worker_stop(), Subscription::name, NAMEDATALEN, NIL, NoLock, Subscription::oid, oid_cmp(), Subscription::origin, Subscription::ownersuperuser, palloc(), Subscription::passwordrequired, PG_END_TRY, PG_FINALLY, PG_TRY, Subscription::publications, qsort, RangeVarGetRelid, SubscriptionRelState::relid, RangeVar::relname, RemoveSubscriptionRel(), ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), Subscription::retaindeadtuples, RangeVar::schemaname, table_close(), table_open(), walrcv_connect, walrcv_disconnect, WORKERTYPE_TABLESYNC, and wrconn.

Referenced by AlterSubscription().

◆ AlterSubscription_refresh_seq()

static void AlterSubscription_refresh_seq ( Subscription sub)
static

Definition at line 1188 of file subscriptioncmds.c.

1189{
1190 char *err = NULL;
1191 WalReceiverConn *wrconn;
1192 bool must_use_password;
1193
1194 /* Load the library providing us libpq calls. */
1195 load_file("libpqwalreceiver", false);
1196
1197 /* Try to connect to the publisher. */
1198 must_use_password = sub->passwordrequired && !sub->ownersuperuser;
1199 wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1200 sub->name, &err);
1201 if (!wrconn)
1202 ereport(ERROR,
1203 errcode(ERRCODE_CONNECTION_FAILURE),
1204 errmsg("subscription \"%s\" could not connect to the publisher: %s",
1205 sub->name, err));
1206
1207 PG_TRY();
1208 {
1209 List *subrel_states;
1210
1211 check_publications_origin_sequences(wrconn, sub->publications, true,
1212 sub->origin, NULL, 0, sub->name);
1213
1214 /* Get local sequence list. */
1215 subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
1216 foreach_ptr(SubscriptionRelState, subrel, subrel_states)
1217 {
1218 Oid relid = subrel->relid;
1219
1220 UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
1221 InvalidXLogRecPtr, false);
1222 ereport(DEBUG1,
1223 errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
1224 get_namespace_name(get_rel_namespace(relid)),
1225 get_rel_name(relid),
1226 sub->name));
1227 }
1228 }
1229 PG_FINALLY();
1230 {
1231 walrcv_disconnect(wrconn);
1232 }
1233 PG_END_TRY();
1234}
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)

References check_publications_origin_sequences(), Subscription::conninfo, DEBUG1, ereport, err(), errcode(), errmsg(), errmsg_internal(), ERROR, foreach_ptr, get_namespace_name(), get_rel_name(), get_rel_namespace(), GetSubscriptionRelations(), InvalidXLogRecPtr, load_file(), Subscription::name, Subscription::oid, Subscription::origin, Subscription::ownersuperuser, Subscription::passwordrequired, PG_END_TRY, PG_FINALLY, PG_TRY, Subscription::publications, UpdateSubscriptionRelState(), walrcv_connect, walrcv_disconnect, and wrconn.

Referenced by AlterSubscription().

◆ AlterSubscriptionOwner()

ObjectAddress AlterSubscriptionOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 2412 of file subscriptioncmds.c.

2413{
2414 Oid subid;
2415 HeapTuple tup;
2416 Relation rel;
2417 ObjectAddress address;
2418 Form_pg_subscription form;
2419
2420 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2421
2422 tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
2423 CStringGetDatum(name));
2424
2425 if (!HeapTupleIsValid(tup))
2426 ereport(ERROR,
2427 (errcode(ERRCODE_UNDEFINED_OBJECT),
2428 errmsg("subscription \"%s\" does not exist", name)));
2429
2430 form = (Form_pg_subscription) GETSTRUCT(tup);
2431 subid = form->oid;
2432
2433 AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2434
2435 ObjectAddressSet(address, SubscriptionRelationId, subid);
2436
2437 heap_freetuple(tup);
2438
2439 table_close(rel, RowExclusiveLock);
2440
2441 return address;
2442}
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
const char * name

References AlterSubscriptionOwner_internal(), CStringGetDatum(), ereport, errcode(), errmsg(), ERROR, GETSTRUCT(), heap_freetuple(), HeapTupleIsValid, MyDatabaseId, name, ObjectAddressSet, ObjectIdGetDatum(), RowExclusiveLock, SearchSysCacheCopy2, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

◆ AlterSubscriptionOwner_internal()

static void AlterSubscriptionOwner_internal ( Relation  rel,
HeapTuple  tup,
Oid  newOwnerId 
)
static

Definition at line 2352 of file subscriptioncmds.c.

2353{
2354 Form_pg_subscription form;
2355 AclResult aclresult;
2356
2357 form = (Form_pg_subscription) GETSTRUCT(tup);
2358
2359 if (form->subowner == newOwnerId)
2360 return;
2361
2362 if (!object_ownercheck(SubscriptionRelationId, form->oid, GetUserId()))
2363 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
2364 NameStr(form->subname));
2365
2366 /*
2367 * Don't allow non-superuser modification of a subscription with
2368 * password_required=false.
2369 */
2370 if (!form->subpasswordrequired && !superuser())
2371 ereport(ERROR,
2372 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2373 errmsg("password_required=false is superuser-only"),
2374 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2375
2376 /* Must be able to become new owner */
2377 check_can_set_role(GetUserId(), newOwnerId);
2378
2379 /*
2380 * current owner must have CREATE on database
2381 *
2382 * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
2383 * other object types behave differently (e.g. you can't give a table to a
2384 * user who lacks CREATE privileges on a schema).
2385 */
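2386 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
2387 GetUserId(), ACL_CREATE);
2388 if (aclresult != ACLCHECK_OK)
2389 aclcheck_error(aclresult, OBJECT_DATABASE,
2390 get_database_name(MyDatabaseId));
2391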
2392 form->subowner = newOwnerId;
2393 CatalogTupleUpdate(rel, &tup->t_self, tup);
2394
2395 /* Update owner dependency reference */
2396 changeDependencyOnOwner(SubscriptionRelationId,
2397 form->oid,
2398 newOwnerId);
2399
2400 InvokeObjectPostAlterHook(SubscriptionRelationId,
2401 form->oid, 0);
2402
2403 /* Wake up related background processes to handle this change quickly. */
2404 ApplyLauncherWakeupAtCommit();
2405 LogicalRepWorkersWakeupAtCommit(form->oid);
2406}
void check_can_set_role(Oid member, Oid role)
Definition: acl.c:5341
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition: aclchk.c:3834
#define NameStr(name)
Definition: c.h:755
char * get_database_name(Oid dbid)
Definition: lsyscache.c:1259
@ OBJECT_DATABASE
Definition: parsenodes.h:2334
#define ACL_CREATE
Definition: parsenodes.h:85
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:316

References ACL_CREATE, aclcheck_error(), ACLCHECK_NOT_OWNER, ACLCHECK_OK, ApplyLauncherWakeupAtCommit(), CatalogTupleUpdate(), changeDependencyOnOwner(), check_can_set_role(), ereport, errcode(), errhint(), errmsg(), ERROR, get_database_name(), GETSTRUCT(), GetUserId(), InvokeObjectPostAlterHook, LogicalRepWorkersWakeupAtCommit(), MyDatabaseId, NameStr, object_aclcheck(), OBJECT_DATABASE, object_ownercheck(), OBJECT_SUBSCRIPTION, superuser(), and HeapTupleData::t_self.

Referenced by AlterSubscriptionOwner(), and AlterSubscriptionOwner_oid().

◆ AlterSubscriptionOwner_oid()

void AlterSubscriptionOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 2448 of file subscriptioncmds.c.

2449{
2450 HeapTuple tup;
2451 Relation rel;
2452
2453 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2454
2455 tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
2456
2457 if (!HeapTupleIsValid(tup))
2458 ereport(ERROR,
2459 (errcode(ERRCODE_UNDEFINED_OBJECT),
2460 errmsg("subscription with OID %u does not exist", subid)));
2461
2462 AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
2463
2464 heap_freetuple(tup);
2465
2466 table_close(rel, RowExclusiveLock);
2467}
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:91

References AlterSubscriptionOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum(), RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by shdepReassignOwned_Owner().

◆ check_duplicates_in_publist()

static void check_duplicates_in_publist ( List publist,
Datum datums 
)
static

Definition at line 3045 of file subscriptioncmds.c.

3046{
3047 ListCell *cell;
3048 int j = 0;
3049
3050 foreach(cell, publist)
3051 {
3052 char *name = strVal(lfirst(cell));
3053 ListCell *pcell;
3054
3055 foreach(pcell, publist)
3056 {
3057 char *pname = strVal(lfirst(pcell));
3058
3059 if (pcell == cell)
3060 break;
3061
3062 if (strcmp(name, pname) == 0)
3063 ereport(ERROR,
3064 (errcode(ERRCODE_DUPLICATE_OBJECT),
3065 errmsg("publication name \"%s\" used more than once",
3066 pname)));
3067 }
3068
3069 if (datums)
3070 datums[j++] = CStringGetTextDatum(name);
3071 }
3072}
int j
Definition: isn.c:78
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
#define strVal(v)
Definition: value.h:82

References CStringGetTextDatum, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, j, lfirst, name, and strVal.

Referenced by merge_publications(), and publicationListToArray().

◆ check_pub_dead_tuple_retention()

static void check_pub_dead_tuple_retention ( WalReceiverConn wrconn)
static

Definition at line 2753 of file subscriptioncmds.c.

2754{
2755 WalRcvExecResult *res;
2756 Oid RecoveryRow[1] = {BOOLOID};
2757 TupleTableSlot *slot;
2758 bool isnull;
2759 bool remote_in_recovery;
2760
2761 if (walrcv_server_version(wrconn) < 19000)
2762 ereport(ERROR,
2763 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2764 errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
2765
2766 res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
2767
2768 if (res->status != WALRCV_OK_TUPLES)
2769 ereport(ERROR,
2770 (errcode(ERRCODE_CONNECTION_FAILURE),
2771 errmsg("could not obtain recovery progress from the publisher: %s",
2772 res->err)));
2773
2774 slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
2775 if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2776 elog(ERROR, "failed to fetch tuple for the recovery progress");
2777
2778 remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
2779
2780 if (remote_in_recovery)
2781 ereport(ERROR,
2782 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2783 errmsg("cannot enable retain_dead_tuples if the publisher is in recovery."));
2784
2785 ExecDropSingleTupleTableSlot(slot);
2786
2787 walrcv_clear_result(res);
2788}
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1427
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1443
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:86
static bool DatumGetBool(Datum X)
Definition: postgres.h:100
Tuplestorestate * tuplestore
Definition: walreceiver.h:223
TupleDesc tupledesc
Definition: walreceiver.h:224
WalRcvExecStatus status
Definition: walreceiver.h:220
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1130
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:398
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:207
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:471
#define walrcv_server_version(conn)
Definition: walreceiver.h:447
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:465

References DatumGetBool(), elog, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, ExecDropSingleTupleTableSlot(), MakeSingleTupleTableSlot(), slot_getattr(), WalRcvExecResult::status, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, walrcv_server_version, and wrconn.

Referenced by AlterSubscription(), and CreateSubscription().

◆ check_publications()

static void check_publications ( WalReceiverConn wrconn,
List publications 
)
static

Definition at line 491 of file subscriptioncmds.c.

492{
493 WalRcvExecResult *res;
494 StringInfo cmd;
495 TupleTableSlot *slot;
496 List *publicationsCopy = NIL;
497 Oid tableRow[1] = {TEXTOID};
498
499 cmd = makeStringInfo();
500 appendStringInfoString(cmd, "SELECT t.pubname FROM\n"
501 " pg_catalog.pg_publication t WHERE\n"
502 " t.pubname IN (");
503 GetPublicationsStr(publications, cmd, true);
504 appendStringInfoChar(cmd, ')');
505
506 res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
507 destroyStringInfo(cmd);
508
509 if (res->status != WALRCV_OK_TUPLES)
510 ereport(ERROR,
511 errmsg("could not receive list of publications from the publisher: %s",
512 res->err));
513
514 publicationsCopy = list_copy(publications);
515
516 /* Process publication(s). */
517 slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
518 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
519 {
520 char *pubname;
521 bool isnull;
522
523 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
524 Assert(!isnull);
525
526 /* Delete the publication present in publisher from the list. */
527 publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
528 ExecClearTuple(slot);
529 }
530
531 ExecDropSingleTupleTableSlot(slot);
532
533 walrcv_clear_result(res);
534
535 if (list_length(publicationsCopy))
536 {
537 /* Prepare the list of non-existent publication(s) for error message. */
538 StringInfo pubnames = makeStringInfo();
539
540 GetPublicationsStr(publicationsCopy, pubnames, false);
541 ereport(WARNING,
542 errcode(ERRCODE_UNDEFINED_OBJECT),
543 errmsg_plural("publication %s does not exist on the publisher",
544 "publications %s do not exist on the publisher",
545 list_length(publicationsCopy),
546 pubnames->data));
547 }
548}
#define TextDatumGetCString(d)
Definition: builtins.h:98
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:1193
List * list_delete(List *list, void *datum)
Definition: list.c:853
List * list_copy(const List *oldlist)
Definition: list.c:1573
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
void destroyStringInfo(StringInfo str)
Definition: stringinfo.c:409
StringInfo makeStringInfo(void)
Definition: stringinfo.c:72
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:242
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:457
String * makeString(char *str)
Definition: value.c:63

References appendStringInfoChar(), appendStringInfoString(), Assert(), StringInfoData::data, destroyStringInfo(), ereport, WalRcvExecResult::err, errcode(), errmsg(), errmsg_plural(), ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), GetPublicationsStr(), list_copy(), list_delete(), list_length(), MakeSingleTupleTableSlot(), makeString(), makeStringInfo(), NIL, slot_getattr(), WalRcvExecResult::status, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, WARNING, and wrconn.

Referenced by AlterSubscription_refresh(), and CreateSubscription().

◆ check_publications_origin_sequences()

static void check_publications_origin_sequences ( WalReceiverConn wrconn,
List publications,
bool  copydata,
char *  origin,
Oid subrel_local_oids,
int  subrel_count,
char *  subname 
)
static

Definition at line 2639 of file subscriptioncmds.c.

2643{
2644 WalRcvExecResult *res;
2645 StringInfoData cmd;
2646 TupleTableSlot *slot;
2647 Oid tableRow[1] = {TEXTOID};
2648 List *publist = NIL;
2649
2650 /*
2651 * Enable sequence synchronization checks only when origin is 'none', to
2652 * ensure that sequence data from other origins is not inadvertently
2653 * copied.
2654 */
2655 if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)
2656 return;
2657
2658 initStringInfo(&cmd);
2660 "SELECT DISTINCT P.pubname AS pubname\n"
2661 "FROM pg_publication P,\n"
2662 " LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
2663 " JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
2664 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2665 "WHERE C.oid = GPS.relid AND P.pubname IN (");
2666
2667 GetPublicationsStr(publications, &cmd, true);
2668 appendStringInfoString(&cmd, ")\n");
2669
2670 /*
2671 * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
2672 * subrel_local_oids contains the list of relations that are already
2673 * present on the subscriber. This check should be skipped as these will
2674 * not be re-synced.
2675 */
2676 for (int i = 0; i < subrel_count; i++)
2677 {
2678 Oid relid = subrel_local_oids[i];
2679 char *schemaname = get_namespace_name(get_rel_namespace(relid));
2680 char *seqname = get_rel_name(relid);
2681
2682 appendStringInfo(&cmd,
2683 "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2684 schemaname, seqname);
2685 }
2686
2687 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2688 pfree(cmd.data);
2689
2690 if (res->status != WALRCV_OK_TUPLES)
2691 ereport(ERROR,
2692 (errcode(ERRCODE_CONNECTION_FAILURE),
2693 errmsg("could not receive list of replicated sequences from the publisher: %s",
2694 res->err)));
2695
2696 /* Process publications. */
2698 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2699 {
2700 char *pubname;
2701 bool isnull;
2702
2703 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2704 Assert(!isnull);
2705
2706 ExecClearTuple(slot);
2707 publist = list_append_unique(publist, makeString(pubname));
2708 }
2709
2710 /*
2711 * Log a warning if the publisher has subscribed to the same sequence from
2712 * some other publisher. We cannot know the origin of sequence data
2713 * during the initial sync.
2714 */
2715 if (publist)
2716 {
2717 StringInfo pubnames = makeStringInfo();
2718 StringInfo err_msg = makeStringInfo();
2719 StringInfo err_hint = makeStringInfo();
2720
2721 /* Prepare the list of publication(s) for warning message. */
2722 GetPublicationsStr(publist, pubnames, false);
2723
2724 appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
2725 subname);
2726 appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher sequences did not come from other origins."));
2727
2729 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2730 errmsg_internal("%s", err_msg->data),
2731 errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
2732 "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
2733 list_length(publist), pubnames->data),
2734 errhint_internal("%s", err_hint->data));
2735 }
2736
2738
2740}
int errhint_internal(const char *fmt,...)
Definition: elog.c:1352
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:1308
#define _(x)
Definition: elog.c:91
int i
Definition: isn.c:77
List * list_append_unique(List *list, void *datum)
Definition: list.c:1343
void pfree(void *pointer)
Definition: mcxt.c:1594
NameData subname
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97

References _, appendStringInfo(), appendStringInfoString(), Assert(), StringInfoData::data, ereport, WalRcvExecResult::err, errcode(), errdetail_plural(), errhint_internal(), errmsg(), errmsg_internal(), ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), get_namespace_name(), get_rel_name(), get_rel_namespace(), GetPublicationsStr(), i, initStringInfo(), list_append_unique(), list_length(), MakeSingleTupleTableSlot(), makeString(), makeStringInfo(), NIL, pfree(), pg_strcasecmp(), slot_getattr(), WalRcvExecResult::status, subname, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, WARNING, and wrconn.

Referenced by AlterSubscription_refresh(), AlterSubscription_refresh_seq(), and CreateSubscription().

◆ check_publications_origin_tables()

static void check_publications_origin_tables ( WalReceiverConn * wrconn,
List * publications,
bool  copydata,
bool  retain_dead_tuples,
char *  origin,
Oid * subrel_local_oids,
int  subrel_count,
char *  subname 
)
static

Definition at line 2493 of file subscriptioncmds.c.

2497{
2498 WalRcvExecResult *res;
2499 StringInfoData cmd;
2500 TupleTableSlot *slot;
2501 Oid tableRow[1] = {TEXTOID};
2502 List *publist = NIL;
2503 int i;
2504 bool check_rdt;
2505 bool check_table_sync;
2506 bool origin_none = origin &&
2507 pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
2508
2509 /*
2510 * Enable retain_dead_tuples checks only when origin is set to 'any',
2511 * since with origin='none' only local changes are replicated to the
2512 * subscriber.
2513 */
2514 check_rdt = retain_dead_tuples && !origin_none;
2515
2516 /*
2517 * Enable table synchronization checks only when origin is 'none', to
2518 * ensure that data from other origins is not inadvertently copied.
2519 */
2520 check_table_sync = copydata && origin_none;
2521
2522 /* retain_dead_tuples and table sync checks occur separately */
2523 Assert(!(check_rdt && check_table_sync));
2524
2525 /* Return if no checks are required */
2526 if (!check_rdt && !check_table_sync)
2527 return;
2528
2529 initStringInfo(&cmd);
2531 "SELECT DISTINCT P.pubname AS pubname\n"
2532 "FROM pg_publication P,\n"
2533 " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2534 " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
2535 " GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
2536 " SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
2537 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2538 "WHERE C.oid = GPT.relid AND P.pubname IN (");
2539 GetPublicationsStr(publications, &cmd, true);
2540 appendStringInfoString(&cmd, ")\n");
2541
2542 /*
2543 * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
2544 * subrel_local_oids contains the list of relation oids that are already
2545 * present on the subscriber. This check should be skipped for these
2546 * tables if checking for table sync scenario. However, when handling the
2547 * retain_dead_tuples scenario, ensure all tables are checked, as some
2548 * existing tables may now include changes from other origins due to newly
2549 * created subscriptions on the publisher.
2550 */
2551 if (check_table_sync)
2552 {
2553 for (i = 0; i < subrel_count; i++)
2554 {
2555 Oid relid = subrel_local_oids[i];
2556 char *schemaname = get_namespace_name(get_rel_namespace(relid));
2557 char *tablename = get_rel_name(relid);
2558
2559 appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2560 schemaname, tablename);
2561 }
2562 }
2563
2564 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2565 pfree(cmd.data);
2566
2567 if (res->status != WALRCV_OK_TUPLES)
2568 ereport(ERROR,
2569 (errcode(ERRCODE_CONNECTION_FAILURE),
2570 errmsg("could not receive list of replicated tables from the publisher: %s",
2571 res->err)));
2572
2573 /* Process publications. */
2575 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2576 {
2577 char *pubname;
2578 bool isnull;
2579
2580 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2581 Assert(!isnull);
2582
2583 ExecClearTuple(slot);
2584 publist = list_append_unique(publist, makeString(pubname));
2585 }
2586
2587 /*
2588 * Log a warning if the publisher has subscribed to the same table from
2589 * some other publisher. We cannot know the origin of data during the
2590 * initial sync. Data origins can be found only from the WAL by looking at
2591 * the origin id.
2592 *
2593 * XXX: For simplicity, we don't check whether the table has any data or
2594 * not. If the table doesn't have any data then we don't need to
2595 * distinguish between data having origin and data not having origin so we
2596 * can avoid logging a warning for table sync scenario.
2597 */
2598 if (publist)
2599 {
2600 StringInfo pubnames = makeStringInfo();
2601 StringInfo err_msg = makeStringInfo();
2602 StringInfo err_hint = makeStringInfo();
2603
2604 /* Prepare the list of publication(s) for warning message. */
2605 GetPublicationsStr(publist, pubnames, false);
2606
2607 if (check_table_sync)
2608 {
2609 appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
2610 subname);
2611 appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher tables did not come from other origins."));
2612 }
2613 else
2614 {
2615 appendStringInfo(err_msg, _("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins"),
2616 subname);
2617 appendStringInfoString(err_hint, _("Consider using origin = NONE or disabling retain_dead_tuples."));
2618 }
2619
2621 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2622 errmsg_internal("%s", err_msg->data),
2623 errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2624 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2625 list_length(publist), pubnames->data),
2626 errhint_internal("%s", err_hint->data));
2627 }
2628
2630
2632}

References _, appendStringInfo(), appendStringInfoString(), Assert(), StringInfoData::data, ereport, WalRcvExecResult::err, errcode(), errdetail_plural(), errhint_internal(), errmsg(), errmsg_internal(), ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), get_namespace_name(), get_rel_name(), get_rel_namespace(), GetPublicationsStr(), i, initStringInfo(), list_append_unique(), list_length(), MakeSingleTupleTableSlot(), makeString(), makeStringInfo(), NIL, pfree(), pg_strcasecmp(), slot_getattr(), WalRcvExecResult::status, subname, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, WARNING, and wrconn.

Referenced by AlterSubscription(), AlterSubscription_refresh(), and CreateSubscription().

◆ CheckAlterSubOption()

static void CheckAlterSubOption ( Subscription * sub,
const char *  option,
bool  slot_needs_update,
bool  isTopLevel 
)
static

Definition at line 1241 of file subscriptioncmds.c.

1243{
1244 Assert(strcmp(option, "failover") == 0 ||
1245 strcmp(option, "two_phase") == 0 ||
1246 strcmp(option, "retain_dead_tuples") == 0);
1247
1248 /*
1249 * Altering the retain_dead_tuples option does not update the slot on the
1250 * publisher.
1251 */
1252 Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
1253
1254 /*
1255 * Do not allow changing the option if the subscription is enabled. This
1256 * is because both failover and two_phase options of the slot on the
1257 * publisher cannot be modified if the slot is currently acquired by the
1258 * existing walsender.
1259 *
1260 * Note that two_phase is enabled (aka changed from 'false' to 'true') on
1261 * the publisher by the existing walsender, so we could have allowed that
1262 * even when the subscription is enabled. But we kept this restriction for
1263 * the sake of consistency and simplicity.
1264 *
1265 * Additionally, do not allow changing the retain_dead_tuples option when
1266 * the subscription is enabled to prevent race conditions arising from the
1267 * new option value being acknowledged asynchronously by the launcher and
1268 * apply workers.
1269 *
1270 * Without the restriction, a race condition may arise when a user
1271 * disables and immediately re-enables the retain_dead_tuples option. In
1272 * this case, the launcher might drop the slot upon noticing the disabled
1273 * action, while the apply worker may keep maintaining
1274 * oldest_nonremovable_xid without noticing the option change. During this
1275 * period, a transaction ID wraparound could falsely make this ID appear
1276 * as if it originates from the future w.r.t the transaction ID stored in
1277 * the slot maintained by launcher.
1278 *
1279 * Similarly, if the user enables retain_dead_tuples concurrently with the
1280 * launcher starting the worker, the apply worker may start calculating
1281 * oldest_nonremovable_xid before the launcher notices the enable action.
1282 * Consequently, the launcher may update slot.xmin to a newer value than
1283 * that maintained by the worker. In subsequent cycles, upon integrating
1284 * the worker's oldest_nonremovable_xid, the launcher might detect a
1285 * retreat in the calculated xmin, necessitating additional handling.
1286 *
1287 * XXX To address the above race conditions, we can define
1288 * oldest_nonremovable_xid as FullTransactionId and add a check to
1289 * disallow retreating the conflict slot's xmin. For now, we kept the
1290 * implementation simple by disallowing change to the retain_dead_tuples,
1291 * but in the future we can change this after some more analysis.
1292 *
1293 * Note that we could restrict only the enabling of retain_dead_tuples to
1294 * avoid the race conditions described above, but we maintain the
1295 * restriction for both enable and disable operations for the sake of
1296 * consistency.
1297 */
1298 if (sub->enabled)
1299 ereport(ERROR,
1300 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1301 errmsg("cannot set option \"%s\" for enabled subscription",
1302 option)));
1303
1304 if (slot_needs_update)
1305 {
1306 StringInfoData cmd;
1307
1308 /*
1309 * A valid slot must be associated with the subscription for us to
1310 * modify any of the slot's properties.
1311 */
1312 if (!sub->slotname)
1313 ereport(ERROR,
1314 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1315 errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
1316 option)));
1317
1318 /* The changed option of the slot can't be rolled back. */
1319 initStringInfo(&cmd);
1320 appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
1321
1322 PreventInTransactionBlock(isTopLevel, cmd.data);
1323 pfree(cmd.data);
1324 }
1325}

References appendStringInfo(), Assert(), StringInfoData::data, Subscription::enabled, ereport, errcode(), errmsg(), ERROR, initStringInfo(), pfree(), PreventInTransactionBlock(), and Subscription::slotname.

Referenced by AlterSubscription().

◆ CheckSubDeadTupleRetention()

void CheckSubDeadTupleRetention ( bool  check_guc,
bool  sub_disabled,
int  elevel_for_sub_disabled,
bool  retain_dead_tuples,
bool  retention_active,
bool  max_retention_set 
)

Definition at line 2814 of file subscriptioncmds.c.

2818{
2819 Assert(elevel_for_sub_disabled == NOTICE ||
2820 elevel_for_sub_disabled == WARNING);
2821
2822 if (retain_dead_tuples)
2823 {
2824 if (check_guc && wal_level < WAL_LEVEL_REPLICA)
2825 ereport(ERROR,
2826 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2827 errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
2828 errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
2829
2830 if (check_guc && !track_commit_timestamp)
2832 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2833 errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
2834 errhint("Consider setting \"%s\" to true.",
2835 "track_commit_timestamp"));
2836
2837 if (sub_disabled && retention_active)
2838 ereport(elevel_for_sub_disabled,
2839 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2840 errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
2841 (elevel_for_sub_disabled > NOTICE)
2842 ? errhint("Consider setting %s to false.",
2843 "retain_dead_tuples") : 0);
2844 }
2845 else if (max_retention_set)
2846 {
2848 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2849 errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
2850 }
2851}
bool track_commit_timestamp
Definition: commit_ts.c:109
int wal_level
Definition: xlog.c:132
@ WAL_LEVEL_REPLICA
Definition: xlog.h:75

References Assert(), ereport, errcode(), errhint(), errmsg(), ERROR, NOTICE, track_commit_timestamp, wal_level, WAL_LEVEL_REPLICA, and WARNING.

Referenced by AlterSubscription(), CreateSubscription(), and DisableSubscriptionAndExit().

◆ CreateSubscription()

ObjectAddress CreateSubscription ( ParseState * pstate,
CreateSubscriptionStmt * stmt,
bool  isTopLevel 
)

Definition at line 584 of file subscriptioncmds.c.

586{
587 Relation rel;
588 ObjectAddress myself;
589 Oid subid;
590 bool nulls[Natts_pg_subscription];
591 Datum values[Natts_pg_subscription];
592 Oid owner = GetUserId();
593 HeapTuple tup;
594 char *conninfo;
595 char originname[NAMEDATALEN];
596 List *publications;
597 bits32 supported_opts;
598 SubOpts opts = {0};
599 AclResult aclresult;
600
601 /*
602 * Parse and check options.
603 *
604 * Connection and publication should not be specified here.
605 */
606 supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
614 parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
615
616 /*
617 * Since creating a replication slot is not transactional, rolling back
618 * the transaction leaves the created replication slot. So we cannot run
619 * CREATE SUBSCRIPTION inside a transaction block if creating a
620 * replication slot.
621 */
622 if (opts.create_slot)
623 PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
624
625 /*
626 * We don't want to allow unprivileged users to be able to trigger
627 * attempts to access arbitrary network destinations, so require the user
628 * to have been specifically authorized to create subscriptions.
629 */
630 if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
632 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
633 errmsg("permission denied to create subscription"),
634 errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
635 "pg_create_subscription")));
636
637 /*
638 * Since a subscription is a database object, we also check for CREATE
639 * permission on the database.
640 */
641 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId,
642 owner, ACL_CREATE);
643 if (aclresult != ACLCHECK_OK)
646
647 /*
648 * Non-superusers are required to set a password for authentication, and
649 * that password must be used by the target server, but the superuser can
650 * exempt a subscription from this requirement.
651 */
652 if (!opts.passwordrequired && !superuser_arg(owner))
654 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
655 errmsg("password_required=false is superuser-only"),
656 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
657
658 /*
659 * If built with appropriate switch, whine when regression-testing
660 * conventions for subscription names are violated.
661 */
662#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
663 if (strncmp(stmt->subname, "regress_", 8) != 0)
664 elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
665#endif
666
667 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
668
669 /* Check if name is used */
670 subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
672 if (OidIsValid(subid))
673 {
676 errmsg("subscription \"%s\" already exists",
677 stmt->subname)));
678 }
679
680 /*
681 * Ensure that system configuration parameters are set appropriately to
682 * support retain_dead_tuples and max_retention_duration.
683 */
685 opts.retaindeadtuples, opts.retaindeadtuples,
686 (opts.maxretention > 0));
687
688 if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
689 opts.slot_name == NULL)
690 opts.slot_name = stmt->subname;
691
692 /* The default for synchronous_commit of subscriptions is off. */
693 if (opts.synchronous_commit == NULL)
694 opts.synchronous_commit = "off";
695
696 conninfo = stmt->conninfo;
697 publications = stmt->publication;
698
699 /* Load the library providing us libpq calls. */
700 load_file("libpqwalreceiver", false);
701
702 /* Check the connection info string. */
703 walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
704
705 /* Everything ok, form a new tuple. */
706 memset(values, 0, sizeof(values));
707 memset(nulls, false, sizeof(nulls));
708
709 subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
710 Anum_pg_subscription_oid);
711 values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
712 values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
713 values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
714 values[Anum_pg_subscription_subname - 1] =
716 values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
717 values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
718 values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
719 values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
720 values[Anum_pg_subscription_subtwophasestate - 1] =
721 CharGetDatum(opts.twophase ?
722 LOGICALREP_TWOPHASE_STATE_PENDING :
723 LOGICALREP_TWOPHASE_STATE_DISABLED);
724 values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
725 values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
726 values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
727 values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
728 values[Anum_pg_subscription_subretaindeadtuples - 1] =
729 BoolGetDatum(opts.retaindeadtuples);
730 values[Anum_pg_subscription_submaxretention - 1] =
731 Int32GetDatum(opts.maxretention);
732 values[Anum_pg_subscription_subretentionactive - 1] =
733 Int32GetDatum(opts.retaindeadtuples);
734 values[Anum_pg_subscription_subconninfo - 1] =
735 CStringGetTextDatum(conninfo);
736 if (opts.slot_name)
737 values[Anum_pg_subscription_subslotname - 1] =
739 else
740 nulls[Anum_pg_subscription_subslotname - 1] = true;
741 values[Anum_pg_subscription_subsynccommit - 1] =
742 CStringGetTextDatum(opts.synchronous_commit);
743 values[Anum_pg_subscription_subpublications - 1] =
744 publicationListToArray(publications);
745 values[Anum_pg_subscription_suborigin - 1] =
747
748 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
749
750 /* Insert tuple into catalog. */
751 CatalogTupleInsert(rel, tup);
752 heap_freetuple(tup);
753
754 recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
755
756 /*
757 * A replication origin is currently created for all subscriptions,
758 * including those that only contain sequences or are otherwise empty.
759 *
760 * XXX: While this is technically unnecessary, optimizing it would require
761 * additional logic to skip origin creation during DDL operations and
762 * apply worker initialization, and to handle origin creation dynamically
763 * when tables are added to the subscription. It is not clear whether
764 * preventing creation of origins is worth additional complexity.
765 */
766 ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
767 replorigin_create(originname);
768
769 /*
770 * Connect to remote side to execute requested commands and fetch table
771 * and sequence info.
772 */
773 if (opts.connect)
774 {
775 char *err;
777 bool must_use_password;
778
779 /* Try to connect to the publisher. */
780 must_use_password = !superuser_arg(owner) && opts.passwordrequired;
781 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
782 stmt->subname, &err);
783 if (!wrconn)
785 (errcode(ERRCODE_CONNECTION_FAILURE),
786 errmsg("subscription \"%s\" could not connect to the publisher: %s",
787 stmt->subname, err)));
788
789 PG_TRY();
790 {
791 bool has_tables = false;
792 List *pubrels;
793 char relation_state;
794
795 check_publications(wrconn, publications);
797 opts.copy_data,
798 opts.retaindeadtuples, opts.origin,
799 NULL, 0, stmt->subname);
801 opts.copy_data, opts.origin,
802 NULL, 0, stmt->subname);
803
804 if (opts.retaindeadtuples)
806
807 /*
808 * Set sync state based on if we were asked to do data copy or
809 * not.
810 */
811 relation_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
812
813 /*
814 * Build local relation status info. Relations are for both tables
815 * and sequences from the publisher.
816 */
817 pubrels = fetch_relation_list(wrconn, publications);
818
819 foreach_ptr(PublicationRelKind, pubrelinfo, pubrels)
820 {
821 Oid relid;
822 char relkind;
823 RangeVar *rv = pubrelinfo->rv;
824
825 relid = RangeVarGetRelid(rv, AccessShareLock, false);
826 relkind = get_rel_relkind(relid);
827
828 /* Check for supported relkind. */
829 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
830 rv->schemaname, rv->relname);
831 has_tables |= (relkind != RELKIND_SEQUENCE);
832 AddSubscriptionRelState(subid, relid, relation_state,
833 InvalidXLogRecPtr, true);
834 }
835
836 /*
837 * If requested, create permanent slot for the subscription. We
838 * won't use the initial snapshot for anything, so no need to
839 * export it.
840 *
841 * XXX: Similar to origins, it is not clear whether preventing the
842 * slot creation for empty and sequence-only subscriptions is
843 * worth additional complexity.
844 */
845 if (opts.create_slot)
846 {
847 bool twophase_enabled = false;
848
849 Assert(opts.slot_name);
850
851 /*
852 * Even if two_phase is set, don't create the slot with
853 * two-phase enabled. Will enable it once all the tables are
854 * synced and ready. This avoids race-conditions like prepared
855 * transactions being skipped due to changes not being applied
856 * due to checks in should_apply_changes_for_rel() when
857 * tablesync for the corresponding tables are in progress. See
858 * comments atop worker.c.
859 *
860 * Note that if tables were specified but copy_data is false
861 * then it is safe to enable two_phase up-front because those
862 * tables are already initially in READY state. When the
863 * subscription has no tables, we leave the twophase state as
864 * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
865 * PUBLICATION to work.
866 */
867 if (opts.twophase && !opts.copy_data && has_tables)
868 twophase_enabled = true;
869
870 walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
871 opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
872
873 if (twophase_enabled)
874 UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
875
877 (errmsg("created replication slot \"%s\" on publisher",
878 opts.slot_name)));
879 }
880 }
881 PG_FINALLY();
882 {
884 }
885 PG_END_TRY();
886 }
887 else
889 (errmsg("subscription was created, but is not connected"),
890 errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
891
893
895
896 /*
897 * Notify the launcher to start the apply worker if the subscription is
898 * enabled, or to create the conflict detection slot if retain_dead_tuples
899 * is enabled.
900 *
901 * Creating the conflict detection slot is essential even when the
902 * subscription is not enabled. This ensures that dead tuples are
903 * retained, which is necessary for accurately identifying the type of
904 * conflict during replication.
905 */
906 if (opts.enabled || opts.retaindeadtuples)
908
909 ObjectAddressSet(myself, SubscriptionRelationId, subid);
910
911 InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
912
913 return myself;
914}
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:5284
#define OidIsValid(objectId)
Definition: c.h:778
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:448
int errdetail(const char *fmt,...)
Definition: elog.c:1216
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:173
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:257
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:168
void pgstat_create_subscription(Oid subid)
#define SUBOPT_CREATE_SLOT
#define SUBOPT_CONNECT
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:111
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1670
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
Definition: walreceiver.h:459
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23

References AccessShareLock, ACL_CREATE, aclcheck_error(), ACLCHECK_OK, AddSubscriptionRelState(), ApplyLauncherWakeupAtCommit(), Assert(), BoolGetDatum(), CatalogTupleInsert(), CharGetDatum(), check_pub_dead_tuple_retention(), check_publications(), check_publications_origin_sequences(), check_publications_origin_tables(), CheckSubDeadTupleRetention(), CheckSubscriptionRelkind(), CRS_NOEXPORT_SNAPSHOT, CStringGetDatum(), CStringGetTextDatum, DirectFunctionCall1, elog, ereport, err(), errcode(), ERRCODE_DUPLICATE_OBJECT, errdetail(), errhint(), errmsg(), ERROR, fetch_relation_list(), foreach_ptr, get_database_name(), get_rel_relkind(), GetNewOidWithIndex(), GetSysCacheOid2, GetUserId(), has_privs_of_role(), heap_form_tuple(), heap_freetuple(), Int32GetDatum(), InvalidOid, InvalidXLogRecPtr, InvokeObjectPostCreateHook, IsSet, load_file(), LSNGetDatum(), MyDatabaseId, NAMEDATALEN, namein(), NOTICE, object_aclcheck(), OBJECT_DATABASE, ObjectAddressSet, ObjectIdGetDatum(), OidIsValid, opts, parse_subscription_options(), PG_END_TRY, PG_FINALLY, PG_TRY, pgstat_create_subscription(), PreventInTransactionBlock(), publicationListToArray(), RangeVarGetRelid, recordDependencyOnOwner(), RelationGetDescr, RangeVar::relname, ReplicationOriginNameForLogicalRep(), replorigin_create(), RowExclusiveLock, RangeVar::schemaname, stmt, SUBOPT_BINARY, SUBOPT_CONNECT, SUBOPT_COPY_DATA, SUBOPT_CREATE_SLOT, SUBOPT_DISABLE_ON_ERR, SUBOPT_ENABLED, SUBOPT_FAILOVER, SUBOPT_MAX_RETENTION_DURATION, SUBOPT_ORIGIN, SUBOPT_PASSWORD_REQUIRED, SUBOPT_RETAIN_DEAD_TUPLES, SUBOPT_RUN_AS_OWNER, SUBOPT_SLOT_NAME, SUBOPT_STREAMING, SUBOPT_SYNCHRONOUS_COMMIT, SUBOPT_TWOPHASE_COMMIT, superuser(), superuser_arg(), table_close(), table_open(), UpdateTwoPhaseState(), values, walrcv_check_conninfo, walrcv_connect, walrcv_create_slot, walrcv_disconnect, WARNING, and wrconn.

Referenced by ProcessUtilitySlow().

◆ defGetStreamingMode()

char defGetStreamingMode ( DefElem * def )

Definition at line 3144 of file subscriptioncmds.c.

3145{
3146 /*
3147 * If no parameter value given, assume "true" is meant.
3148 */
3149 if (!def->arg)
3150 return LOGICALREP_STREAM_ON;
3151
3152 /*
3153 * Allow 0, 1, "false", "true", "off", "on" or "parallel".
3154 */
3155 switch (nodeTag(def->arg))
3156 {
3157 case T_Integer:
3158 switch (intVal(def->arg))
3159 {
3160 case 0:
3161 return LOGICALREP_STREAM_OFF;
3162 case 1:
3163 return LOGICALREP_STREAM_ON;
3164 default:
3165 /* otherwise, error out below */
3166 break;
3167 }
3168 break;
3169 default:
3170 {
3171 char *sval = defGetString(def);
3172
3173 /*
3174 * The set of strings accepted here should match up with the
3175 * grammar's opt_boolean_or_string production.
3176 */
3177 if (pg_strcasecmp(sval, "false") == 0 ||
3178 pg_strcasecmp(sval, "off") == 0)
3179 return LOGICALREP_STREAM_OFF;
3180 if (pg_strcasecmp(sval, "true") == 0 ||
3181 pg_strcasecmp(sval, "on") == 0)
3182 return LOGICALREP_STREAM_ON;
3183 if (pg_strcasecmp(sval, "parallel") == 0)
3184 return LOGICALREP_STREAM_PARALLEL;
3185 }
3186 break;
3187 }
3188
3189 ereport(ERROR,
3190 (errcode(ERRCODE_SYNTAX_ERROR),
3191 errmsg("%s requires a Boolean value or \"parallel\"",
3192 def->defname)));
3193 return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
3194}
char * defGetString(DefElem *def)
Definition: define.c:35
#define nodeTag(nodeptr)
Definition: nodes.h:139
char * defname
Definition: parsenodes.h:843
Node * arg
Definition: parsenodes.h:844
#define intVal(v)
Definition: value.h:79

References DefElem::arg, defGetString(), DefElem::defname, ereport, errcode(), errmsg(), ERROR, intVal, nodeTag, and pg_strcasecmp().

Referenced by parse_output_parameters(), and parse_subscription_options().

◆ DropSubscription()

void DropSubscription ( DropSubscriptionStmt * stmt,
bool  isTopLevel 
)

Definition at line 2008 of file subscriptioncmds.c.

2009{
2010 Relation rel;
2011 ObjectAddress myself;
2012 HeapTuple tup;
2013 Oid subid;
2014 Oid subowner;
2015 Datum datum;
2016 bool isnull;
2017 char *subname;
2018 char *conninfo;
2019 char *slotname;
2020 List *subworkers;
2021 ListCell *lc;
2022 char originname[NAMEDATALEN];
2023 char *err = NULL;
2026 List *rstates;
2027 bool must_use_password;
2028
2029 /*
2030 * The launcher may concurrently start a new worker for this subscription.
2031 * During initialization, the worker checks for subscription validity and
2032 * exits if the subscription has already been dropped. See
2033 * InitializeLogRepWorker.
2034 */
2035 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
2036
2037 tup = SearchSysCache2(SUBSCRIPTIONNAME, ObjectIdGetDatum(MyDatabaseId),
2038 CStringGetDatum(stmt->subname));
2039
2040 if (!HeapTupleIsValid(tup))
2041 {
2042 table_close(rel, NoLock);
2043
2044 if (!stmt->missing_ok)
2045 ereport(ERROR,
2046 (errcode(ERRCODE_UNDEFINED_OBJECT),
2047 errmsg("subscription \"%s\" does not exist",
2048 stmt->subname)));
2049 else
2051 (errmsg("subscription \"%s\" does not exist, skipping",
2052 stmt->subname)));
2053
2054 return;
2055 }
2056
2057 form = (Form_pg_subscription) GETSTRUCT(tup);
2058 subid = form->oid;
2059 subowner = form->subowner;
2060 must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
2061
2062 /* must be owner */
2063 if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
2065 stmt->subname);
2066
2067 /* DROP hook for the subscription being removed */
2068 InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
2069
2070 /*
2071 * Lock the subscription so nobody else can do anything with it (including
2072 * the replication workers).
2073 */
2074 LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
2075
2076 /* Get subname */
2077 datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
2078 Anum_pg_subscription_subname);
2079 subname = pstrdup(NameStr(*DatumGetName(datum)));
2080
2081 /* Get conninfo */
2082 datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
2083 Anum_pg_subscription_subconninfo);
2084 conninfo = TextDatumGetCString(datum);
2085
2086 /* Get slotname */
2087 datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
2088 Anum_pg_subscription_subslotname, &isnull);
2089 if (!isnull)
2090 slotname = pstrdup(NameStr(*DatumGetName(datum)));
2091 else
2092 slotname = NULL;
2093
2094 /*
2095 * Since dropping a replication slot is not transactional, the replication
2096 * slot stays dropped even if the transaction rolls back. So we cannot
2097 * run DROP SUBSCRIPTION inside a transaction block if dropping the
2098 * replication slot. Also, in this case, we report a message for dropping
2099 * the subscription to the cumulative stats system.
2100 *
2101 * XXX The command name should really be something like "DROP SUBSCRIPTION
2102 * of a subscription that is associated with a replication slot", but we
2103 * don't have the proper facilities for that.
2104 */
2105 if (slotname)
2106 PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
2107
2108 ObjectAddressSet(myself, SubscriptionRelationId, subid);
2109 EventTriggerSQLDropAddObject(&myself, true, true);
2110
2111 /* Remove the tuple from catalog. */
2112 CatalogTupleDelete(rel, &tup->t_self);
2113
2114 ReleaseSysCache(tup);
2115
2116 /*
2117 * Stop all the subscription workers immediately.
2118 *
2119 * This is necessary if we are dropping the replication slot, so that the
2120 * slot becomes accessible.
2121 *
2122 * It is also necessary if the subscription is disabled and was disabled
2123 * in the same transaction. Then the workers haven't seen the disabling
2124 * yet and will still be running, leading to hangs later when we want to
2125 * drop the replication origin. If the subscription was disabled before
2126 * this transaction, then there shouldn't be any workers left, so this
2127 * won't make a difference.
2128 *
2129 * New workers won't be started because we hold an exclusive lock on the
2130 * subscription till the end of the transaction.
2131 */
2132 subworkers = logicalrep_workers_find(subid, false, true);
2133 foreach(lc, subworkers)
2134 {
2136
2138 }
2139 list_free(subworkers);
2140
2141 /*
2142 * Remove the no-longer-useful entry in the launcher's table of apply
2143 * worker start times.
2144 *
2145 * If this transaction rolls back, the launcher might restart a failed
2146 * apply worker before wal_retrieve_retry_interval milliseconds have
2147 * elapsed, but that's pretty harmless.
2148 */
2150
2151 /*
2152 * Cleanup of tablesync replication origins.
2153 *
2154 * Any READY-state relations would already have dealt with clean-ups.
2155 *
2156 * Note that the state can't change because we have already stopped both
2157 * the apply and tablesync workers and they can't restart because of
2158 * exclusive lock on the subscription.
2159 */
2160 rstates = GetSubscriptionRelations(subid, true, false, true);
2161 foreach(lc, rstates)
2162 {
2164 Oid relid = rstate->relid;
2165
2166 /* Only cleanup resources of tablesync workers */
2167 if (!OidIsValid(relid))
2168 continue;
2169
2170 /*
2171 * Drop the tablesync's origin tracking if exists.
2172 *
2173 * It is possible that the origin is not yet created for tablesync
2174 * worker so passing missing_ok = true. This can happen for the states
2175 * before SUBREL_STATE_FINISHEDCOPY.
2176 */
2177 ReplicationOriginNameForLogicalRep(subid, relid, originname,
2178 sizeof(originname));
2179 replorigin_drop_by_name(originname, true, false);
2180 }
2181
2182 /* Clean up dependencies */
2183 deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
2184
2185 /* Remove any associated relation synchronization states. */
2187
2188 /* Remove the origin tracking if exists. */
2189 ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
2190 replorigin_drop_by_name(originname, true, false);
2191
2192 /*
2193 * Tell the cumulative stats system that the subscription is getting
2194 * dropped.
2195 */
2197
2198 /*
2199 * If there is no slot associated with the subscription, we can finish
2200 * here.
2201 */
2202 if (!slotname && rstates == NIL)
2203 {
2204 table_close(rel, NoLock);
2205 return;
2206 }
2207
2208 /*
2209 * Try to acquire the connection necessary for dropping slots.
2210 *
2211 * Note: If the slotname is NONE/NULL then we allow the command to finish
2212 * and users need to manually cleanup the apply and tablesync worker slots
2213 * later.
2214 *
2215 * This has to be at the end because otherwise if there is an error while
2216 * doing the database operations we won't be able to rollback dropped
2217 * slot.
2218 */
2219 load_file("libpqwalreceiver", false);
2220
2221 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
2222 subname, &err);
2223 if (wrconn == NULL)
2224 {
2225 if (!slotname)
2226 {
2227 /* be tidy */
2228 list_free(rstates);
2229 table_close(rel, NoLock);
2230 return;
2231 }
2232 else
2233 {
2234 ReportSlotConnectionError(rstates, subid, slotname, err);
2235 }
2236 }
2237
2238 PG_TRY();
2239 {
2240 foreach(lc, rstates)
2241 {
2243 Oid relid = rstate->relid;
2244
2245 /* Only cleanup resources of tablesync workers */
2246 if (!OidIsValid(relid))
2247 continue;
2248
2249 /*
2250 * Drop the tablesync slots associated with removed tables.
2251 *
2252 * For SYNCDONE/READY states, the tablesync slot is known to have
2253 * already been dropped by the tablesync worker.
2254 *
2255 * For other states, there is no certainty, maybe the slot does
2256 * not exist yet. Also, if we fail after removing some of the
2257 * slots, next time, it will again try to drop already dropped
2258 * slots and fail. For these reasons, we allow missing_ok = true
2259 * for the drop.
2260 */
2261 if (rstate->state != SUBREL_STATE_SYNCDONE)
2262 {
2263 char syncslotname[NAMEDATALEN] = {0};
2264
2265 ReplicationSlotNameForTablesync(subid, relid, syncslotname,
2266 sizeof(syncslotname));
2267 ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
2268 }
2269 }
2270
2271 list_free(rstates);
2272
2273 /*
2274 * If there is a slot associated with the subscription, then drop the
2275 * replication slot at the publisher.
2276 */
2277 if (slotname)
2278 ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2279 }
2280 PG_FINALLY();
2281 {
2283 }
2284 PG_END_TRY();
2285
2286 table_close(rel, NoLock);
2287}
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition: indexing.c:365
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1114
void list_free(List *list)
Definition: list.c:1546
char * pstrdup(const char *in)
Definition: mcxt.c:1759
#define InvokeObjectDropHook(classId, objectId, subId)
Definition: objectaccess.h:182
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
Definition: pg_shdepend.c:1047
void pgstat_drop_subscription(Oid subid)
static Name DatumGetName(Datum X)
Definition: postgres.h:370
LogicalRepWorkerType type
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:264
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:595
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:230
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition: syscache.c:625

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, ApplyLauncherForgetWorkerStartTime(), CatalogTupleDelete(), CStringGetDatum(), DatumGetName(), deleteSharedDependencyRecordsFor(), ereport, err(), errcode(), errmsg(), ERROR, EventTriggerSQLDropAddObject(), GETSTRUCT(), GetSubscriptionRelations(), GetUserId(), HeapTupleIsValid, InvalidOid, InvokeObjectDropHook, lfirst, list_free(), load_file(), LockSharedObject(), logicalrep_worker_stop(), logicalrep_workers_find(), MyDatabaseId, NAMEDATALEN, NameStr, NIL, NoLock, NOTICE, object_ownercheck(), OBJECT_SUBSCRIPTION, ObjectAddressSet, ObjectIdGetDatum(), OidIsValid, PG_END_TRY, PG_FINALLY, PG_TRY, pgstat_drop_subscription(), PreventInTransactionBlock(), pstrdup(), ReleaseSysCache(), SubscriptionRelState::relid, LogicalRepWorker::relid, RemoveSubscriptionRel(), ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), ReportSlotConnectionError(), RowExclusiveLock, SearchSysCache2(), SubscriptionRelState::state, stmt, LogicalRepWorker::subid, subname, superuser_arg(), SysCacheGetAttr(), SysCacheGetAttrNotNull(), HeapTupleData::t_self, table_close(), table_open(), TextDatumGetCString, LogicalRepWorker::type, walrcv_connect, walrcv_disconnect, and wrconn.

Referenced by ProcessUtilitySlow().

◆ fetch_relation_list()

static List * fetch_relation_list ( WalReceiverConn * wrconn,
List * publications 
)
static

Definition at line 2878 of file subscriptioncmds.c.

2879{
2880 WalRcvExecResult *res;
2881 StringInfoData cmd;
2882 TupleTableSlot *slot;
2883 Oid tableRow[4] = {TEXTOID, TEXTOID, CHAROID, InvalidOid};
2884 List *relationlist = NIL;
2886 bool check_columnlist = (server_version >= 150000);
2887 int column_count = check_columnlist ? 4 : 3;
2888 StringInfo pub_names = makeStringInfo();
2889
2890 initStringInfo(&cmd);
2891
2892 /* Build the pub_names comma-separated string. */
2893 GetPublicationsStr(publications, pub_names, true);
2894
2895 /* Get the list of relations from the publisher */
2896 if (server_version >= 160000)
2897 {
2898 tableRow[3] = INT2VECTOROID;
2899
2900 /*
2901 * From version 16, we allowed passing multiple publications to the
2902 * function pg_get_publication_tables. This helped to filter out the
2903 * partition table whose ancestor is also published in this
2904 * publication array.
2905 *
2906 * Join pg_get_publication_tables with pg_publication to exclude
2907 * non-existing publications.
2908 *
2909 * Note that attrs are always stored in sorted order so we don't need
2910 * to worry if different publications have specified them in a
2911 * different order. See pub_collist_validate.
2912 */
2913 appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
2914 " FROM pg_class c\n"
2915 " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2916 " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2917 " FROM pg_publication\n"
2918 " WHERE pubname IN ( %s )) AS gpt\n"
2919 " ON gpt.relid = c.oid\n",
2920 pub_names->data);
2921
2922 /* From version 19, inclusion of sequences in the target is supported */
2923 if (server_version >= 190000)
2924 appendStringInfo(&cmd,
2925 "UNION ALL\n"
2926 " SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
2927 " FROM pg_catalog.pg_publication_sequences s\n"
2928 " WHERE s.pubname IN ( %s )",
2929 pub_names->data);
2930 }
2931 else
2932 {
2933 tableRow[3] = NAMEARRAYOID;
2934 appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
2935
2936 /* Get column lists for each relation if the publisher supports it */
2937 if (check_columnlist)
2938 appendStringInfoString(&cmd, ", t.attnames\n");
2939
2940 appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
2941 " WHERE t.pubname IN ( %s )",
2942 pub_names->data);
2943 }
2944
2945 destroyStringInfo(pub_names);
2946
2947 res = walrcv_exec(wrconn, cmd.data, column_count, tableRow);
2948 pfree(cmd.data);
2949
2950 if (res->status != WALRCV_OK_TUPLES)
2951 ereport(ERROR,
2952 (errcode(ERRCODE_CONNECTION_FAILURE),
2953 errmsg("could not receive list of replicated tables from the publisher: %s",
2954 res->err)));
2955
2956 /* Process tables. */
2958 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2959 {
2960 char *nspname;
2961 char *relname;
2962 bool isnull;
2963 char relkind;
2965
2966 nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2967 Assert(!isnull);
2968 relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
2969 Assert(!isnull);
2970 relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
2971 Assert(!isnull);
2972
2973 relinfo->rv = makeRangeVar(nspname, relname, -1);
2974 relinfo->relkind = relkind;
2975
2976 if (relkind != RELKIND_SEQUENCE &&
2977 check_columnlist &&
2978 list_member_rangevar(relationlist, relinfo->rv))
2979 ereport(ERROR,
2980 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2981 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
2982 nspname, relname));
2983 else
2984 relationlist = lappend(relationlist, relinfo);
2985
2986 ExecClearTuple(slot);
2987 }
2989
2991
2992 return relationlist;
2993}
#define CppAsString2(x)
Definition: c.h:422
#define palloc_object(type)
Definition: fe_memutils.h:74
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
Definition: makefuncs.c:473
NameData relname
Definition: pg_class.h:38
static int server_version
Definition: pg_dumpall.c:109
static char DatumGetChar(Datum X)
Definition: postgres.h:122
static bool list_member_rangevar(const List *list, RangeVar *rv)

References appendStringInfo(), appendStringInfoString(), Assert(), CppAsString2, StringInfoData::data, DatumGetChar(), destroyStringInfo(), ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), GetPublicationsStr(), initStringInfo(), InvalidOid, lappend(), list_member_rangevar(), makeRangeVar(), MakeSingleTupleTableSlot(), makeStringInfo(), NIL, palloc_object, pfree(), PublicationRelKind::relkind, relname, PublicationRelKind::rv, server_version, slot_getattr(), WalRcvExecResult::status, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, walrcv_server_version, and wrconn.

Referenced by AlterSubscription_refresh(), and CreateSubscription().

◆ list_member_rangevar()

static bool list_member_rangevar ( const List * list,
RangeVar * rv 
)
static

Definition at line 2857 of file subscriptioncmds.c.

2858{
2860 {
2861 if (equal(relinfo->rv, rv))
2862 return true;
2863 }
2864
2865 return false;
2866}
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223

References equal(), foreach_ptr, and sort-test::list.

Referenced by fetch_relation_list().

◆ merge_publications()

static List * merge_publications ( List * oldpublist,
List * newpublist,
bool  addpub,
const char *  subname 
)
static

Definition at line 3085 of file subscriptioncmds.c.

3086{
3087 ListCell *lc;
3088
3089 oldpublist = list_copy(oldpublist);
3090
3091 check_duplicates_in_publist(newpublist, NULL);
3092
3093 foreach(lc, newpublist)
3094 {
3095 char *name = strVal(lfirst(lc));
3096 ListCell *lc2;
3097 bool found = false;
3098
3099 foreach(lc2, oldpublist)
3100 {
3101 char *pubname = strVal(lfirst(lc2));
3102
3103 if (strcmp(name, pubname) == 0)
3104 {
3105 found = true;
3106 if (addpub)
3107 ereport(ERROR,
3109 errmsg("publication \"%s\" is already in subscription \"%s\"",
3110 name, subname)));
3111 else
3112 oldpublist = foreach_delete_current(oldpublist, lc2);
3113
3114 break;
3115 }
3116 }
3117
3118 if (addpub && !found)
3119 oldpublist = lappend(oldpublist, makeString(name));
3120 else if (!addpub && !found)
3121 ereport(ERROR,
3122 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3123 errmsg("publication \"%s\" is not in subscription \"%s\"",
3124 name, subname)));
3125 }
3126
3127 /*
3128 * XXX Probably no strong reason for this, but for now it's to make ALTER
3129 * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
3130 */
3131 if (!oldpublist)
3132 ereport(ERROR,
3133 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3134 errmsg("cannot drop all the publications from a subscription")));
3135
3136 return oldpublist;
3137}
#define foreach_delete_current(lst, var_or_cell)
Definition: pg_list.h:391
static void check_duplicates_in_publist(List *publist, Datum *datums)

References check_duplicates_in_publist(), ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, foreach_delete_current, lappend(), lfirst, list_copy(), makeString(), name, strVal, and subname.

Referenced by AlterSubscription().

◆ parse_subscription_options()

static void parse_subscription_options ( ParseState * pstate,
List * stmt_options,
bits32  supported_opts,
SubOpts * opts 
)
static

Definition at line 147 of file subscriptioncmds.c.

149{
150 ListCell *lc;
151
152 /* Start out with cleared opts. */
153 memset(opts, 0, sizeof(SubOpts));
154
155 /* caller must expect some option */
156 Assert(supported_opts != 0);
157
158 /* If connect option is supported, these others also need to be. */
159 Assert(!IsSet(supported_opts, SUBOPT_CONNECT) ||
160 IsSet(supported_opts, SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
162
163 /* Set default values for the supported options. */
164 if (IsSet(supported_opts, SUBOPT_CONNECT))
165 opts->connect = true;
166 if (IsSet(supported_opts, SUBOPT_ENABLED))
167 opts->enabled = true;
168 if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
169 opts->create_slot = true;
170 if (IsSet(supported_opts, SUBOPT_COPY_DATA))
171 opts->copy_data = true;
172 if (IsSet(supported_opts, SUBOPT_REFRESH))
173 opts->refresh = true;
174 if (IsSet(supported_opts, SUBOPT_BINARY))
175 opts->binary = false;
176 if (IsSet(supported_opts, SUBOPT_STREAMING))
177 opts->streaming = LOGICALREP_STREAM_PARALLEL;
178 if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
179 opts->twophase = false;
180 if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
181 opts->disableonerr = false;
182 if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
183 opts->passwordrequired = true;
184 if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
185 opts->runasowner = false;
186 if (IsSet(supported_opts, SUBOPT_FAILOVER))
187 opts->failover = false;
188 if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
189 opts->retaindeadtuples = false;
190 if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
191 opts->maxretention = 0;
192 if (IsSet(supported_opts, SUBOPT_ORIGIN))
193 opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
194
195 /* Parse options */
196 foreach(lc, stmt_options)
197 {
198 DefElem *defel = (DefElem *) lfirst(lc);
199
200 if (IsSet(supported_opts, SUBOPT_CONNECT) &&
201 strcmp(defel->defname, "connect") == 0)
202 {
203 if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
204 errorConflictingDefElem(defel, pstate);
205
206 opts->specified_opts |= SUBOPT_CONNECT;
207 opts->connect = defGetBoolean(defel);
208 }
209 else if (IsSet(supported_opts, SUBOPT_ENABLED) &&
210 strcmp(defel->defname, "enabled") == 0)
211 {
212 if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
213 errorConflictingDefElem(defel, pstate);
214
215 opts->specified_opts |= SUBOPT_ENABLED;
216 opts->enabled = defGetBoolean(defel);
217 }
218 else if (IsSet(supported_opts, SUBOPT_CREATE_SLOT) &&
219 strcmp(defel->defname, "create_slot") == 0)
220 {
221 if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
222 errorConflictingDefElem(defel, pstate);
223
224 opts->specified_opts |= SUBOPT_CREATE_SLOT;
225 opts->create_slot = defGetBoolean(defel);
226 }
227 else if (IsSet(supported_opts, SUBOPT_SLOT_NAME) &&
228 strcmp(defel->defname, "slot_name") == 0)
229 {
230 if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
231 errorConflictingDefElem(defel, pstate);
232
233 opts->specified_opts |= SUBOPT_SLOT_NAME;
234 opts->slot_name = defGetString(defel);
235
236 /* Setting slot_name = NONE is treated as no slot name. */
237 if (strcmp(opts->slot_name, "none") == 0)
238 opts->slot_name = NULL;
239 else
240 ReplicationSlotValidateName(opts->slot_name, false, ERROR);
241 }
242 else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
243 strcmp(defel->defname, "copy_data") == 0)
244 {
245 if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
246 errorConflictingDefElem(defel, pstate);
247
248 opts->specified_opts |= SUBOPT_COPY_DATA;
249 opts->copy_data = defGetBoolean(defel);
250 }
251 else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
252 strcmp(defel->defname, "synchronous_commit") == 0)
253 {
254 if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
255 errorConflictingDefElem(defel, pstate);
256
257 opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
258 opts->synchronous_commit = defGetString(defel);
259
260 /* Test if the given value is valid for synchronous_commit GUC. */
261 (void) set_config_option("synchronous_commit", opts->synchronous_commit,
263 false, 0, false);
264 }
265 else if (IsSet(supported_opts, SUBOPT_REFRESH) &&
266 strcmp(defel->defname, "refresh") == 0)
267 {
268 if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
269 errorConflictingDefElem(defel, pstate);
270
271 opts->specified_opts |= SUBOPT_REFRESH;
272 opts->refresh = defGetBoolean(defel);
273 }
274 else if (IsSet(supported_opts, SUBOPT_BINARY) &&
275 strcmp(defel->defname, "binary") == 0)
276 {
277 if (IsSet(opts->specified_opts, SUBOPT_BINARY))
278 errorConflictingDefElem(defel, pstate);
279
280 opts->specified_opts |= SUBOPT_BINARY;
281 opts->binary = defGetBoolean(defel);
282 }
283 else if (IsSet(supported_opts, SUBOPT_STREAMING) &&
284 strcmp(defel->defname, "streaming") == 0)
285 {
286 if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
287 errorConflictingDefElem(defel, pstate);
288
289 opts->specified_opts |= SUBOPT_STREAMING;
290 opts->streaming = defGetStreamingMode(defel);
291 }
292 else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
293 strcmp(defel->defname, "two_phase") == 0)
294 {
295 if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
296 errorConflictingDefElem(defel, pstate);
297
298 opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
299 opts->twophase = defGetBoolean(defel);
300 }
301 else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
302 strcmp(defel->defname, "disable_on_error") == 0)
303 {
304 if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
305 errorConflictingDefElem(defel, pstate);
306
307 opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
308 opts->disableonerr = defGetBoolean(defel);
309 }
310 else if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED) &&
311 strcmp(defel->defname, "password_required") == 0)
312 {
313 if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
314 errorConflictingDefElem(defel, pstate);
315
316 opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
317 opts->passwordrequired = defGetBoolean(defel);
318 }
319 else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
320 strcmp(defel->defname, "run_as_owner") == 0)
321 {
322 if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
323 errorConflictingDefElem(defel, pstate);
324
325 opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
326 opts->runasowner = defGetBoolean(defel);
327 }
328 else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
329 strcmp(defel->defname, "failover") == 0)
330 {
331 if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
332 errorConflictingDefElem(defel, pstate);
333
334 opts->specified_opts |= SUBOPT_FAILOVER;
335 opts->failover = defGetBoolean(defel);
336 }
337 else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
338 strcmp(defel->defname, "retain_dead_tuples") == 0)
339 {
340 if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
341 errorConflictingDefElem(defel, pstate);
342
343 opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
344 opts->retaindeadtuples = defGetBoolean(defel);
345 }
346 else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
347 strcmp(defel->defname, "max_retention_duration") == 0)
348 {
349 if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
350 errorConflictingDefElem(defel, pstate);
351
352 opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
353 opts->maxretention = defGetInt32(defel);
354 }
355 else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
356 strcmp(defel->defname, "origin") == 0)
357 {
358 if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
359 errorConflictingDefElem(defel, pstate);
360
361 opts->specified_opts |= SUBOPT_ORIGIN;
362 pfree(opts->origin);
363
364 /*
365 * Even though the "origin" parameter allows only "none" and "any"
366 * values, it is implemented as a string type so that the
367 * parameter can be extended in future versions to support
368 * filtering using origin names specified by the user.
369 */
370 opts->origin = defGetString(defel);
371
372 if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
373 (pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_ANY) != 0))
375 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
376 errmsg("unrecognized origin value: \"%s\"", opts->origin));
377 }
378 else if (IsSet(supported_opts, SUBOPT_LSN) &&
379 strcmp(defel->defname, "lsn") == 0)
380 {
381 char *lsn_str = defGetString(defel);
382 XLogRecPtr lsn;
383
384 if (IsSet(opts->specified_opts, SUBOPT_LSN))
385 errorConflictingDefElem(defel, pstate);
386
387 /* Setting lsn = NONE is treated as resetting LSN */
388 if (strcmp(lsn_str, "none") == 0)
389 lsn = InvalidXLogRecPtr;
390 else
391 {
392 /* Parse the argument as LSN */
394 CStringGetDatum(lsn_str)));
395
396 if (XLogRecPtrIsInvalid(lsn))
398 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
399 errmsg("invalid WAL location (LSN): %s", lsn_str)));
400 }
401
402 opts->specified_opts |= SUBOPT_LSN;
403 opts->lsn = lsn;
404 }
405 else
407 (errcode(ERRCODE_SYNTAX_ERROR),
408 errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
409 }
410
411 /*
412 * We've been explicitly asked to not connect, that requires some
413 * additional processing.
414 */
415 if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
416 {
417 /* Check for incompatible options from the user. */
418 if (opts->enabled &&
419 IsSet(opts->specified_opts, SUBOPT_ENABLED))
421 (errcode(ERRCODE_SYNTAX_ERROR),
422 /*- translator: both %s are strings of the form "option = value" */
423 errmsg("%s and %s are mutually exclusive options",
424 "connect = false", "enabled = true")));
425
426 if (opts->create_slot &&
427 IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
429 (errcode(ERRCODE_SYNTAX_ERROR),
430 errmsg("%s and %s are mutually exclusive options",
431 "connect = false", "create_slot = true")));
432
433 if (opts->copy_data &&
434 IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
436 (errcode(ERRCODE_SYNTAX_ERROR),
437 errmsg("%s and %s are mutually exclusive options",
438 "connect = false", "copy_data = true")));
439
440 /* Change the defaults of other options. */
441 opts->enabled = false;
442 opts->create_slot = false;
443 opts->copy_data = false;
444 }
445
446 /*
447 * Do additional checking for disallowed combination when slot_name = NONE
448 * was used.
449 */
450 if (!opts->slot_name &&
451 IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
452 {
453 if (opts->enabled)
454 {
455 if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
457 (errcode(ERRCODE_SYNTAX_ERROR),
458 /*- translator: both %s are strings of the form "option = value" */
459 errmsg("%s and %s are mutually exclusive options",
460 "slot_name = NONE", "enabled = true")));
461 else
463 (errcode(ERRCODE_SYNTAX_ERROR),
464 /*- translator: both %s are strings of the form "option = value" */
465 errmsg("subscription with %s must also set %s",
466 "slot_name = NONE", "enabled = false")));
467 }
468
469 if (opts->create_slot)
470 {
471 if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
473 (errcode(ERRCODE_SYNTAX_ERROR),
474 /*- translator: both %s are strings of the form "option = value" */
475 errmsg("%s and %s are mutually exclusive options",
476 "slot_name = NONE", "create_slot = true")));
477 else
479 (errcode(ERRCODE_SYNTAX_ERROR),
480 /*- translator: both %s are strings of the form "option = value" */
481 errmsg("subscription with %s must also set %s",
482 "slot_name = NONE", "create_slot = false")));
483 }
484 }
485}
int32 defGetInt32(DefElem *def)
Definition: define.c:149
bool defGetBoolean(DefElem *def)
Definition: define.c:94
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition: define.c:371
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition: guc.c:3205
@ GUC_ACTION_SET
Definition: guc.h:203
@ PGC_S_TEST
Definition: guc.h:125
@ PGC_BACKEND
Definition: guc.h:77
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition: pg_lsn.c:64
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:25
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition: slot.c:266
char defGetStreamingMode(DefElem *def)

References Assert(), CStringGetDatum(), DatumGetLSN(), defGetBoolean(), defGetInt32(), defGetStreamingMode(), defGetString(), DefElem::defname, DirectFunctionCall1, ereport, errcode(), errmsg(), ERROR, errorConflictingDefElem(), GUC_ACTION_SET, InvalidXLogRecPtr, IsSet, lfirst, opts, pfree(), pg_lsn_in(), pg_strcasecmp(), PGC_BACKEND, PGC_S_TEST, pstrdup(), ReplicationSlotValidateName(), set_config_option(), SUBOPT_BINARY, SUBOPT_CONNECT, SUBOPT_COPY_DATA, SUBOPT_CREATE_SLOT, SUBOPT_DISABLE_ON_ERR, SUBOPT_ENABLED, SUBOPT_FAILOVER, SUBOPT_LSN, SUBOPT_MAX_RETENTION_DURATION, SUBOPT_ORIGIN, SUBOPT_PASSWORD_REQUIRED, SUBOPT_REFRESH, SUBOPT_RETAIN_DEAD_TUPLES, SUBOPT_RUN_AS_OWNER, SUBOPT_SLOT_NAME, SUBOPT_STREAMING, SUBOPT_SYNCHRONOUS_COMMIT, SUBOPT_TWOPHASE_COMMIT, and XLogRecPtrIsInvalid.

Referenced by AlterSubscription(), and CreateSubscription().

◆ publicationListToArray()

static Datum publicationListToArray ( List * publist)
static

Definition at line 554 of file subscriptioncmds.c.

555{
556 ArrayType *arr;
557 Datum *datums;
558 MemoryContext memcxt;
559 MemoryContext oldcxt;
560
561 /* Create memory context for temporary allocations. */
562 memcxt = AllocSetContextCreate(CurrentMemoryContext,
563 "publicationListToArray to array",
564 ALLOCSET_DEFAULT_SIZES);
565 oldcxt = MemoryContextSwitchTo(memcxt);
566
567 datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
568
569 check_duplicates_in_publist(publist, datums);
570
571 MemoryContextSwitchTo(oldcxt);
572
573 arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
574
575 MemoryContextDelete(memcxt);
576
577 return PointerGetDatum(arr);
578}
ArrayType * construct_array_builtin(Datum *elems, int nelems, Oid elmtype)
Definition: arrayfuncs.c:3382
MemoryContext CurrentMemoryContext
Definition: mcxt.c:160
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:469
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:332

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, check_duplicates_in_publist(), construct_array_builtin(), CurrentMemoryContext, list_length(), MemoryContextDelete(), MemoryContextSwitchTo(), palloc(), and PointerGetDatum().

Referenced by AlterSubscription(), and CreateSubscription().

◆ ReplicationSlotDropAtPubNode()

void ReplicationSlotDropAtPubNode ( WalReceiverConn * wrconn,
char *  slotname,
bool  missing_ok 
)

Definition at line 2297 of file subscriptioncmds.c.

2298{
2299 StringInfoData cmd;
2300
2301 Assert(wrconn);
2302
2303 load_file("libpqwalreceiver", false);
2304
2305 initStringInfo(&cmd);
2306 appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
2307
2308 PG_TRY();
2309 {
2310 WalRcvExecResult *res;
2311
2312 res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2313
2314 if (res->status == WALRCV_OK_COMMAND)
2315 {
2316 /* NOTICE. Success. */
2317 ereport(NOTICE,
2318 (errmsg("dropped replication slot \"%s\" on publisher",
2319 slotname)));
2320 }
2321 else if (res->status == WALRCV_ERROR &&
2322 missing_ok &&
2323 res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
2324 {
2325 /* LOG. Error, but missing_ok = true. */
2326 ereport(LOG,
2327 (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2328 slotname, res->err)));
2329 }
2330 else
2331 {
2332 /* ERROR. */
2333 ereport(ERROR,
2334 (errcode(ERRCODE_CONNECTION_FAILURE),
2335 errmsg("could not drop replication slot \"%s\" on publisher: %s",
2336 slotname, res->err)));
2337 }
2338
2339 walrcv_clear_result(res);
2340 }
2341 PG_FINALLY();
2342 {
2343 pfree(cmd.data);
2344 }
2345 PG_END_TRY();
2346}
#define LOG
Definition: elog.h:31
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:13062
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:205
@ WALRCV_ERROR
Definition: walreceiver.h:204

References appendStringInfo(), Assert(), StringInfoData::data, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, initStringInfo(), load_file(), LOG, NOTICE, pfree(), PG_END_TRY, PG_FINALLY, PG_TRY, quote_identifier(), WalRcvExecResult::sqlstate, WalRcvExecResult::status, walrcv_clear_result(), WALRCV_ERROR, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), and ProcessSyncingTablesForSync().

◆ ReportSlotConnectionError()

static void ReportSlotConnectionError ( List * rstates,
Oid  subid,
char *  slotname,
char *  err 
)
static

Definition at line 3001 of file subscriptioncmds.c.

3002{
3003 ListCell *lc;
3004
3005 foreach(lc, rstates)
3006 {
3007 SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
3008 Oid relid = rstate->relid;
3009
3010 /* Only cleanup resources of tablesync workers */
3011 if (!OidIsValid(relid))
3012 continue;
3013
3014 /*
3015 * Caller needs to ensure that relstate doesn't change underneath us.
3016 * See DropSubscription where we get the relstates.
3017 */
3018 if (rstate->state != SUBREL_STATE_SYNCDONE)
3019 {
3020 char syncslotname[NAMEDATALEN] = {0};
3021
3022 ReplicationSlotNameForTablesync(subid, relid, syncslotname,
3023 sizeof(syncslotname));
3024 elog(WARNING, "could not drop tablesync replication slot \"%s\"",
3025 syncslotname);
3026 }
3027 }
3028
3029 ereport(ERROR,
3030 (errcode(ERRCODE_CONNECTION_FAILURE),
3031 errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
3032 slotname, err),
3033 /* translator: %s is an SQL ALTER command */
3034 errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
3035 "ALTER SUBSCRIPTION ... DISABLE",
3036 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
3037}

References elog, ereport, err(), errcode(), errhint(), errmsg(), ERROR, lfirst, NAMEDATALEN, OidIsValid, SubscriptionRelState::relid, ReplicationSlotNameForTablesync(), SubscriptionRelState::state, and WARNING.

Referenced by DropSubscription().