summaryrefslogtreecommitdiff
path: root/src/include/replication/reorderbuffer.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/include/replication/reorderbuffer.h')
-rw-r--r--src/include/replication/reorderbuffer.h56
1 files changed, 54 insertions, 2 deletions
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 42bc8176487..1ae17d5f11f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -162,6 +162,9 @@ typedef struct ReorderBufferChange
#define RBTXN_HAS_CATALOG_CHANGES 0x0001
#define RBTXN_IS_SUBXACT 0x0002
#define RBTXN_IS_SERIALIZED 0x0004
+#define RBTXN_IS_STREAMED 0x0008
+#define RBTXN_HAS_TOAST_INSERT 0x0010
+#define RBTXN_HAS_SPEC_INSERT 0x0020
/* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \
@@ -181,6 +184,40 @@ typedef struct ReorderBufferChange
((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
)
+/* This transaction's changes has toast insert, without main table insert. */
+#define rbtxn_has_toast_insert(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0 \
+)
+/*
+ * This transaction's changes has speculative insert, without speculative
+ * confirm.
+ */
+#define rbtxn_has_spec_insert(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0 \
+)
+
+/* Check whether this transaction has an incomplete change. */
+#define rbtxn_has_incomplete_tuple(txn) \
+( \
+ rbtxn_has_toast_insert(txn) || rbtxn_has_spec_insert(txn) \
+)
+
+/*
+ * Has this transaction been streamed to downstream?
+ *
+ * (It's not possible to deduce this from nentries and nentries_mem for
+ * various reasons. For example, all changes may be in subtransactions in
+ * which case we'd have nentries==0 for the toplevel one, which would say
+ * nothing about the streaming. So we maintain this flag, but only for the
+ * toplevel transaction.)
+ */
+#define rbtxn_is_streamed(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
+)
+
typedef struct ReorderBufferTXN
{
/* See above */
@@ -249,6 +286,13 @@ typedef struct ReorderBufferTXN
dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */
/*
+ * Snapshot/CID from the previous streaming run. Only valid for already
+ * streamed transactions (NULL/InvalidCommandId otherwise).
+ */
+ Snapshot snapshot_now;
+ CommandId command_id;
+
+ /*
* How many ReorderBufferChange's do we have in this txn.
*
* Changes in subtransactions are *not* included but tracked separately.
@@ -313,6 +357,12 @@ typedef struct ReorderBufferTXN
* Size of this transaction (changes currently in memory, in bytes).
*/
Size size;
+
+ /* Size of top-transaction including sub-transactions. */
+ Size total_size;
+
+ /* If we have detected concurrent abort then ignore future changes. */
+ bool concurrent_abort;
} ReorderBufferTXN;
/* so we can define the callbacks used inside struct ReorderBuffer itself */
@@ -484,12 +534,14 @@ void ReorderBufferFree(ReorderBuffer *);
ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
-void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
+void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *, bool);
Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids);
void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids);
-void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
+void ReorderBufferQueueChange(ReorderBuffer *, TransactionId,
+ XLogRecPtr lsn, ReorderBufferChange *,
+ bool toast_insert);
void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
bool transactional, const char *prefix,
Size message_size, const char *message);