29
29
#include "getopt_long.h"
30
30
31
31
#define DEFAULT_SUB_PORT "50432"
32
+ #define OBJECTTYPE_PUBLICATIONS 0x0001
32
33
33
34
/* Command-line options */
34
35
struct CreateSubscriberOptions
@@ -44,6 +45,7 @@ struct CreateSubscriberOptions
44
45
SimpleStringList sub_names ; /* list of subscription names */
45
46
SimpleStringList replslot_names ; /* list of replication slot names */
46
47
int recovery_timeout ; /* stop recovery after this time */
48
+ SimpleStringList objecttypes_to_remove ; /* list of object types to remove */
47
49
};
48
50
49
51
/* per-database publication/subscription info */
@@ -68,6 +70,8 @@ struct LogicalRepInfos
68
70
{
69
71
struct LogicalRepInfo * dbinfo ;
70
72
bool two_phase ; /* enable-two-phase option */
73
+ bits32 objecttypes_to_remove ; /* flags indicating which object types
74
+ * to remove on subscriber */
71
75
};
72
76
73
77
static void cleanup_objects_atexit (void );
@@ -109,7 +113,9 @@ static void stop_standby_server(const char *datadir);
109
113
static void wait_for_end_recovery (const char * conninfo ,
110
114
const struct CreateSubscriberOptions * opt );
111
115
static void create_publication (PGconn * conn , struct LogicalRepInfo * dbinfo );
112
- static void drop_publication (PGconn * conn , struct LogicalRepInfo * dbinfo );
116
+ static void drop_publication (PGconn * conn , const char * pubname ,
117
+ const char * dbname , bool * made_publication );
118
+ static void check_and_drop_publications (PGconn * conn , struct LogicalRepInfo * dbinfo );
113
119
static void create_subscription (PGconn * conn , const struct LogicalRepInfo * dbinfo );
114
120
static void set_replication_progress (PGconn * conn , const struct LogicalRepInfo * dbinfo ,
115
121
const char * lsn );
@@ -194,7 +200,8 @@ cleanup_objects_atexit(void)
194
200
if (conn != NULL )
195
201
{
196
202
if (dbinfo -> made_publication )
197
- drop_publication (conn , dbinfo );
203
+ drop_publication (conn , dbinfo -> pubname , dbinfo -> dbname ,
204
+ & dbinfo -> made_publication );
198
205
if (dbinfo -> made_replslot )
199
206
drop_replication_slot (conn , dbinfo , dbinfo -> replslotname );
200
207
disconnect_database (conn , false);
@@ -241,6 +248,8 @@ usage(void)
241
248
printf (_ (" -n, --dry-run dry run, just show what would be done\n" ));
242
249
printf (_ (" -p, --subscriber-port=PORT subscriber port number (default %s)\n" ), DEFAULT_SUB_PORT );
243
250
printf (_ (" -P, --publisher-server=CONNSTR publisher connection string\n" ));
251
+ printf (_ (" -R, --remove=OBJECTTYPE remove all objects of the specified type from specified\n"
252
+ " databases on the subscriber; accepts: publications\n" ));
244
253
printf (_ (" -s, --socketdir=DIR socket directory to use (default current dir.)\n" ));
245
254
printf (_ (" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n" ));
246
255
printf (_ (" -T, --enable-two-phase enable two-phase commit for all subscriptions\n" ));
@@ -1193,12 +1202,8 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1193
1202
*/
1194
1203
check_and_drop_existing_subscriptions (conn , & dbinfo [i ]);
1195
1204
1196
- /*
1197
- * Since the publication was created before the consistent LSN, it is
1198
- * available on the subscriber when the physical replica is promoted.
1199
- * Remove publications from the subscriber because it has no use.
1200
- */
1201
- drop_publication (conn , & dbinfo [i ]);
1205
+ /* Check and drop the required publications in the given database. */
1206
+ check_and_drop_publications (conn , & dbinfo [i ]);
1202
1207
1203
1208
create_subscription (conn , & dbinfo [i ]);
1204
1209
@@ -1663,21 +1668,22 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1663
1668
}
1664
1669
1665
1670
/*
1666
- * Remove publication if it couldn't finish all steps .
1671
+ * Drop the specified publication in the given database .
1667
1672
*/
1668
1673
static void
1669
- drop_publication (PGconn * conn , struct LogicalRepInfo * dbinfo )
1674
+ drop_publication (PGconn * conn , const char * pubname , const char * dbname ,
1675
+ bool * made_publication )
1670
1676
{
1671
1677
PQExpBuffer str = createPQExpBuffer ();
1672
1678
PGresult * res ;
1673
1679
char * pubname_esc ;
1674
1680
1675
1681
Assert (conn != NULL );
1676
1682
1677
- pubname_esc = PQescapeIdentifier (conn , dbinfo -> pubname , strlen (dbinfo -> pubname ));
1683
+ pubname_esc = PQescapeIdentifier (conn , pubname , strlen (pubname ));
1678
1684
1679
1685
pg_log_info ("dropping publication \"%s\" in database \"%s\"" ,
1680
- dbinfo -> pubname , dbinfo -> dbname );
1686
+ pubname , dbname );
1681
1687
1682
1688
appendPQExpBuffer (str , "DROP PUBLICATION %s" , pubname_esc );
1683
1689
@@ -1691,8 +1697,8 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1691
1697
if (PQresultStatus (res ) != PGRES_COMMAND_OK )
1692
1698
{
1693
1699
pg_log_error ("could not drop publication \"%s\" in database \"%s\": %s" ,
1694
- dbinfo -> pubname , dbinfo -> dbname , PQresultErrorMessage (res ));
1695
- dbinfo -> made_publication = false; /* don't try again. */
1700
+ pubname , dbname , PQresultErrorMessage (res ));
1701
+ * made_publication = false; /* don't try again. */
1696
1702
1697
1703
/*
1698
1704
* Don't disconnect and exit here. This routine is used by primary
@@ -1708,6 +1714,49 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1708
1714
destroyPQExpBuffer (str );
1709
1715
}
1710
1716
1717
+ /*
1718
+ * Retrieve and drop the publications.
1719
+ *
1720
+ * Since the publications were created before the consistent LSN, they
1721
+ * remain on the subscriber even after the physical replica is
1722
+ * promoted. Remove these publications from the subscriber because
1723
+ * they have no use. Additionally, if requested, drop all pre-existing
1724
+ * publications.
1725
+ */
1726
+ static void
1727
+ check_and_drop_publications (PGconn * conn , struct LogicalRepInfo * dbinfo )
1728
+ {
1729
+ PGresult * res ;
1730
+
1731
+ Assert (conn != NULL );
1732
+
1733
+ if (dbinfos .objecttypes_to_remove & OBJECTTYPE_PUBLICATIONS )
1734
+ {
1735
+ pg_log_info ("dropping all existing publications in database \"%s\"" ,
1736
+ dbinfo -> dbname );
1737
+
1738
+ /* Fetch all publication names */
1739
+ res = PQexec (conn , "SELECT pubname FROM pg_catalog.pg_publication;" );
1740
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
1741
+ {
1742
+ pg_log_error ("could not obtain publication information: %s" ,
1743
+ PQresultErrorMessage (res ));
1744
+ PQclear (res );
1745
+ disconnect_database (conn , true);
1746
+ }
1747
+
1748
+ /* Drop each publication */
1749
+ for (int i = 0 ; i < PQntuples (res ); i ++ )
1750
+ drop_publication (conn , PQgetvalue (res , i , 0 ), dbinfo -> dbname ,
1751
+ & dbinfo -> made_publication );
1752
+
1753
+ PQclear (res );
1754
+ }
1755
+ else
1756
+ drop_publication (conn , dbinfo -> pubname , dbinfo -> dbname ,
1757
+ & dbinfo -> made_publication );
1758
+ }
1759
+
1711
1760
/*
1712
1761
* Create a subscription with some predefined options.
1713
1762
*
@@ -1914,6 +1963,7 @@ main(int argc, char **argv)
1914
1963
{"dry-run" , no_argument , NULL , 'n' },
1915
1964
{"subscriber-port" , required_argument , NULL , 'p' },
1916
1965
{"publisher-server" , required_argument , NULL , 'P' },
1966
+ {"remove" , required_argument , NULL , 'R' },
1917
1967
{"socketdir" , required_argument , NULL , 's' },
1918
1968
{"recovery-timeout" , required_argument , NULL , 't' },
1919
1969
{"enable-two-phase" , no_argument , NULL , 'T' },
@@ -1995,7 +2045,7 @@ main(int argc, char **argv)
1995
2045
1996
2046
get_restricted_token ();
1997
2047
1998
- while ((c = getopt_long (argc , argv , "d:D:np:P:s:t:TU:v" ,
2048
+ while ((c = getopt_long (argc , argv , "d:D:np:P:R: s:t:TU:v" ,
1999
2049
long_options , & option_index )) != -1 )
2000
2050
{
2001
2051
switch (c )
@@ -2025,6 +2075,12 @@ main(int argc, char **argv)
2025
2075
case 'P' :
2026
2076
opt .pub_conninfo_str = pg_strdup (optarg );
2027
2077
break ;
2078
+ case 'R' :
2079
+ if (!simple_string_list_member (& opt .objecttypes_to_remove , optarg ))
2080
+ simple_string_list_append (& opt .objecttypes_to_remove , optarg );
2081
+ else
2082
+ pg_fatal ("object type \"%s\" is specified more than once for --remove" , optarg );
2083
+ break ;
2028
2084
case 's' :
2029
2085
opt .socket_dir = pg_strdup (optarg );
2030
2086
canonicalize_path (opt .socket_dir );
@@ -2189,6 +2245,19 @@ main(int argc, char **argv)
2189
2245
exit (1 );
2190
2246
}
2191
2247
2248
+ /* Verify the object types specified for removal from the subscriber */
2249
+ for (SimpleStringListCell * cell = opt .objecttypes_to_remove .head ; cell ; cell = cell -> next )
2250
+ {
2251
+ if (pg_strcasecmp (cell -> val , "publications" ) == 0 )
2252
+ dbinfos .objecttypes_to_remove |= OBJECTTYPE_PUBLICATIONS ;
2253
+ else
2254
+ {
2255
+ pg_log_error ("invalid object type \"%s\" specified for --remove" , cell -> val );
2256
+ pg_log_error_hint ("The valid option is: \"publications\"" );
2257
+ exit (1 );
2258
+ }
2259
+ }
2260
+
2192
2261
/* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2193
2262
pg_ctl_path = get_exec_path (argv [0 ], "pg_ctl" );
2194
2263
pg_resetwal_path = get_exec_path (argv [0 ], "pg_resetwal" );
0 commit comments