summaryrefslogtreecommitdiff
path: root/src/backend/replication/pgoutput/pgoutput.c
diff options
context:
space:
mode:
author: Amit Kapila, 2021-07-14 02:03:50 +0000
committer: Amit Kapila, 2021-07-14 02:03:50 +0000
commit: a8fd13cab0ba815e9925dc9676e6309f699b5f72 (patch)
tree: bfebac6bfc2d32a9212e33f9090bd700b0316fae /src/backend/replication/pgoutput/pgoutput.c
parent: 6c9c2831668345122fd0f92280b30f3bbe2dd4e6 (diff)
Add support for prepared transactions to built-in logical replication.
To add support for streaming transactions at prepare time into the built-in logical replication, we need to do the following things:

* Modify the output plugin (pgoutput) to implement the new two-phase API callbacks, by leveraging the extended replication protocol.

* Modify the replication apply worker to properly handle two-phase transactions by replaying them on prepare.

* Add a new SUBSCRIPTION option "two_phase" to allow users to enable two-phase transactions. We enable two_phase once the initial data sync is over.

We must, however, explicitly disable replication of two-phase transactions during replication slot creation, even if the plugin supports it. We don't need to replicate the changes accumulated during this phase, and moreover, we don't have a replication connection open, so we don't know where to send the data anyway.

The streaming option is not allowed together with this new two_phase option. Supporting that combination can be done as a separate patch.

We don't allow toggling the two_phase option of a subscription because it can lead to an inconsistent replica. For the same reason, we don't allow refreshing the publication once two_phase is enabled for a subscription, unless the copy_data option is false.

