diff options
author | Robert Haas | 2014-03-03 21:32:18 +0000 |
---|---|---|
committer | Robert Haas | 2014-03-03 21:32:18 +0000 |
commit | b89e151054a05f0f6d356ca52e3b725dd0505e53 (patch) | |
tree | 9b9193e808625a381003650ff68b66cdb5f9f46e /src/include | |
parent | de94b47c0a92faeddab5ac980449d3fa877b4a4f (diff) |
Introduce logical decoding.
This feature, building on previous commits, allows the write-ahead log
stream to be decoded into a series of logical changes; that is,
inserts, updates, and deletes and the transactions which contain them.
It is capable of handling decoding even across changes to the schema
of the affected tables. The output format is controlled by a
so-called "output plugin"; an example is included. To make use of
this in a real replication system, the output plugin will need to be
modified to produce output in the format appropriate to that system,
and to perform filtering.
Currently, information can be extracted from the logical decoding
system only via SQL; future commits will add the ability to stream
changes via walsender.
Andres Freund, with review and other contributions from many other
people, including Álvaro Herrera, Abhijit Menon-Sen, Peter Geoghegan,
Kevin Grittner, Robert Haas, Heikki Linnakangas, Fujii Masao, Michael
Paquier, Simon Riggs, Craig Ringer, and Steve Singer.
Diffstat (limited to 'src/include')
24 files changed, 897 insertions, 19 deletions
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index bfdadc3d5bb..0f802577c70 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -164,8 +164,7 @@ extern void heap_restrpos(HeapScanDesc scan); extern void heap_sync(Relation relation); /* in heap/pruneheap.c */ -extern void heap_page_prune_opt(Relation relation, Buffer buffer, - TransactionId OldestXmin); +extern void heap_page_prune_opt(Relation relation, Buffer buffer); extern int heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin, bool report_stats, TransactionId *latestRemovedXid); diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index d4383ab2cbe..194635952cb 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -48,7 +48,7 @@ * the ones above associated with RM_HEAP_ID. XLOG_HEAP_OPMASK applies to * these, too. */ -/* 0x00 is free, was XLOG_HEAP2_FREEZE */ +#define XLOG_HEAP2_REWRITE 0x00 #define XLOG_HEAP2_CLEAN 0x10 #define XLOG_HEAP2_FREEZE_PAGE 0x20 #define XLOG_HEAP2_CLEANUP_INFO 0x30 @@ -332,6 +332,17 @@ typedef struct xl_heap_new_cid xl_heaptid target; } xl_heap_new_cid; +/* logical rewrite xlog record header */ +typedef struct xl_heap_rewrite_mapping +{ + TransactionId mapped_xid; /* xid that might need to see the row */ + Oid mapped_db; /* DbOid or InvalidOid for shared rels */ + Oid mapped_rel; /* Oid of the mapped relation */ + off_t offset; /* How far have we written so far */ + uint32 num_mappings; /* Number of in-memory mappings */ + XLogRecPtr start_lsn; /* Insert LSN at begin of rewrite */ +} xl_heap_rewrite_mapping; + #define SizeOfHeapNewCid (offsetof(xl_heap_new_cid, target) + SizeOfHeapTid) extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple, @@ -341,6 +352,7 @@ extern void heap_redo(XLogRecPtr lsn, XLogRecord *rptr); extern void heap_desc(StringInfo buf, uint8 xl_info, char *rec); extern void heap2_redo(XLogRecPtr lsn, XLogRecord 
*rptr); extern void heap2_desc(StringInfo buf, uint8 xl_info, char *rec); +extern void heap_xlog_logical_rewrite(XLogRecPtr lsn, XLogRecord *r); extern XLogRecPtr log_heap_cleanup_info(RelFileNode rnode, TransactionId latestRemovedXid); diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h index d098a0b1711..07df3b4f2b0 100644 --- a/src/include/access/rewriteheap.h +++ b/src/include/access/rewriteheap.h @@ -14,12 +14,14 @@ #define REWRITE_HEAP_H #include "access/htup.h" +#include "storage/itemptr.h" +#include "storage/relfilenode.h" #include "utils/relcache.h" /* struct definition is private to rewriteheap.c */ typedef struct RewriteStateData *RewriteState; -extern RewriteState begin_heap_rewrite(Relation NewHeap, +extern RewriteState begin_heap_rewrite(Relation OldHeap, Relation NewHeap, TransactionId OldestXmin, TransactionId FreezeXid, MultiXactId MultiXactCutoff, bool use_wal); extern void end_heap_rewrite(RewriteState state); @@ -27,4 +29,29 @@ extern void rewrite_heap_tuple(RewriteState state, HeapTuple oldTuple, HeapTuple newTuple); extern bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple oldTuple); +/* + * On-Disk data format for an individual logical rewrite mapping. 
+ */ +typedef struct LogicalRewriteMappingData +{ + RelFileNode old_node; + RelFileNode new_node; + ItemPointerData old_tid; + ItemPointerData new_tid; +} LogicalRewriteMappingData; + +/* --- + * The filename consists out of the following, dash separated, + * components: + * 1) database oid or InvalidOid for shared relations + * 2) the oid of the relation + * 3) xid we are mapping for + * 4) upper 32bit of the LSN at which a rewrite started + * 5) lower 32bit of the LSN at which a rewrite started + * 6) xid of the xact performing the mapping + * --- + */ +#define LOGICAL_REWRITE_FORMAT "map-%x-%x-%X_%X-%x-%x" +void CheckPointLogicalRewriteHeap(void); + #endif /* REWRITE_HEAP_H */ diff --git a/src/include/access/transam.h b/src/include/access/transam.h index 8376dfd669b..a9774e9f593 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -63,6 +63,11 @@ (AssertMacro(TransactionIdIsNormal(id1) && TransactionIdIsNormal(id2)), \ (int32) ((id1) - (id2)) < 0) +/* compare two XIDs already known to be normal; this is a macro for speed */ +#define NormalTransactionIdFollows(id1, id2) \ + (AssertMacro(TransactionIdIsNormal(id1) && TransactionIdIsNormal(id2)), \ + (int32) ((id1) - (id2)) > 0) + /* ---------- * Object ID (OID) zero is InvalidOid. 
* diff --git a/src/include/access/tuptoaster.h b/src/include/access/tuptoaster.h index 5adf4f28169..296d016c9fc 100644 --- a/src/include/access/tuptoaster.h +++ b/src/include/access/tuptoaster.h @@ -98,9 +98,34 @@ /* Size of an EXTERNAL datum that contains a standard TOAST pointer */ #define TOAST_POINTER_SIZE (VARHDRSZ_EXTERNAL + sizeof(struct varatt_external)) -/* Size of an indirect datum that contains an indirect TOAST pointer */ +/* Size of an indirect datum that contains a standard TOAST pointer */ #define INDIRECT_POINTER_SIZE (VARHDRSZ_EXTERNAL + sizeof(struct varatt_indirect)) +/* + * Testing whether an externally-stored value is compressed now requires + * comparing extsize (the actual length of the external data) to rawsize + * (the original uncompressed datum's size). The latter includes VARHDRSZ + * overhead, the former doesn't. We never use compression unless it actually + * saves space, so we expect either equality or less-than. + */ +#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer) \ + ((toast_pointer).va_extsize < (toast_pointer).va_rawsize - VARHDRSZ) + +/* + * Macro to fetch the possibly-unaligned contents of an EXTERNAL datum + * into a local "struct varatt_external" toast pointer. This should be + * just a memcpy, but some versions of gcc seem to produce broken code + * that assumes the datum contents are aligned. Introducing an explicit + * intermediate "varattrib_1b_e *" variable seems to fix it. 
+ */ +#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr) \ +do { \ + varattrib_1b_e *attre = (varattrib_1b_e *) (attr); \ + Assert(VARATT_IS_EXTERNAL(attre)); \ + Assert(VARSIZE_EXTERNAL(attre) == sizeof(toast_pointer) + VARHDRSZ_EXTERNAL); \ + memcpy(&(toast_pointer), VARDATA_EXTERNAL(attre), sizeof(toast_pointer)); \ +} while (0) + /* ---------- * toast_insert_or_update - * diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 11ab2771990..a238292b76e 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -288,6 +288,7 @@ extern int XLogFileOpen(XLogSegNo segno); extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std); extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli); +extern XLogSegNo XLogGetLastRemovedSegno(void); extern void XLogSetAsyncXactLSN(XLogRecPtr record); extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 80560574bf5..22dd0fc58e8 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201403031 +#define CATALOG_VERSION_NO 201403032 #endif diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 7a11721ba44..c5706518722 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -4804,8 +4804,18 @@ DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 DESCR("create a physical replication slot"); DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a replication slot"); -DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,25,26,16,28,3220}" "{o,o,o,o,o,o}" "{slot_name,slot_type,datoid,active,xmin,restart_lsn}" _null_ 
pg_get_replication_slots _null_ _null_ _null_ )); +DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,28,28,3220}" "{o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ )); DESCR("information about replication slots currently in use"); +DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slotname,plugin,slotname,xlog_position}" _null_ pg_create_logical_replication_slot _null_ _null_ _null_ )); +DESCR("set up a logical replication slot"); +DATA(insert OID = 3782 ( pg_logical_slot_get_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slotname,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_get_changes _null_ _null_ _null_ )); +DESCR("get changes from replication slot"); +DATA(insert OID = 3783 ( pg_logical_slot_get_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slotname,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_get_binary_changes _null_ _null_ _null_ )); +DESCR("get binary changes from replication slot"); +DATA(insert OID = 3784 ( pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slotname,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_peek_changes _null_ _null_ _null_ )); +DESCR("peek at changes from replication slot"); +DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slotname,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ 
pg_logical_slot_peek_binary_changes _null_ _null_ _null_ )); +DESCR("peek at binary changes from replication slot"); /* event triggers */ DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s 0 0 2249 "" "{26,26,23,25,25,25,25}" "{o,o,o,o,o,o,o}" "{classid, objid, objsubid, object_type, schema_name, object_name, object_identity}" _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ )); diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index 70350e02cb2..058dc5f6675 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -157,10 +157,10 @@ extern void vac_update_relstats(Relation relation, bool hasindex, TransactionId frozenxid, MultiXactId minmulti); -extern void vacuum_set_xid_limits(int freeze_min_age, int freeze_table_age, +extern void vacuum_set_xid_limits(Relation rel, + int freeze_min_age, int freeze_table_age, int multixact_freeze_min_age, int multixact_freeze_table_age, - bool sharedRel, TransactionId *oldestXmin, TransactionId *freezeLimit, TransactionId *xidFullScanLimit, diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h new file mode 100644 index 00000000000..7f55d789a23 --- /dev/null +++ b/src/include/replication/decode.h @@ -0,0 +1,19 @@ +/*------------------------------------------------------------------------- + * decode.h + * PostgreSQL WAL to logical transformation + * + * Portions Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef DECODE_H +#define DECODE_H + +#include "access/xlogreader.h" +#include "replication/reorderbuffer.h" +#include "replication/logical.h" + +void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, + XLogRecord *record); + +#endif diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h new file mode 100644 index 00000000000..e65c8b8075f --- /dev/null +++ 
b/src/include/replication/logical.h @@ -0,0 +1,100 @@ +/*------------------------------------------------------------------------- + * logical.h + * PostgreSQL logical decoding coordination + * + * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICAL_H +#define LOGICAL_H + +#include "replication/slot.h" + +#include "access/xlog.h" +#include "access/xlogreader.h" +#include "replication/output_plugin.h" + +struct LogicalDecodingContext; + +typedef void (*LogicalOutputPluginWriterWrite) ( + struct LogicalDecodingContext *lr, + XLogRecPtr Ptr, + TransactionId xid, + bool last_write +); + +typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite; + +typedef struct LogicalDecodingContext +{ + /* memory context this is all allocated in */ + MemoryContext context; + + /* infrastructure pieces */ + XLogReaderState *reader; + ReplicationSlot *slot; + struct ReorderBuffer *reorder; + struct SnapBuild *snapshot_builder; + + OutputPluginCallbacks callbacks; + OutputPluginOptions options; + + /* + * User specified options + */ + List *output_plugin_options; + + /* + * User-Provided callback for writing/streaming out data. + */ + LogicalOutputPluginWriterPrepareWrite prepare_write; + LogicalOutputPluginWriterWrite write; + + /* + * Output buffer. + */ + StringInfo out; + + /* + * Private data pointer of the output plugin. + */ + void *output_plugin_private; + + /* + * Private data pointer for the data writer. + */ + void *output_writer_private; + + /* + * State for writing output. 
+ */ + bool accept_writes; + bool prepared_write; + XLogRecPtr write_location; + TransactionId write_xid; +} LogicalDecodingContext; + +extern void CheckLogicalDecodingRequirements(void); + +extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, + List *output_plugin_options, + XLogPageReadCB read_page, + LogicalOutputPluginWriterPrepareWrite prepare_write, + LogicalOutputPluginWriterWrite do_write); +extern LogicalDecodingContext *CreateDecodingContext( + XLogRecPtr start_lsn, + List *output_plugin_options, + XLogPageReadCB read_page, + LogicalOutputPluginWriterPrepareWrite prepare_write, + LogicalOutputPluginWriterWrite do_write); +extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx); +extern bool DecodingContextReady(LogicalDecodingContext *ctx); +extern void FreeDecodingContext(LogicalDecodingContext *ctx); + +extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin); +extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, + XLogRecPtr restart_lsn); +extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); + +#endif diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h new file mode 100644 index 00000000000..21bf44ec4b7 --- /dev/null +++ b/src/include/replication/logicalfuncs.h @@ -0,0 +1,24 @@ +/*------------------------------------------------------------------------- + * logicalfuncs.h + * PostgreSQL WAL to logical transformation support functions + * + * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICALFUNCS_H +#define LOGICALFUNCS_H + +#include "replication/logical.h" + +extern int logical_read_local_xlog_page(XLogReaderState *state, + XLogRecPtr targetPagePtr, + int reqLen, XLogRecPtr targetRecPtr, + char *cur_page, TimeLineID *pageTLI); + +extern Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS); +extern Datum 
pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS); +extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS); +extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS); + +#endif diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h new file mode 100644 index 00000000000..c47c24c8dbe --- /dev/null +++ b/src/include/replication/output_plugin.h @@ -0,0 +1,98 @@ +/*------------------------------------------------------------------------- + * output_plugin.h + * PostgreSQL Logical Decode Plugin Interface + * + * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef OUTPUT_PLUGIN_H +#define OUTPUT_PLUGIN_H + +#include "replication/reorderbuffer.h" + +struct LogicalDecodingContext; +struct OutputPluginCallbacks; + +typedef enum OutputPluginOutputType +{ + OUTPUT_PLUGIN_BINARY_OUTPUT, + OUTPUT_PLUGIN_TEXTUAL_OUTPUT +} OutputPluginOutputType; + +/* + * Options set by the output plugin, in the startup callback. + */ +typedef struct OutputPluginOptions +{ + OutputPluginOutputType output_type; +} OutputPluginOptions; + +/* + * Type of the shared library symbol _PG_output_plugin_init that is looked up + * when loading an output plugin shared library. + */ +typedef void (*LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb); + +/* + * Callback that gets called in a user-defined plugin. ctx->private_data can + * be set to some private data. + * + * "is_init" will be set to "true" if the decoding slot just got defined. When + * the same slot is used from there one, it will be "false". + */ +typedef void (*LogicalDecodeStartupCB) ( + struct LogicalDecodingContext *ctx, + OutputPluginOptions *options, + bool is_init +); + +/* + * Callback called for every (explicit or implicit) BEGIN of a successful + * transaction. 
+ */ +typedef void (*LogicalDecodeBeginCB) ( + struct LogicalDecodingContext *, + ReorderBufferTXN *txn); + +/* + * Callback for every individual change in a successful transaction. + */ +typedef void (*LogicalDecodeChangeCB) ( + struct LogicalDecodingContext *, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change +); + +/* + * Called for every (explicit or implicit) COMMIT of a successful transaction. + */ +typedef void (*LogicalDecodeCommitCB) ( + struct LogicalDecodingContext *, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called to shutdown an output plugin. + */ +typedef void (*LogicalDecodeShutdownCB) ( + struct LogicalDecodingContext * +); + +/* + * Output plugin callbacks + */ +typedef struct OutputPluginCallbacks +{ + LogicalDecodeStartupCB startup_cb; + LogicalDecodeBeginCB begin_cb; + LogicalDecodeChangeCB change_cb; + LogicalDecodeCommitCB commit_cb; + LogicalDecodeShutdownCB shutdown_cb; +} OutputPluginCallbacks; + +void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write); +void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write); + +#endif /* OUTPUT_PLUGIN_H */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h new file mode 100644 index 00000000000..01eabfb7be7 --- /dev/null +++ b/src/include/replication/reorderbuffer.h @@ -0,0 +1,351 @@ +/* + * reorderbuffer.h + * PostgreSQL logical replay/reorder buffer management. 
+ * + * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + * src/include/replication/reorderbuffer.h + */ +#ifndef REORDERBUFFER_H +#define REORDERBUFFER_H + +#include "access/htup_details.h" + +#include "lib/ilist.h" + +#include "storage/sinval.h" + +#include "utils/hsearch.h" +#include "utils/rel.h" +#include "utils/snapshot.h" +#include "utils/timestamp.h" + +/* an individual tuple, stored in one chunk of memory */ +typedef struct ReorderBufferTupleBuf +{ + /* position in preallocated list */ + slist_node node; + + /* tuple, stored sequentially */ + HeapTupleData tuple; + HeapTupleHeaderData header; + char data[MaxHeapTupleSize]; +} ReorderBufferTupleBuf; + +/* types of the change passed to a 'change' callback */ +enum ReorderBufferChangeType +{ + REORDER_BUFFER_CHANGE_INSERT, + REORDER_BUFFER_CHANGE_UPDATE, + REORDER_BUFFER_CHANGE_DELETE +}; + +/* + * a single 'change', can be an insert (with one tuple), an update (old, new), + * or a delete (old). + * + * The same struct is also used internally for other purposes but that should + * never be visible outside reorderbuffer.c. + */ +typedef struct ReorderBufferChange +{ + XLogRecPtr lsn; + + /* type of change */ + union + { + enum ReorderBufferChangeType action; + /* do not leak internal enum values to the outside */ + int action_internal; + }; + + /* + * Context data for the change, which part of the union is valid depends + * on action/action_internal. 
+ */ + union + { + /* old, new tuples when action == *_INSERT|UPDATE|DELETE */ + struct + { + /* relation that has been changed */ + RelFileNode relnode; + /* valid for DELETE || UPDATE */ + ReorderBufferTupleBuf *oldtuple; + /* valid for INSERT || UPDATE */ + ReorderBufferTupleBuf *newtuple; + } tp; + + /* new snapshot */ + Snapshot snapshot; + + /* new command id for existing snapshot in a catalog changing tx */ + CommandId command_id; + + /* new cid mapping for catalog changing transaction */ + struct + { + RelFileNode node; + ItemPointerData tid; + CommandId cmin; + CommandId cmax; + CommandId combocid; + } tuplecid; + }; + + /* + * While in use this is how a change is linked into a transactions, + * otherwise it's the preallocated list. + */ + dlist_node node; +} ReorderBufferChange; + +typedef struct ReorderBufferTXN +{ + /* + * The transactions transaction id, can be a toplevel or sub xid. + */ + TransactionId xid; + + /* did the TX have catalog changes */ + bool has_catalog_changes; + + /* + * Do we know this is a subxact? + */ + bool is_known_as_subxact; + + /* + * LSN of the first data carrying, WAL record with knowledge about this + * xid. This is allowed to *not* be first record adorned with this xid, if + * the previous records aren't relevant for logical decoding. + */ + XLogRecPtr first_lsn; + + /* ---- + * LSN of the record that lead to this xact to be committed or + * aborted. This can be a + * * plain commit record + * * plain commit record, of a parent transaction + * * prepared transaction commit + * * plain abort record + * * prepared transaction abort + * * error during decoding + * ---- + */ + XLogRecPtr final_lsn; + + /* + * LSN pointing to the end of the commit record + 1. + */ + XLogRecPtr end_lsn; + + /* + * LSN of the last lsn at which snapshot information reside, so we can + * restart decoding from there and fully recover this transaction from + * WAL. 
+ */ + XLogRecPtr restart_decoding_lsn; + + /* + * Commit time, only known when we read the actual commit record. + */ + TimestampTz commit_time; + + /* + * Base snapshot or NULL. + */ + Snapshot base_snapshot; + XLogRecPtr base_snapshot_lsn; + + /* + * How many ReorderBufferChange's do we have in this txn. + * + * Changes in subtransactions are *not* included but tracked separately. + */ + uint64 nentries; + + /* + * How many of the above entries are stored in memory in contrast to being + * spilled to disk. + */ + uint64 nentries_mem; + + /* + * List of ReorderBufferChange structs, including new Snapshots and new + * CommandIds + */ + dlist_head changes; + + /* + * List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples. + * Those are always assigned to the toplevel transaction. (Keep track of + * #entries to create a hash of the right size) + */ + dlist_head tuplecids; + uint64 ntuplecids; + + /* + * On-demand built hash for looking up the above values. + */ + HTAB *tuplecid_hash; + + /* + * Hash containing (potentially partial) toast entries. NULL if no toast + * tuples have been found for the current change. + */ + HTAB *toast_hash; + + /* + * non-hierarchical list of subtransactions that are *not* aborted. Only + * used in toplevel transactions. + */ + dlist_head subtxns; + uint32 nsubtxns; + + /* + * Stored cache invalidations. This is not a linked list because we get + * all the invalidations at once. 
+ */ + uint32 ninvalidations; + SharedInvalidationMessage *invalidations; + + /* --- + * Position in one of three lists: + * * list of subtransactions if we are *known* to be subxact + * * list of toplevel xacts (can be a as-yet unknown subxact) + * * list of preallocated ReorderBufferTXNs + * --- + */ + dlist_node node; + +} ReorderBufferTXN; + +/* so we can define the callbacks used inside struct ReorderBuffer itself */ +typedef struct ReorderBuffer ReorderBuffer; + +/* change callback signature */ +typedef void (*ReorderBufferApplyChangeCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + +/* begin callback signature */ +typedef void (*ReorderBufferBeginCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn); + +/* commit callback signature */ +typedef void (*ReorderBufferCommitCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +struct ReorderBuffer +{ + /* + * xid => ReorderBufferTXN lookup table + */ + HTAB *by_txn; + + /* + * Transactions that could be a toplevel xact, ordered by LSN of the first + * record bearing that xid.. + */ + dlist_head toplevel_by_lsn; + + /* + * one-entry sized cache for by_txn. Very frequently the same txn gets + * looked up over and over again. + */ + TransactionId by_txn_last_xid; + ReorderBufferTXN *by_txn_last_txn; + + /* + * Callacks to be called when a transactions commits. + */ + ReorderBufferBeginCB begin; + ReorderBufferApplyChangeCB apply_change; + ReorderBufferCommitCB commit; + + /* + * Pointer that will be passed untouched to the callbacks. + */ + void *private_data; + + /* + * Private memory context. + */ + MemoryContext context; + + /* + * Data structure slab cache. + * + * We allocate/deallocate some structures very frequently, to avoid bigger + * overhead we cache some unused ones here. 
+ * + * The maximum number of cached entries is controlled by const variables + * ontop of reorderbuffer.c + */ + + /* cached ReorderBufferTXNs */ + dlist_head cached_transactions; + Size nr_cached_transactions; + + /* cached ReorderBufferChanges */ + dlist_head cached_changes; + Size nr_cached_changes; + + /* cached ReorderBufferTupleBufs */ + slist_head cached_tuplebufs; + Size nr_cached_tuplebufs; + + XLogRecPtr current_restart_decoding_lsn; + + /* buffer for disk<->memory conversions */ + char *outbuf; + Size outbufsize; +}; + + +ReorderBuffer *ReorderBufferAllocate(void); +void ReorderBufferFree(ReorderBuffer *); + +ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *); +void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple); +ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); +void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); + +void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); +void ReorderBufferCommit(ReorderBuffer *, TransactionId, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time); +void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); +void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn); +void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn); +void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid); +void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn); + +void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap); +void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap); +void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + CommandId cid); +void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + 
RelFileNode node, ItemPointerData pt, + CommandId cmin, CommandId cmax, CommandId combocid); +void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + Size nmsgs, SharedInvalidationMessage *msgs); +bool ReorderBufferIsXidKnown(ReorderBuffer *, TransactionId xid); +void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); +bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); +bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); + +ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); + +void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); + +void StartupReorderBuffer(void); + +#endif diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 089b0f4b70c..c354c9133bf 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -16,6 +16,24 @@ #include "storage/shmem.h" #include "storage/spin.h" +/* + * Behaviour of replication slots, upon release or crash. + * + * Slots marked as PERSISTENT are crashsafe and will not be dropped when + * released. Slots marked as EPHEMERAL will be dropped when released or after + * restarts. + * + * EPHEMERAL slots can be made PERSISTENT by calling ReplicationSlotPersist(). + */ +typedef enum ReplicationSlotPersistency +{ + RS_PERSISTENT, + RS_EPHEMERAL +} ReplicationSlotPersistency; + +/* + * On-Disk data of a replication slot, preserved across restarts. + */ typedef struct ReplicationSlotPersistentData { /* The slot's identifier */ @@ -25,6 +43,11 @@ typedef struct ReplicationSlotPersistentData Oid database; /* + * The slot's behaviour when being dropped (or restored after a crash). 
+ */ + ReplicationSlotPersistency persistency; + + /* * xmin horizon for data * * NB: This may represent a value that hasn't been written to disk yet; @@ -32,9 +55,22 @@ typedef struct ReplicationSlotPersistentData */ TransactionId xmin; + /* + * xmin horizon for catalog tuples + * + * NB: This may represent a value that hasn't been written to disk yet; + * see notes for effective_xmin, below. + */ + TransactionId catalog_xmin; + /* oldest LSN that might be required by this replication slot */ XLogRecPtr restart_lsn; + /* oldest LSN that the client has acked receipt for */ + XLogRecPtr confirmed_flush; + + /* plugin name */ + NameData plugin; } ReplicationSlotPersistentData; /* @@ -67,12 +103,26 @@ typedef struct ReplicationSlot * same as the persistent value (data.xmin). */ TransactionId effective_xmin; + TransactionId effective_catalog_xmin; /* data surviving shutdowns and crashes */ ReplicationSlotPersistentData data; /* is somebody performing io on this slot? */ LWLock *io_in_progress_lock; + + /* all the remaining data is only used for logical slots */ + + /* ---- + * When the client has confirmed flushes >= candidate_xmin_lsn we can + * advance the catalog xmin, when restart_valid has been passed, + * restart_lsn can be increased. 
+ * ---- + */ + TransactionId candidate_catalog_xmin; + XLogRecPtr candidate_xmin_lsn; + XLogRecPtr candidate_restart_valid; + XLogRecPtr candidate_restart_lsn; } ReplicationSlot; /* @@ -97,8 +147,11 @@ extern Size ReplicationSlotsShmemSize(void); extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ -extern void ReplicationSlotCreate(const char *name, bool db_specific); +extern void ReplicationSlotCreate(const char *name, bool db_specific, + ReplicationSlotPersistency p); +extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name); + extern void ReplicationSlotAcquire(const char *name); extern void ReplicationSlotRelease(void); extern void ReplicationSlotSave(void); @@ -106,15 +159,20 @@ extern void ReplicationSlotMarkDirty(void); /* misc stuff */ extern bool ReplicationSlotValidateName(const char *name, int elevel); -extern void ReplicationSlotsComputeRequiredXmin(void); +extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredLSN(void); +extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); +extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); + extern void StartupReplicationSlots(XLogRecPtr checkPointRedo); extern void CheckPointReplicationSlots(void); extern void CheckSlotRequirements(void); -extern void ReplicationSlotAtProcExit(void); /* SQL callable functions */ +extern Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS); +extern Datum pg_create_logical_replication_slot(PG_FUNCTION_ARGS); +extern Datum pg_drop_replication_slot(PG_FUNCTION_ARGS); extern Datum pg_get_replication_slots(PG_FUNCTION_ARGS); #endif /* SLOT_H */ diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h new file mode 100644 index 00000000000..087c0e510d5 --- /dev/null +++ b/src/include/replication/snapbuild.h @@ -0,0 +1,83 @@ 
+/*------------------------------------------------------------------------- + * + * snapbuild.h + * Exports from replication/logical/snapbuild.c. + * + * Copyright (c) 2012-2014, PostgreSQL Global Development Group + * + * src/include/replication/snapbuild.h + * + *------------------------------------------------------------------------- + */ +#ifndef SNAPBUILD_H +#define SNAPBUILD_H + +#include "access/xlogdefs.h" +#include "utils/snapmgr.h" + +typedef enum +{ + /* + * Initial state, we can't do much yet. + */ + SNAPBUILD_START, + + /* + * We have collected enough information to decode tuples in transactions + * that started after this. + * + * Once we reached this we start to collect changes. We cannot apply them + * yet because they might be based on transactions that were still running + * when we reached this state. + */ + SNAPBUILD_FULL_SNAPSHOT, + + /* + * Found a point after hitting built_full_snapshot where all transactions + * that were running at that point finished. Till we reach that we hold + * off calling any commit callbacks.
+ */ + SNAPBUILD_CONSISTENT +} SnapBuildState; + +/* forward declare so we don't have to expose the struct to the public */ +struct SnapBuild; +typedef struct SnapBuild SnapBuild; + +/* forward declare so we don't have to include reorderbuffer.h */ +struct ReorderBuffer; + +/* forward declare so we don't have to include heapam_xlog.h */ +struct xl_heap_new_cid; +struct xl_running_xacts; + +extern void CheckPointSnapBuild(void); + +extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache, + TransactionId xmin_horizon, XLogRecPtr start_lsn); +extern void FreeSnapshotBuilder(SnapBuild *cache); + +extern void SnapBuildSnapDecRefcount(Snapshot snap); + +extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate); +extern void SnapBuildClearExportedSnapshot(void); + +extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate); + +extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr); + +extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, + TransactionId xid, int nsubxacts, + TransactionId *subxacts); +extern void SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn, + TransactionId xid, int nsubxacts, + TransactionId *subxacts); +extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, + XLogRecPtr lsn); +extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, + XLogRecPtr lsn, struct xl_heap_new_cid *cid); +extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, + struct xl_running_xacts *running); +extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn); + +#endif /* SNAPBUILD_H */ diff --git a/src/include/storage/itemptr.h b/src/include/storage/itemptr.h index 67bbdbb988a..0b81d53f5f8 100644 --- a/src/include/storage/itemptr.h +++ b/src/include/storage/itemptr.h @@ -116,6 +116,9 @@ typedef ItemPointerData *ItemPointer; /* * ItemPointerCopy * Copies the contents of one disk item pointer to another. 
+ * + * Should there ever be padding in an ItemPointer this would need to be handled + * differently as it's used as hash key. */ #define ItemPointerCopy(fromPointer, toPointer) \ ( \ diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index a3cadd9a017..5218b448cd6 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -41,10 +41,12 @@ struct XidCache #define PROC_IS_AUTOVACUUM 0x01 /* is it an autovac worker? */ #define PROC_IN_VACUUM 0x02 /* currently running lazy vacuum */ #define PROC_IN_ANALYZE 0x04 /* currently running analyze */ -#define PROC_VACUUM_FOR_WRAPAROUND 0x08 /* set by autovac only */ +#define PROC_VACUUM_FOR_WRAPAROUND 0x08 /* set by autovac only */ +#define PROC_IN_LOGICAL_DECODING 0x10 /* currently doing logical decoding */ /* flags reset at EOXact */ -#define PROC_VACUUM_STATE_MASK (0x0E) +#define PROC_VACUUM_STATE_MASK \ + (PROC_IN_VACUUM | PROC_IN_ANALYZE | PROC_VACUUM_FOR_WRAPAROUND) /* * We allow a small number of "weak" relation locks (AccesShareLock, diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index d1a58a3661b..d0b4103a09e 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -15,6 +15,7 @@ #define PROCARRAY_H #include "storage/standby.h" +#include "utils/relcache.h" #include "utils/snapshot.h" @@ -50,8 +51,9 @@ extern RunningTransactions GetRunningTransactionData(void); extern bool TransactionIdIsInProgress(TransactionId xid); extern bool TransactionIdIsActive(TransactionId xid); -extern TransactionId GetOldestXmin(bool allDbs, bool ignoreVacuum); +extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum); extern TransactionId GetOldestActiveTransactionId(void); +extern TransactionId GetOldestSafeDecodingTransactionId(void); extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids); extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids); @@ -77,6 +79,10 @@ extern void 
XidCacheRemoveRunningXids(TransactionId xid, int nxids, const TransactionId *xids, TransactionId latestXid); -extern void ProcArraySetReplicationSlotXmin(TransactionId xmin); +extern void ProcArraySetReplicationSlotXmin(TransactionId xmin, + TransactionId catalog_xmin, bool already_locked); + +extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, + TransactionId *catalog_xmin); #endif /* PROCARRAY_H */ diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h index 0cae810f49e..d5bb850337d 100644 --- a/src/include/storage/sinval.h +++ b/src/include/storage/sinval.h @@ -147,4 +147,6 @@ extern void ProcessCommittedInvalidationMessages(SharedInvalidationMessage *msgs int nmsgs, bool RelcacheInitFileInval, Oid dbid, Oid tsid); +extern void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg); + #endif /* SINVAL_H */ diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h index c1409c863db..6156e0219d0 100644 --- a/src/include/utils/inval.h +++ b/src/include/utils/inval.h @@ -64,4 +64,5 @@ extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue); +extern void InvalidateSystemCaches(void); #endif /* INVAL_H */ diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index c601770ec99..abe7016d040 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -23,12 +23,14 @@ extern bool FirstSnapshotSet; extern TransactionId TransactionXmin; extern TransactionId RecentXmin; extern TransactionId RecentGlobalXmin; +extern TransactionId RecentGlobalDataXmin; extern Snapshot GetTransactionSnapshot(void); extern Snapshot GetLatestSnapshot(void); extern void SnapshotSetCommandId(CommandId curcid); extern Snapshot GetCatalogSnapshot(Oid relid); +extern Snapshot GetNonHistoricCatalogSnapshot(Oid relid); extern void InvalidateCatalogSnapshot(void); extern void PushActiveSnapshot(Snapshot snapshot); @@ -53,4 +55,13 @@ extern 
bool XactHasExportedSnapshots(void); extern void DeleteAllExportedSnapshotFiles(void); extern bool ThereAreNoPriorRegisteredSnapshots(void); +extern char *ExportSnapshot(Snapshot snapshot); + +/* Support for catalog timetravel for logical decoding */ +struct HTAB; +extern struct HTAB *HistoricSnapshotGetTupleCids(void); +extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids); +extern void TeardownHistoricSnapshot(bool is_error); +extern bool HistoricSnapshotActive(void); + #endif /* SNAPMGR_H */ diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h index c542238825a..4b256074b0b 100644 --- a/src/include/utils/snapshot.h +++ b/src/include/utils/snapshot.h @@ -30,6 +30,22 @@ typedef struct SnapshotData *Snapshot; typedef bool (*SnapshotSatisfiesFunc) (HeapTuple htup, Snapshot snapshot, Buffer buffer); +/* + * Struct representing all kinds of possible snapshots. + * + * There are several different kinds of snapshots: + * * Normal MVCC snapshots + * * MVCC snapshots taken during recovery (in Hot-Standby mode) + * * Historic MVCC snapshots used during logical decoding + * * snapshots passed to HeapTupleSatisfiesDirty() + * * snapshots used for SatisfiesAny, Toast, Self where no members are + * accessed. + * + * TODO: It's probably a good idea to split this struct using a NodeTag + * similar to how parser and executor nodes are handled, with one type for + * each different kind of snapshot to avoid overloading the meaning of + * individual fields.
+ */ typedef struct SnapshotData { SnapshotSatisfiesFunc satisfies; /* tuple test function */ @@ -46,11 +62,23 @@ typedef struct SnapshotData */ TransactionId xmin; /* all XID < xmin are visible to me */ TransactionId xmax; /* all XID >= xmax are invisible to me */ - TransactionId *xip; /* array of xact IDs in progress */ + /* + * For normal MVCC snapshots this contains all the xact IDs that are in + * progress, unless the snapshot was taken during recovery in which case + * it's empty. For historic MVCC snapshots, the meaning is inverted, + * i.e. it contains *committed* transactions between xmin and xmax. + */ + TransactionId *xip; uint32 xcnt; /* # of xact ids in xip[] */ /* note: all ids in xip[] satisfy xmin <= xip[i] < xmax */ int32 subxcnt; /* # of xact ids in subxip[] */ - TransactionId *subxip; /* array of subxact IDs in progress */ + /* + * For non-historic MVCC snapshots, this contains subxact IDs that are in + * progress (and other transactions that are in progress if taken during + * recovery). For historic snapshots it contains *all* xids assigned to the + * replayed transaction, including the toplevel xid. + */ + TransactionId *subxip; bool suboverflowed; /* has the subxip array overflowed? */ bool takenDuringRecovery; /* recovery-shaped snapshot?
*/ bool copied; /* false if it's a static snapshot */ diff --git a/src/include/utils/tqual.h b/src/include/utils/tqual.h index e34c28a4f78..48abe62983d 100644 --- a/src/include/utils/tqual.h +++ b/src/include/utils/tqual.h @@ -22,6 +22,7 @@ extern PGDLLIMPORT SnapshotData SnapshotSelfData; extern PGDLLIMPORT SnapshotData SnapshotAnyData; extern PGDLLIMPORT SnapshotData SnapshotToastData; +extern PGDLLIMPORT SnapshotData CatalogSnapshotData; #define SnapshotSelf (&SnapshotSelfData) #define SnapshotAny (&SnapshotAnyData) @@ -37,7 +38,8 @@ extern PGDLLIMPORT SnapshotData SnapshotToastData; /* This macro encodes the knowledge of which snapshots are MVCC-safe */ #define IsMVCCSnapshot(snapshot) \ - ((snapshot)->satisfies == HeapTupleSatisfiesMVCC) + ((snapshot)->satisfies == HeapTupleSatisfiesMVCC || \ + (snapshot)->satisfies == HeapTupleSatisfiesHistoricMVCC) /* * HeapTupleSatisfiesVisibility @@ -73,6 +75,8 @@ extern bool HeapTupleSatisfiesToast(HeapTuple htup, Snapshot snapshot, Buffer buffer); extern bool HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot, Buffer buffer); +extern bool HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, + Snapshot snapshot, Buffer buffer); /* Special "satisfies" routines with different APIs */ extern HTSU_Result HeapTupleSatisfiesUpdate(HeapTuple htup, @@ -86,4 +90,13 @@ extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer, uint16 infomask, TransactionId xid); extern bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple); +/* + * To avoid leaking too much knowledge about reorderbuffer implementation + * details this is implemented in reorderbuffer.c not tqual.c. + */ +extern bool ResolveCminCmaxDuringDecoding(struct HTAB *tuplecid_data, + Snapshot snapshot, + HeapTuple htup, + Buffer buffer, + CommandId *cmin, CommandId *cmax); #endif /* TQUAL_H */ |