diff options
author | Michael Paquier | 2023-09-28 00:33:51 +0000 |
---|---|---|
committer | Michael Paquier | 2023-09-28 00:33:51 +0000 |
commit | 9210afd3bcd65feccb883ace4ed6dcef6a684585 (patch) | |
tree | 2be068139f410dac7648a1bf05f1075026fc2abc /src/backend | |
parent | ebf76f2753a91615d45f113f1535a8443fa8d076 (diff) |
Move tracking of in_streaming to PGOutputData
"in_streaming" is a flag used to track if an instance of pgoutput is
streaming changes. When pgoutput is started, the flag was always reset,
switched it back and forth in the stream start/stop callbacks.
Before this commit, it was a global variable, which is confusing as it
is actually attached to a state of PGOutputData. Per my analysis, using
a global variable did not lead to an active bug like in 54ccfd65868c,
but it makes the code more consistent. Note that we cannot backpatch
this change anyway as it requires the addition of a new field to
PGOutputData, exposed in pgoutput.h.
Author: Hou Zhijie
Reviewed-by: Amit Kapila, Michael Paquier, Peter Smith
Discussion: https://postgr.es/m/OS0PR01MB571690EF24F51F51EFFCBB0E94FAA@OS0PR01MB5716.jpnprd01.prod.outlook.com
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 34 |
1 files changed, 18 insertions, 16 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 251ba46da5e..c1c66848f36 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -81,7 +81,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static bool publications_valid; -static bool in_streaming; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, @@ -480,9 +479,6 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("streaming requested, but not supported by output plugin"))); - /* Also remember we're currently not streaming any transaction. */ - in_streaming = false; - /* * Here, we just check whether the two-phase option is passed by * plugin and decide whether to enable it at later point of time. It @@ -680,6 +676,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry) { + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; bool schema_sent; TransactionId xid = InvalidTransactionId; TransactionId topxid = InvalidTransactionId; @@ -692,7 +689,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, * If we're not in a streaming block, just use InvalidTransactionId and * the write methods will not include it. */ - if (in_streaming) + if (data->in_streaming) xid = change->txn->xid; if (rbtxn_is_subtxn(change->txn)) @@ -712,7 +709,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, * doing that we need to study its impact on the case where we have a mix * of streaming and non-streaming transactions. */ - if (in_streaming) + if (data->in_streaming) schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid); else schema_sent = relentry->schema_sent; @@ -736,7 +733,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, send_relation_and_attrs(relation, xid, ctx, relentry->columns); - if (in_streaming) + if (data->in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); else relentry->schema_sent = true; @@ -1422,7 +1419,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * their association and on aborts, it can discard the corresponding * changes. */ - if (in_streaming) + if (data->in_streaming) xid = change->txn->xid; relentry = get_rel_sync_entry(data, relation); @@ -1571,7 +1568,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TransactionId xid = InvalidTransactionId; /* Remember the xid for the change in streaming mode. See pgoutput_change. */ - if (in_streaming) + if (data->in_streaming) xid = change->txn->xid; old = MemoryContextSwitchTo(data->context); @@ -1640,7 +1637,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * Remember the xid for the message in streaming mode. See * pgoutput_change. */ - if (in_streaming) + if (data->in_streaming) xid = txn->xid; /* @@ -1743,10 +1740,11 @@ static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; bool send_replication_origin = txn->origin_id != InvalidRepOriginId; /* we can't nest streaming of transactions */ - Assert(!in_streaming); + Assert(!data->in_streaming); /* * If we already sent the first stream for this transaction then don't @@ -1764,7 +1762,7 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); /* we're streaming a chunk of transaction now */ - in_streaming = true; + data->in_streaming = true; } /* @@ -1774,15 +1772,17 @@ static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + /* we should be streaming a transaction */ - Assert(in_streaming); + Assert(data->in_streaming); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_stop(ctx->out); OutputPluginWrite(ctx, true); /* we've stopped streaming a transaction */ - in_streaming = false; + data->in_streaming = false; } /* @@ -1802,7 +1802,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx, * The abort should happen outside streaming block, even for streamed * transactions. The transaction has to be marked as streamed, though. */ - Assert(!in_streaming); + Assert(!data->in_streaming); /* determine the toplevel transaction */ toptxn = rbtxn_get_toptxn(txn); @@ -1827,11 +1827,13 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { + PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private; + /* * The commit should happen outside streaming block, even for streamed * transactions. The transaction has to be marked as streamed, though. */ - Assert(!in_streaming); + Assert(!data->in_streaming); Assert(rbtxn_is_streamed(txn)); OutputPluginUpdateProgress(ctx, false); |