summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c34
-rw-r--r--src/include/replication/pgoutput.h3
2 files changed, 21 insertions, 16 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 251ba46da5e..c1c66848f36 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -81,7 +81,6 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
static bool publications_valid;
-static bool in_streaming;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
@@ -480,9 +479,6 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("streaming requested, but not supported by output plugin")));
- /* Also remember we're currently not streaming any transaction. */
- in_streaming = false;
-
/*
* Here, we just check whether the two-phase option is passed by
* plugin and decide whether to enable it at later point of time. It
@@ -680,6 +676,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
ReorderBufferChange *change,
Relation relation, RelationSyncEntry *relentry)
{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
bool schema_sent;
TransactionId xid = InvalidTransactionId;
TransactionId topxid = InvalidTransactionId;
@@ -692,7 +689,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
* If we're not in a streaming block, just use InvalidTransactionId and
* the write methods will not include it.
*/
- if (in_streaming)
+ if (data->in_streaming)
xid = change->txn->xid;
if (rbtxn_is_subtxn(change->txn))
@@ -712,7 +709,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
* doing that we need to study its impact on the case where we have a mix
* of streaming and non-streaming transactions.
*/
- if (in_streaming)
+ if (data->in_streaming)
schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
else
schema_sent = relentry->schema_sent;
@@ -736,7 +733,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
send_relation_and_attrs(relation, xid, ctx, relentry->columns);
- if (in_streaming)
+ if (data->in_streaming)
set_schema_sent_in_streamed_txn(relentry, topxid);
else
relentry->schema_sent = true;
@@ -1422,7 +1419,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* their association and on aborts, it can discard the corresponding
* changes.
*/
- if (in_streaming)
+ if (data->in_streaming)
xid = change->txn->xid;
relentry = get_rel_sync_entry(data, relation);
@@ -1571,7 +1568,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TransactionId xid = InvalidTransactionId;
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
- if (in_streaming)
+ if (data->in_streaming)
xid = change->txn->xid;
old = MemoryContextSwitchTo(data->context);
@@ -1640,7 +1637,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* Remember the xid for the message in streaming mode. See
* pgoutput_change.
*/
- if (in_streaming)
+ if (data->in_streaming)
xid = txn->xid;
/*
@@ -1743,10 +1740,11 @@ static void
pgoutput_stream_start(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
/* we can't nest streaming of transactions */
- Assert(!in_streaming);
+ Assert(!data->in_streaming);
/*
* If we already sent the first stream for this transaction then don't
@@ -1764,7 +1762,7 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
/* we're streaming a chunk of transaction now */
- in_streaming = true;
+ data->in_streaming = true;
}
/*
@@ -1774,15 +1772,17 @@ static void
pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
/* we should be streaming a transaction */
- Assert(in_streaming);
+ Assert(data->in_streaming);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_stop(ctx->out);
OutputPluginWrite(ctx, true);
/* we've stopped streaming a transaction */
- in_streaming = false;
+ data->in_streaming = false;
}
/*
@@ -1802,7 +1802,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
* The abort should happen outside streaming block, even for streamed
* transactions. The transaction has to be marked as streamed, though.
*/
- Assert(!in_streaming);
+ Assert(!data->in_streaming);
/* determine the toplevel transaction */
toptxn = rbtxn_get_toptxn(txn);
@@ -1827,11 +1827,13 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
+ PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
+
/*
* The commit should happen outside streaming block, even for streamed
* transactions. The transaction has to be marked as streamed, though.
*/
- Assert(!in_streaming);
+ Assert(!data->in_streaming);
Assert(rbtxn_is_streamed(txn));
OutputPluginUpdateProgress(ctx, false);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index b3f9a016293..cee209e4cc2 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -21,6 +21,9 @@ typedef struct PGOutputData
* allocations */
MemoryContext cachectx; /* private memory context for cache data */
+ bool in_streaming; /* true if we are streaming a chunk of
+ * transaction */
+
/* client-supplied info: */
uint32 protocol_version;
List *publication_names;