diff options
author | Andres Freund | 2019-03-11 19:46:41 +0000 |
---|---|---|
committer | Andres Freund | 2019-03-11 19:46:41 +0000 |
commit | c2fe139c201c48f1133e9fbea2dd99b8efe2fadd (patch) | |
tree | ab0a6261b412b8284b6c91af158f72af97e02a35 /src/backend/access/table/tableam.c | |
parent | a47841528107921f02c280e0c5f91c5a1d86adb0 (diff) |
tableam: Add and use scan APIs.
Too allow table accesses to be not directly dependent on heap, several
new abstractions are needed. Specifically:
1) Heap scans need to be generalized into table scans. Do this by
introducing TableScanDesc, which will be the "base class" for
individual AMs. This contains the AM independent fields from
HeapScanDesc.
The previous heap_{beginscan,rescan,endscan} et al. have been
replaced with a table_ version.
There's no direct replacement for heap_getnext(), as that returned
a HeapTuple, which is undesirable for a other AMs. Instead there's
table_scan_getnextslot(). But note that heap_getnext() lives on,
it's still used widely to access catalog tables.
This is achieved by new scan_begin, scan_end, scan_rescan,
scan_getnextslot callbacks.
2) The portion of parallel scans that's shared between backends need
to be able to do so without the user doing per-AM work. To achieve
that new parallelscan_{estimate, initialize, reinitialize}
callbacks are introduced, which operate on a new
ParallelTableScanDesc, which again can be subclassed by AMs.
As it is likely that several AMs are going to be block oriented,
block oriented callbacks that can be shared between such AMs are
provided and used by heap. table_block_parallelscan_{estimate,
intiialize, reinitialize} as callbacks, and
table_block_parallelscan_{nextpage, init} for use in AMs. These
operate on a ParallelBlockTableScanDesc.
3) Index scans need to be able to access tables to return a tuple, and
there needs to be state across individual accesses to the heap to
store state like buffers. That's now handled by introducing a
sort-of-scan IndexFetchTable, which again is intended to be
subclassed by individual AMs (for heap IndexFetchHeap).
The relevant callbacks for an AM are index_fetch_{end, begin,
reset} to create the necessary state, and index_fetch_tuple to
retrieve an indexed tuple. Note that index_fetch_tuple
implementations need to be smarter than just blindly fetching the
tuples for AMs that have optimizations similar to heap's HOT - the
currently alive tuple in the update chain needs to be fetched if
appropriate.
Similar to table_scan_getnextslot(), it's undesirable to continue
to return HeapTuples. Thus index_fetch_heap (might want to rename
that later) now accepts a slot as an argument. Core code doesn't
have a lot of call sites performing index scans without going
through the systable_* API (in contrast to loads of heap_getnext
calls and working directly with HeapTuples).
Index scans now store the result of a search in
IndexScanDesc->xs_heaptid, rather than xs_ctup->t_self. As the
target is not generally a HeapTuple anymore that seems cleaner.
To be able to sensible adapt code to use the above, two further
callbacks have been introduced:
a) slot_callbacks returns a TupleTableSlotOps* suitable for creating
slots capable of holding a tuple of the AMs
type. table_slot_callbacks() and table_slot_create() are based
upon that, but have additional logic to deal with views, foreign
tables, etc.
While this change could have been done separately, nearly all the
call sites that needed to be adapted for the rest of this commit
also would have been needed to be adapted for
table_slot_callbacks(), making separation not worthwhile.
b) tuple_satisfies_snapshot checks whether the tuple in a slot is
currently visible according to a snapshot. That's required as a few
places now don't have a buffer + HeapTuple around, but a
slot (which in heap's case internally has that information).
Additionally a few infrastructure changes were needed:
I) SysScanDesc, as used by systable_{beginscan, getnext} et al. now
internally uses a slot to keep track of tuples. While
systable_getnext() still returns HeapTuples, and will so for the
foreseeable future, the index API (see 1) above) now only deals with
slots.
The remainder, and largest part, of this commit is then adjusting all
scans in postgres to use the new APIs.
Author: Andres Freund, Haribabu Kommi, Alvaro Herrera
Discussion:
https://postgr.es/m/[email protected]
https://postgr.es/m/[email protected]
Diffstat (limited to 'src/backend/access/table/tableam.c')
-rw-r--r-- | src/backend/access/table/tableam.c | 293 |
1 files changed, 292 insertions, 1 deletions
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index 84851e4ff88..628d930c130 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -6,13 +6,304 @@ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * src/backend/access/table/tableam.c + * + * IDENTIFICATION + * src/backend/access/table/tableam.c + * + * NOTES + * Note that most function in here are documented in tableam.h, rather than + * here. That's because there's a lot of inline functions in tableam.h and + * it'd be harder to understand if one constantly had to switch between files. + * *---------------------------------------------------------------------- */ #include "postgres.h" +#include "access/heapam.h" /* for ss_* */ #include "access/tableam.h" +#include "access/xact.h" +#include "storage/bufmgr.h" +#include "storage/shmem.h" /* GUC variables */ char *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD; +bool synchronize_seqscans = true; + + +/* ---------------------------------------------------------------------------- + * Slot functions. + * ---------------------------------------------------------------------------- + */ + +const TupleTableSlotOps * +table_slot_callbacks(Relation relation) +{ + const TupleTableSlotOps *tts_cb; + + if (relation->rd_tableam) + tts_cb = relation->rd_tableam->slot_callbacks(relation); + else if (relation->rd_rel->relkind == RELKIND_FOREIGN_TABLE) + { + /* + * Historically FDWs expect to store heap tuples in slots. Continue + * handing them one, to make it less painful to adapt FDWs to new + * versions. The cost of a heap slot over a virtual slot is pretty + * small. + */ + tts_cb = &TTSOpsHeapTuple; + } + else + { + /* + * These need to be supported, as some parts of the code (like COPY) + * need to create slots for such relations too. It seems better to + * centralize the knowledge that a heap slot is the right thing in + * that case here. + */ + Assert(relation->rd_rel->relkind == RELKIND_VIEW || + relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE); + tts_cb = &TTSOpsVirtual; + } + + return tts_cb; +} + +TupleTableSlot * +table_slot_create(Relation relation, List **reglist) +{ + const TupleTableSlotOps *tts_cb; + TupleTableSlot *slot; + + tts_cb = table_slot_callbacks(relation); + slot = MakeSingleTupleTableSlot(RelationGetDescr(relation), tts_cb); + + if (reglist) + *reglist = lappend(*reglist, slot); + + return slot; +} + + +/* ---------------------------------------------------------------------------- + * Table scan functions. + * ---------------------------------------------------------------------------- + */ + +TableScanDesc +table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key) +{ + Oid relid = RelationGetRelid(relation); + Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid)); + + return relation->rd_tableam->scan_begin(relation, snapshot, nkeys, key, NULL, + true, true, true, false, false, true); +} + +void +table_scan_update_snapshot(TableScanDesc scan, Snapshot snapshot) +{ + Assert(IsMVCCSnapshot(snapshot)); + + RegisterSnapshot(snapshot); + scan->rs_snapshot = snapshot; + scan->rs_temp_snap = true; +} + + +/* ---------------------------------------------------------------------------- + * Parallel table scan related functions. + * ---------------------------------------------------------------------------- + */ + +Size +table_parallelscan_estimate(Relation rel, Snapshot snapshot) +{ + Size sz = 0; + + if (IsMVCCSnapshot(snapshot)) + sz = add_size(sz, EstimateSnapshotSpace(snapshot)); + else + Assert(snapshot == SnapshotAny); + + sz = add_size(sz, rel->rd_tableam->parallelscan_estimate(rel)); + + return sz; +} + +void +table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, + Snapshot snapshot) +{ + Size snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan); + + pscan->phs_snapshot_off = snapshot_off; + + if (IsMVCCSnapshot(snapshot)) + { + SerializeSnapshot(snapshot, (char *) pscan + pscan->phs_snapshot_off); + pscan->phs_snapshot_any = false; + } + else + { + Assert(snapshot == SnapshotAny); + pscan->phs_snapshot_any = true; + } +} + +TableScanDesc +table_beginscan_parallel(Relation relation, ParallelTableScanDesc parallel_scan) +{ + Snapshot snapshot; + + Assert(RelationGetRelid(relation) == parallel_scan->phs_relid); + + if (!parallel_scan->phs_snapshot_any) + { + /* Snapshot was serialized -- restore it */ + snapshot = RestoreSnapshot((char *) parallel_scan + + parallel_scan->phs_snapshot_off); + RegisterSnapshot(snapshot); + } + else + { + /* SnapshotAny passed by caller (not serialized) */ + snapshot = SnapshotAny; + } + + return relation->rd_tableam->scan_begin(relation, snapshot, 0, NULL, parallel_scan, + true, true, true, false, false, !parallel_scan->phs_snapshot_any); +} + + +/* ---------------------------------------------------------------------------- + * Helper functions to implement parallel scans for block oriented AMs. + * ---------------------------------------------------------------------------- + */ + +Size +table_block_parallelscan_estimate(Relation rel) +{ + return sizeof(ParallelBlockTableScanDescData); +} + +Size +table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan) +{ + ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan; + + bpscan->base.phs_relid = RelationGetRelid(rel); + bpscan->phs_nblocks = RelationGetNumberOfBlocks(rel); + /* compare phs_syncscan initialization to similar logic in initscan */ + bpscan->base.phs_syncscan = synchronize_seqscans && + !RelationUsesLocalBuffers(rel) && + bpscan->phs_nblocks > NBuffers / 4; + SpinLockInit(&bpscan->phs_mutex); + bpscan->phs_startblock = InvalidBlockNumber; + pg_atomic_init_u64(&bpscan->phs_nallocated, 0); + + return sizeof(ParallelBlockTableScanDescData); +} + +void +table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan) +{ + ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan; + + pg_atomic_write_u64(&bpscan->phs_nallocated, 0); +} + +/* + * find and set the scan's startblock + * + * Determine where the parallel seq scan should start. This function may be + * called many times, once by each parallel worker. We must be careful only + * to set the startblock once. + */ +void +table_block_parallelscan_startblock_init(Relation rel, ParallelBlockTableScanDesc pbscan) +{ + BlockNumber sync_startpage = InvalidBlockNumber; + +retry: + /* Grab the spinlock. */ + SpinLockAcquire(&pbscan->phs_mutex); + + /* + * If the scan's startblock has not yet been initialized, we must do so + * now. If this is not a synchronized scan, we just start at block 0, but + * if it is a synchronized scan, we must get the starting position from + * the synchronized scan machinery. We can't hold the spinlock while + * doing that, though, so release the spinlock, get the information we + * need, and retry. If nobody else has initialized the scan in the + * meantime, we'll fill in the value we fetched on the second time + * through. + */ + if (pbscan->phs_startblock == InvalidBlockNumber) + { + if (!pbscan->base.phs_syncscan) + pbscan->phs_startblock = 0; + else if (sync_startpage != InvalidBlockNumber) + pbscan->phs_startblock = sync_startpage; + else + { + SpinLockRelease(&pbscan->phs_mutex); + sync_startpage = ss_get_location(rel, pbscan->phs_nblocks); + goto retry; + } + } + SpinLockRelease(&pbscan->phs_mutex); +} + +/* + * get the next page to scan + * + * Get the next page to scan. Even if there are no pages left to scan, + * another backend could have grabbed a page to scan and not yet finished + * looking at it, so it doesn't follow that the scan is done when the first + * backend gets an InvalidBlockNumber return. + */ +BlockNumber +table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbscan) +{ + BlockNumber page; + uint64 nallocated; + + /* + * phs_nallocated tracks how many pages have been allocated to workers + * already. When phs_nallocated >= rs_nblocks, all blocks have been + * allocated. + * + * Because we use an atomic fetch-and-add to fetch the current value, the + * phs_nallocated counter will exceed rs_nblocks, because workers will + * still increment the value, when they try to allocate the next block but + * all blocks have been allocated already. The counter must be 64 bits + * wide because of that, to avoid wrapping around when rs_nblocks is close + * to 2^32. + * + * The actual page to return is calculated by adding the counter to the + * starting block number, modulo nblocks. + */ + nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, 1); + if (nallocated >= pbscan->phs_nblocks) + page = InvalidBlockNumber; /* all blocks have been allocated */ + else + page = (nallocated + pbscan->phs_startblock) % pbscan->phs_nblocks; + + /* + * Report scan location. Normally, we report the current page number. + * When we reach the end of the scan, though, we report the starting page, + * not the ending page, just so the starting positions for later scans + * doesn't slew backwards. We only report the position at the end of the + * scan once, though: subsequent callers will report nothing. + */ + if (pbscan->base.phs_syncscan) + { + if (page != InvalidBlockNumber) + ss_report_location(rel, page); + else if (nallocated == pbscan->phs_nblocks) + ss_report_location(rel, pbscan->phs_startblock); + } + + return page; +} |