diff options
author | Amit Kapila | 2020-08-08 02:04:39 +0000 |
---|---|---|
committer | Amit Kapila | 2020-08-08 02:17:06 +0000 |
commit | 7259736a6e5b7c7588fff9578370736a6648acbb (patch) | |
tree | a2261d4ed09124a00d9ed8c0082f22256364aa77 /src/include/replication/reorderbuffer.h | |
parent | 0a7d771f0f63eb120e7f0a60aecd543ab25ba197 (diff) |
Implement streaming mode in ReorderBuffer.
Instead of serializing the transaction to disk after reaching the
logical_decoding_work_mem limit in memory, we consume the changes we have
in memory and invoke stream API methods added by commit 45fdc9738b.
However, sometimes if we have incomplete toast or speculative insert we
spill to the disk because we can't generate the complete tuple and stream.
And, as soon as we get the complete tuple we stream the transaction
including the serialized changes.
We can do this incremental processing thanks to having assignments
(associating subxact with toplevel xacts) in WAL right away, and
thanks to logging the invalidation messages at each command end. These
features are added by commits 0bead9af48 and c55040ccd0 respectively.
Now that we can stream in-progress transactions, the concurrent aborts
may cause failures when the output plugin consults catalogs (both system
and user-defined).
We handle such failures by returning ERRCODE_TRANSACTION_ROLLBACK
sqlerrcode from system table scan APIs to the backend or WALSender
decoding a specific uncommitted transaction. The decoding logic on the
receipt of such a sqlerrcode aborts the decoding of the current
transaction and continue with the decoding of other transactions.
We have ReorderBufferTXN pointer in each ReorderBufferChange by which we
know which xact it belongs to. The output plugin can use this to decide
which changes to discard in case of stream_abort_cb (e.g. when a subxact
gets discarded).
We also provide a new option via SQL APIs to fetch the changes being
streamed.
Author: Dilip Kumar, Tomas Vondra, Amit Kapila, Nikhil Sontakke
Reviewed-by: Amit Kapila, Kuntal Ghosh, Ajin Cherian
Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian
Discussion: https://postgr.es/m/[email protected]
Diffstat (limited to 'src/include/replication/reorderbuffer.h')
-rw-r--r-- | src/include/replication/reorderbuffer.h | 56 |
1 files changed, 54 insertions, 2 deletions
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 42bc8176487..1ae17d5f11f 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -162,6 +162,9 @@ typedef struct ReorderBufferChange #define RBTXN_HAS_CATALOG_CHANGES 0x0001 #define RBTXN_IS_SUBXACT 0x0002 #define RBTXN_IS_SERIALIZED 0x0004 +#define RBTXN_IS_STREAMED 0x0008 +#define RBTXN_HAS_TOAST_INSERT 0x0010 +#define RBTXN_HAS_SPEC_INSERT 0x0020 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -181,6 +184,40 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \ ) +/* This transaction's changes has toast insert, without main table insert. */ +#define rbtxn_has_toast_insert(txn) \ +( \ + ((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0 \ +) +/* + * This transaction's changes has speculative insert, without speculative + * confirm. + */ +#define rbtxn_has_spec_insert(txn) \ +( \ + ((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0 \ +) + +/* Check whether this transaction has an incomplete change. */ +#define rbtxn_has_incomplete_tuple(txn) \ +( \ + rbtxn_has_toast_insert(txn) || rbtxn_has_spec_insert(txn) \ +) + +/* + * Has this transaction been streamed to downstream? + * + * (It's not possible to deduce this from nentries and nentries_mem for + * various reasons. For example, all changes may be in subtransactions in + * which case we'd have nentries==0 for the toplevel one, which would say + * nothing about the streaming. So we maintain this flag, but only for the + * toplevel transaction.) + */ +#define rbtxn_is_streamed(txn) \ +( \ + ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \ +) + typedef struct ReorderBufferTXN { /* See above */ @@ -249,6 +286,13 @@ typedef struct ReorderBufferTXN dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */ /* + * Snapshot/CID from the previous streaming run. Only valid for already + * streamed transactions (NULL/InvalidCommandId otherwise). + */ + Snapshot snapshot_now; + CommandId command_id; + + /* * How many ReorderBufferChange's do we have in this txn. * * Changes in subtransactions are *not* included but tracked separately. @@ -313,6 +357,12 @@ typedef struct ReorderBufferTXN * Size of this transaction (changes currently in memory, in bytes). */ Size size; + + /* Size of top-transaction including sub-transactions. */ + Size total_size; + + /* If we have detected concurrent abort then ignore future changes. */ + bool concurrent_abort; } ReorderBufferTXN; /* so we can define the callbacks used inside struct ReorderBuffer itself */ @@ -484,12 +534,14 @@ void ReorderBufferFree(ReorderBuffer *); ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len); void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple); ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); -void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); +void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *, bool); Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids); void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids); -void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); +void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, + XLogRecPtr lsn, ReorderBufferChange *, + bool toast_insert); void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); |