Skip to content

Commit 325f2ec

Browse files
committed
Handle heap rewrites even better in logical decoding
Logical decoding should not publish anything about tables created as part of a heap rewrite during DDL. Those tables don't exist externally, so consumers of logical decoding cannot do anything sensible with that information. In ab28fea, we worked around this for built-in logical replication, but that was hack. This is a more proper fix: We mark such transient heaps using the new field pg_class.relwrite, linking to the original relation OID. By default, we ignore them in logical decoding before they get to the output plugin. Optionally, a plugin can register their interest in getting such changes, if they handle DDL specially, in which case the new field will help them get information about the actual table. Reviewed-by: Craig Ringer <[email protected]>
1 parent be8a7a6 commit 325f2ec

File tree

20 files changed

+113
-103
lines changed

20 files changed

+113
-103
lines changed

contrib/test_decoding/expected/concurrent_ddl_dml.out

Lines changed: 28 additions & 54 deletions
Large diffs are not rendered by default.

contrib/test_decoding/expected/ddl.out

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,11 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
117117
(22 rows)
118118

119119
ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4);
120-
-- throw away changes, they contain oids
120+
-- check that this doesn't produce any changes from the heap rewrite
121121
SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
122122
count
123123
-------
124-
12
124+
0
125125
(1 row)
126126

127127
INSERT INTO replication_example(somedata, somenum) VALUES (5, 1);
@@ -192,16 +192,20 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
192192
COMMIT
193193
(33 rows)
194194

195-
-- hide changes bc of oid visible in full table rewrites
196195
CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int);
197196
INSERT INTO tr_unique(data) VALUES(10);
198197
ALTER TABLE tr_unique RENAME TO tr_pkey;
199198
ALTER TABLE tr_pkey ADD COLUMN id serial primary key;
200-
SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
201-
count
202-
-------
203-
6
204-
(1 row)
199+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-rewrites', '1');
200+
data
201+
-----------------------------------------------------------------------------
202+
BEGIN
203+
table public.tr_unique: INSERT: id2[integer]:1 data[integer]:10
204+
COMMIT
205+
BEGIN
206+
table public.tr_pkey: INSERT: id2[integer]:1 data[integer]:10 id[integer]:1
207+
COMMIT
208+
(6 rows)
205209

206210
INSERT INTO tr_pkey(data) VALUES(1);
207211
--show deletion with primary key

contrib/test_decoding/specs/concurrent_ddl_dml.spec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ step "s2_alter_tbl2_3rd_char" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE characte
5353
step "s2_alter_tbl2_3rd_text" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE text; }
5454
step "s2_alter_tbl2_3rd_int" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE int USING val3::integer; }
5555

56-
step "s2_get_changes" { SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
56+
step "s2_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
5757

5858

5959

contrib/test_decoding/sql/ddl.sql

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ INSERT INTO replication_example(somedata, somenum) VALUES (4, 1);
6767
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
6868

6969
ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4);
70-
-- throw away changes, they contain oids
70+
-- check that this doesn't produce any changes from the heap rewrite
7171
SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
7272

7373
INSERT INTO replication_example(somedata, somenum) VALUES (5, 1);
@@ -93,12 +93,11 @@ COMMIT;
9393
/* display results */
9494
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
9595

96-
-- hide changes bc of oid visible in full table rewrites
9796
CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int);
9897
INSERT INTO tr_unique(data) VALUES(10);
9998
ALTER TABLE tr_unique RENAME TO tr_pkey;
10099
ALTER TABLE tr_pkey ADD COLUMN id serial primary key;
101-
SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
100+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-rewrites', '1');
102101

103102
INSERT INTO tr_pkey(data) VALUES(1);
104103
--show deletion with primary key

contrib/test_decoding/test_decoding.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
101101
ctx->output_plugin_private = data;
102102

103103
opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
104+
opt->receive_rewrites = false;
104105