Author: Peter Smith, Ajin Cherian and Amit Kapila, based on previous work by Nikhil Sontakke and Stas Kelvich
Reviewed-by: Amit Kapila, Sawada Masahiko, Vignesh C, Dilip Kumar, Takamichi Osumi, Greg Nancarrow
Tested-By: Haiying Tang
Discussion: https://postgr.es/m/[email protected]
Discussion: https://postgr.es/m/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r-- src/backend/replication/pgoutput/pgoutput.c 201
1 files changed, 165 insertions, 36 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index abd5217ab1b..e4314af13ae 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -51,6 +51,16 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
Size sz, const char *message);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
+static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
+static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
@@ -70,6 +80,9 @@ static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx);
+static void send_repl_origin(LogicalDecodingContext *ctx,
+ RepOriginId origin_id, XLogRecPtr origin_lsn,
+ bool send_origin);
/*
* Entry in the map used to remember which relation schemas we sent.
@@ -145,6 +158,11 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->truncate_cb = pgoutput_truncate;
cb->message_cb = pgoutput_message;
cb->commit_cb = pgoutput_commit_txn;
+
+ cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
+ cb->prepare_cb = pgoutput_prepare_txn;
+ cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
+ cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown;
@@ -156,6 +174,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_change_cb = pgoutput_change;
cb->stream_message_cb = pgoutput_message;
cb->stream_truncate_cb = pgoutput_truncate;
+ /* transaction streaming - two-phase commit */
+ cb->stream_prepare_cb = NULL;
}
static void
@@ -167,10 +187,12 @@ parse_output_parameters(List *options, PGOutputData *data)
bool binary_option_given = false;
bool messages_option_given = false;
bool streaming_given = false;
+ bool two_phase_option_given = false;
data->binary = false;
data->streaming = false;
data->messages = false;
+ data->two_phase = false;
foreach(lc, options)
{
@@ -246,8 +268,29 @@ parse_output_parameters(List *options, PGOutputData *data)
data->streaming = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "two_phase") == 0)
+ {
+ if (two_phase_option_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ two_phase_option_given = true;
+
+ data->two_phase = defGetBoolean(defel);
+ }
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
+
+ /*
+ * Do additional checking for the disallowed combination of two_phase
+ * and streaming. While streaming and two_phase can theoretically be
+ * supported, it needs more analysis to allow them together.
+ */
+ if (data->two_phase && data->streaming)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("%s and %s are mutually exclusive options",
+ "two_phase", "streaming")));
}
}
@@ -319,6 +362,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
/* Also remember we're currently not streaming any transaction. */
in_streaming = false;
+ /*
+ * Here, we just check whether the two-phase option is passed by
+ * plugin and decide whether to enable it at later point of time. It
+ * remains enabled if the previous start-up has done so. But we only
+ * allow the option to be passed in with sufficient version of the
+ * protocol, and when the output plugin supports it.
+ */
+ if (!data->two_phase)
+ ctx->twophase_opt_given = false;
+ else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
+ data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
+ else if (!ctx->twophase)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("two-phase commit requested, but not supported by output plugin")));
+ else
+ ctx->twophase_opt_given = true;
+
/* Init publication state. */
data->publications = NIL;
publications_valid = false;
@@ -331,8 +395,12 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
}
else
{
- /* Disable the streaming during the slot initialization mode. */
+ /*
+ * Disable the streaming and prepared transactions during the slot
+ * initialization mode.
+ */
ctx->streaming = false;
+ ctx->twophase = false;
}
}
@@ -347,29 +415,8 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_begin(ctx->out, txn);
- if (send_replication_origin)
- {
- char *origin;
-
- /*----------
- * XXX: which behaviour do we want here?
- *
- * Alternatives:
- * - don't send origin message if origin name not found
- * (that's what we do now)
- * - throw error - that will break replication, not good
- * - send some special "unknown" origin
- *----------
- */
- if (replorigin_by_oid(txn->origin_id, true, &origin))
- {
- /* Message boundary */
- OutputPluginWrite(ctx, false);
- OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
- }
-
- }
+ send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
+ send_replication_origin);
OutputPluginWrite(ctx, true);
}
@@ -389,6 +436,68 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
/*
+ * BEGIN PREPARE callback
+ */
+static void
+pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
+ bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+
+ OutputPluginPrepareWrite(ctx, !send_replication_origin);
+ logicalrep_write_begin_prepare(ctx->out, txn);
+
+ send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
+ send_replication_origin);
+
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * PREPARE callback
+ */
+static void
+pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ OutputPluginUpdateProgress(ctx);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * COMMIT PREPARED callback
+ */
+static void
+pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ OutputPluginUpdateProgress(ctx);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * ROLLBACK PREPARED callback
+ */
+static void
+pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
+{
+ OutputPluginUpdateProgress(ctx);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
+ prepare_time);
+ OutputPluginWrite(ctx, true);
+}
+
+/*
* Write the current schema of the relation and its ancestor (if any) if not
* done yet.
*/
@@ -839,18 +948,8 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx,
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
- if (send_replication_origin)
- {
- char *origin;
-
- if (replorigin_by_oid(txn->origin_id, true, &origin))
- {
- /* Message boundary */
- OutputPluginWrite(ctx, false);
- OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
- }
- }
+ send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
+ send_replication_origin);
OutputPluginWrite(ctx, true);
@@ -1270,3 +1369,33 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
entry->pubactions.pubtruncate = false;
}
}
+
+/* Send Replication origin */
+static void
+send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
+ XLogRecPtr origin_lsn, bool send_origin)
+{
+ if (send_origin)
+ {
+ char *origin;
+
+ /*----------
+ * XXX: which behaviour do we want here?
+ *
+ * Alternatives:
+ * - don't send origin message if origin name not found
+ * (that's what we do now)
+ * - throw error - that will break replication, not good
+ * - send some special "unknown" origin
+ *----------
+ */
+ if (replorigin_by_oid(origin_id, true, &origin))
+ {
+ /* Message boundary */
+ OutputPluginWrite(ctx, false);
+ OutputPluginPrepareWrite(ctx, true);
+
+ logicalrep_write_origin(ctx->out, origin, origin_lsn);
+ }
+ }
+}