105106
foreach(option, ctx->output_plugin_options)
106107
{
@@ -166,6 +167,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
166167
errmsg("could not parse value \"%s\" for parameter \"%s\"",
167168
strVal(elem->arg), elem->defname)));
168169
}
170+
else if (strcmp(elem->defname, "include-rewrites") == 0)
171+
{
172+
173+
if (elem->arg == NULL)
174+
continue;
175+
else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
176+
ereport(ERROR,
177+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
178+
errmsg("could not parse value \"%s\" for parameter \"%s\"",
179+
strVal(elem->arg), elem->defname)));
180+
}
169181
else
170182
{
171183
ereport(ERROR,
@@ -412,6 +424,8 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
412424
quote_qualified_identifier(
413425
get_namespace_name(
414426
get_rel_namespace(RelationGetRelid(relation))),
427+
class_form->relrewrite ?
428+
get_rel_name(class_form->relrewrite) :
415429
NameStr(class_form->relname)));
416430
appendStringInfoChar(ctx->out, ':');
417431

doc/src/sgml/catalogs.sgml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1923,6 +1923,18 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
19231923
<entry>True if table is a partition</entry>
19241924
</row>
19251925

1926+
<row>
1927+
<entry><structfield>relrewrite</structfield></entry>
1928+
<entry><type>oid</type></entry>
1929+
<entry><literal><link linkend="catalog-pg-class"><structname>pg_class</structname></link>.oid</literal></entry>
1930+
<entry>
1931+
For new relations being written during a DDL operation that requires a
1932+
table rewrite, this contains the OID of the original relation;
1933+
otherwise 0. That state is only visible internally; this field should
1934+
never contain anything other than 0 for a user-visible relation.
1935+
</entry>
1936+
</row>
1937+
19261938
<row>
19271939
<entry><structfield>relfrozenxid</structfield></entry>
19281940
<entry><type>xid</type></entry>

doc/src/sgml/logicaldecoding.sgml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,12 +486,17 @@ typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
486486
typedef struct OutputPluginOptions
487487
{
488488
OutputPluginOutputType output_type;
489+
bool receive_rewrites;
489490
} OutputPluginOptions;
490491
</programlisting>
491492
<literal>output_type</literal> has to either be set to
492493
<literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal>
493494
or <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal>. See also
494495
<xref linkend="logicaldecoding-output-mode"/>.
496+
If <literal>receive_rewrites</literal> is true, the output plugin will
497+
also be called for changes made by heap rewrites during certain DDL
498+
operations. These are of interest to plugins that handle DDL
499+
replication, but they require special handling.
495500
</para>
496501

497502
<para>

src/backend/bootstrap/bootparse.y

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ Boot_CreateStmt:
257257
false,
258258
true,
259259
false,
260+
InvalidOid,
260261
NULL);
261262
elog(DEBUG4, "relation created with OID %u", id);
262263
}

src/backend/catalog/heap.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,7 @@ InsertPgClassTuple(Relation pg_class_desc,
806806
values[Anum_pg_class_relispopulated - 1] = BoolGetDatum(rd_rel->relispopulated);
807807
values[Anum_pg_class_relreplident - 1] = CharGetDatum(rd_rel->relreplident);
808808
values[Anum_pg_class_relispartition - 1] = BoolGetDatum(rd_rel->relispartition);
809+
values[Anum_pg_class_relrewrite - 1] = ObjectIdGetDatum(rd_rel->relrewrite);
809810
values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
810811
values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid);
811812
if (relacl != (Datum) 0)
@@ -1038,6 +1039,7 @@ heap_create_with_catalog(const char *relname,
10381039
bool use_user_acl,
10391040
bool allow_system_table_mods,
10401041
bool is_internal,
1042+
Oid relrewrite,
10411043
ObjectAddress *typaddress)
10421044
{
10431045
Relation pg_class_desc;
@@ -1176,6 +1178,8 @@ heap_create_with_catalog(const char *relname,
11761178

11771179
Assert(relid == RelationGetRelid(new_rel_desc));
11781180

1181+
new_rel_desc->rd_rel->relrewrite = relrewrite;
1182+
11791183
/*
11801184
* Decide whether to create an array type over the relation's rowtype. We
11811185
* do not create any array types for system catalogs (ie, those made

src/backend/catalog/toasting.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid,
279279
false,
280280
true,
281281
true,
282+
InvalidOid,
282283
NULL);
283284
Assert(toast_relid != InvalidOid);
284285

0 commit comments

Comments
 (0)