*** pgsql/src/backend/access/heap/heapam.c	2008/11/19 10:34:50	1.270
--- pgsql/src/backend/access/heap/heapam.c	2008/12/03 13:05:22	1.271
***************
*** 8,14 ****
   *
   *
   * IDENTIFICATION
!  *	  $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.269 2008/11/06 20:51:14 tgl Exp $
   *
   *
   * INTERFACE ROUTINES
--- 8,14 ----
   *
   *
   * IDENTIFICATION
!  *	  $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.270 2008/11/19 10:34:50 heikki Exp $
   *
   *
   * INTERFACE ROUTINES
***************
*** 47,52 ****
--- 47,53 ----
  #include "access/transam.h"
  #include "access/tuptoaster.h"
  #include "access/valid.h"
+ #include "access/visibilitymap.h"
  #include "access/xact.h"
  #include "access/xlogutils.h"
  #include "catalog/catalog.h"
*************** heapgetpage(HeapScanDesc scan, BlockNumb
*** 195,200 ****
--- 196,202 ----
  	int			ntup;
  	OffsetNumber lineoff;
  	ItemId		lpp;
+ 	bool		all_visible;
  
  	Assert(page < scan->rs_nblocks);
*************** heapgetpage(HeapScanDesc scan, BlockNumb
*** 233,252 ****
  	lines = PageGetMaxOffsetNumber(dp);
  	ntup = 0;
  
  	for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
  		 lineoff <= lines;
  		 lineoff++, lpp++)
  	{
  		if (ItemIdIsNormal(lpp))
  		{
- 			HeapTupleData loctup;
  			bool		valid;
  
! 			loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
! 			loctup.t_len = ItemIdGetLength(lpp);
! 			ItemPointerSet(&(loctup.t_self), page, lineoff);
! 			valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
  			if (valid)
  				scan->rs_vistuples[ntup++] = lineoff;
  		}
--- 235,266 ----
  	lines = PageGetMaxOffsetNumber(dp);
  	ntup = 0;
  
+ 	/*
+ 	 * If the all-visible flag indicates that all tuples on the page are
+ 	 * visible to everyone, we can skip the per-tuple visibility tests.
+ 	 */
+ 	all_visible = PageIsAllVisible(dp);
+ 
  	for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
  		 lineoff <= lines;
  		 lineoff++, lpp++)
  	{
  		if (ItemIdIsNormal(lpp))
  		{
  			bool		valid;
  
! 			if (all_visible)
! 				valid = true;
! 			else
! 			{
! 				HeapTupleData loctup;
! 
! 				loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
! 				loctup.t_len = ItemIdGetLength(lpp);
! 				ItemPointerSet(&(loctup.t_self), page, lineoff);
! 				valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
! 			}
  			if (valid)
  				scan->rs_vistuples[ntup++] = lineoff;
  		}
*************** heap_insert(Relation relation, HeapTuple
*** 1860,1865 ****
--- 1874,1880 ----
  	TransactionId xid = GetCurrentTransactionId();
  	HeapTuple	heaptup;
  	Buffer		buffer;
+ 	bool		all_visible_cleared = false;
  
  	if (relation->rd_rel->relhasoids)
  	{
*************** heap_insert(Relation relation, HeapTuple
*** 1920,1925 ****
--- 1935,1946 ----
  
  	RelationPutHeapTuple(relation, buffer, heaptup);
  
+ 	if (PageIsAllVisible(BufferGetPage(buffer)))
+ 	{
+ 		all_visible_cleared = true;
+ 		PageClearAllVisible(BufferGetPage(buffer));
+ 	}
+ 
  	/*
  	 * XXX Should we set PageSetPrunable on this page ?
  	 *
*************** heap_insert(Relation relation, HeapTuple
*** 1943,1948 ****
--- 1964,1970 ----
  		Page		page = BufferGetPage(buffer);
  		uint8		info = XLOG_HEAP_INSERT;
  
+ 		xlrec.all_visible_cleared = all_visible_cleared;
  		xlrec.target.node = relation->rd_node;
  		xlrec.target.tid = heaptup->t_self;
  		rdata[0].data = (char *) &xlrec;
*************** heap_insert(Relation relation, HeapTuple
*** 1994,1999 ****
--- 2016,2026 ----
  
  	UnlockReleaseBuffer(buffer);
  
+ 	/* Clear the bit in the visibility map if necessary */
+ 	if (all_visible_cleared)
+ 		visibilitymap_clear(relation,
+ 							ItemPointerGetBlockNumber(&(heaptup->t_self)));
+ 
  	/*
  	 * If tuple is cachable, mark it for invalidation from the caches in case
  	 * we abort.  Note it is OK to do this after releasing the buffer, because
*************** heap_delete(Relation relation, ItemPoint
*** 2070,2075 ****
--- 2097,2103 ----
  	Buffer		buffer;
  	bool		have_tuple_lock = false;
  	bool		iscombo;
+ 	bool		all_visible_cleared = false;
  
  	Assert(ItemPointerIsValid(tid));
*************** l1:
*** 2216,2221 ****
--- 2244,2255 ----
  	 */
  	PageSetPrunable(page, xid);
  
+ 	if (PageIsAllVisible(page))
+ 	{
+ 		all_visible_cleared = true;
+ 		PageClearAllVisible(page);
+ 	}
+ 
  	/* store transaction information of xact deleting the tuple */
  	tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
  							   HEAP_XMAX_INVALID |
*************** l1:
*** 2237,2242 ****
--- 2271,2277 ----
  		XLogRecPtr	recptr;
  		XLogRecData rdata[2];
  
+ 		xlrec.all_visible_cleared = all_visible_cleared;
  		xlrec.target.node = relation->rd_node;
  		xlrec.target.tid = tp.t_self;
  		rdata[0].data = (char *) &xlrec;
*************** l1:
*** 2281,2286 ****
--- 2316,2325 ----
  	 */
  	CacheInvalidateHeapTuple(relation, &tp);
  
+ 	/* Clear the bit in the visibility map if necessary */
+ 	if (all_visible_cleared)
+ 		visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
+ 
  	/* Now we can release the buffer */
  	ReleaseBuffer(buffer);
*************** heap_update(Relation relation, ItemPoint
*** 2388,2393 ****
--- 2427,2434 ----
  	bool		have_tuple_lock = false;
  	bool		iscombo;
  	bool		use_hot_update = false;
+ 	bool		all_visible_cleared = false;
+ 	bool		all_visible_cleared_new = false;
  
  	Assert(ItemPointerIsValid(otid));
*************** l2:
*** 2763,2768 ****
--- 2804,2815 ----
  	MarkBufferDirty(newbuf);
  	MarkBufferDirty(buffer);
  
+ 	/*
+ 	 * Note: we mustn't clear PD_ALL_VISIBLE flags before writing the WAL
+ 	 * record, because log_heap_update looks at those flags to set the
+ 	 * corresponding flags in the WAL record.
+ 	 */
+ 
  	/* XLOG stuff */
  	if (!relation->rd_istemp)
  	{
*************** l2:
*** 2778,2783 ****
--- 2825,2842 ----
  		PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
  	}
  
+ 	/* Clear PD_ALL_VISIBLE flags */
+ 	if (PageIsAllVisible(BufferGetPage(buffer)))
+ 	{
+ 		all_visible_cleared = true;
+ 		PageClearAllVisible(BufferGetPage(buffer));
+ 	}
+ 	if (newbuf != buffer && PageIsAllVisible(BufferGetPage(newbuf)))
+ 	{
+ 		all_visible_cleared_new = true;
+ 		PageClearAllVisible(BufferGetPage(newbuf));
+ 	}
+ 
  	END_CRIT_SECTION();
  
  	if (newbuf != buffer)
*************** l2:
*** 2791,2796 ****
--- 2850,2861 ----
  	 */
  	CacheInvalidateHeapTuple(relation, &oldtup);
  
+ 	/* Clear bits in visibility map */
+ 	if (all_visible_cleared)
+ 		visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
+ 	if (all_visible_cleared_new)
+ 		visibilitymap_clear(relation, BufferGetBlockNumber(newbuf));
+ 
  	/* Now we can release the buffer(s) */
  	if (newbuf != buffer)
  		ReleaseBuffer(newbuf);
*************** l3:
*** 3412,3417 ****
--- 3477,3487 ----
  	LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
  
  	/*
+ 	 * Don't update the visibility map here. Locking a tuple doesn't
+ 	 * change visibility info.
+ 	 */
+ 
+ 	/*
  	 * Now that we have successfully marked the tuple as locked, we can
  	 * release the lmgr tuple lock, if we had it.
  	 */
*************** log_heap_update(Relation reln, Buffer ol
*** 3916,3922 ****
--- 3986,3994 ----
  
  	xlrec.target.node = reln->rd_node;
  	xlrec.target.tid = from;
+ 	xlrec.all_visible_cleared = PageIsAllVisible(BufferGetPage(oldbuf));
  	xlrec.newtid = newtup->t_self;
+ 	xlrec.new_all_visible_cleared = PageIsAllVisible(BufferGetPage(newbuf));
  
  	rdata[0].data = (char *) &xlrec;
  	rdata[0].len = SizeOfHeapUpdate;
*************** heap_xlog_delete(XLogRecPtr lsn, XLogRec
*** 4185,4197 ****
  	OffsetNumber offnum;
  	ItemId		lp = NULL;
  	HeapTupleHeader htup;
  
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
! 	buffer = XLogReadBuffer(xlrec->target.node,
! 							ItemPointerGetBlockNumber(&(xlrec->target.tid)),
! 							false);
  	if (!BufferIsValid(buffer))
  		return;
  	page = (Page) BufferGetPage(buffer);
--- 4257,4281 ----
  	OffsetNumber offnum;
  	ItemId		lp = NULL;
  	HeapTupleHeader htup;
+ 	BlockNumber blkno;
+ 
+ 	blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+ 
+ 	/*
+ 	 * The visibility map always needs to be updated, even if the heap page
+ 	 * is already up-to-date.
+ 	 */
+ 	if (xlrec->all_visible_cleared)
+ 	{
+ 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ 		visibilitymap_clear(reln, blkno);
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
  
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
! 	buffer = XLogReadBuffer(xlrec->target.node, blkno, false);
  	if (!BufferIsValid(buffer))
  		return;
  	page = (Page) BufferGetPage(buffer);
*************** heap_xlog_delete(XLogRecPtr lsn, XLogRec
*** 4223,4228 ****
--- 4307,4315 ----
  	/* Mark the page as a candidate for pruning */
  	PageSetPrunable(page, record->xl_xid);
  
+ 	if (xlrec->all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
  	/* Make sure there is no forward chain link in t_ctid */
  	htup->t_ctid = xlrec->target.tid;
  	PageSetLSN(page, lsn);
*************** heap_xlog_insert(XLogRecPtr lsn, XLogRec
*** 4249,4259 ****
  	Size		freespace;
  	BlockNumber blkno;
  
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
- 	blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
- 
  	if (record->xl_info & XLOG_HEAP_INIT_PAGE)
  	{
  		buffer = XLogReadBuffer(xlrec->target.node, blkno, true);
--- 4336,4357 ----
  	Size		freespace;
  	BlockNumber blkno;
  
+ 	blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+ 
+ 	/*
+ 	 * The visibility map always needs to be updated, even if the heap page
+ 	 * is already up-to-date.
+ 	 */
+ 	if (xlrec->all_visible_cleared)
+ 	{
+ 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ 		visibilitymap_clear(reln, blkno);
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
+ 
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
  	if (record->xl_info & XLOG_HEAP_INIT_PAGE)
  	{
  		buffer = XLogReadBuffer(xlrec->target.node, blkno, true);
*************** heap_xlog_insert(XLogRecPtr lsn, XLogRec
*** 4307,4312 ****
--- 4405,4414 ----
  
  	PageSetLSN(page, lsn);
  	PageSetTLI(page, ThisTimeLineID);
+ 
+ 	if (xlrec->all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
  	MarkBufferDirty(buffer);
  	UnlockReleaseBuffer(buffer);
*************** heap_xlog_update(XLogRecPtr lsn, XLogRec
*** 4347,4352 ****
--- 4449,4466 ----
  	uint32		newlen;
  	Size		freespace;
  
+ 	/*
+ 	 * The visibility map always needs to be updated, even if the heap page
+ 	 * is already up-to-date.
+ 	 */
+ 	if (xlrec->all_visible_cleared)
+ 	{
+ 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ 		visibilitymap_clear(reln,
+ 							ItemPointerGetBlockNumber(&xlrec->target.tid));
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
+ 
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  	{
  		if (samepage)
*************** heap_xlog_update(XLogRecPtr lsn, XLogRec
*** 4411,4416 ****
--- 4525,4533 ----
  	/* Mark the page as a candidate for pruning */
  	PageSetPrunable(page, record->xl_xid);
  
+ 	if (xlrec->all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
  	/*
  	 * this test is ugly, but necessary to avoid thinking that insert change
  	 * is already applied
*************** heap_xlog_update(XLogRecPtr lsn, XLogRec
*** 4426,4431 ****
--- 4543,4559 ----
  
  newt:;
  
+ 	/*
+ 	 * The visibility map always needs to be updated, even if the heap page
+ 	 * is already up-to-date.
+ 	 */
+ 	if (xlrec->new_all_visible_cleared)
+ 	{
+ 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ 		visibilitymap_clear(reln, ItemPointerGetBlockNumber(&xlrec->newtid));
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
+ 
  	if (record->xl_info & XLR_BKP_BLOCK_2)
  		return;
*************** newsame:;
*** 4504,4509 ****
--- 4632,4640 ----
  	if (offnum == InvalidOffsetNumber)
  		elog(PANIC, "heap_update_redo: failed to add tuple");
  
+ 	if (xlrec->new_all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
  	freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
  
  	PageSetLSN(page, lsn);
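The pattern the patch repeats in heap_insert, heap_delete, and heap_update is: clear the page-level
PD_ALL_VISIBLE flag while the heap buffer is still locked (so the WAL record can report whether it was
cleared), and only call visibilitymap_clear() for the corresponding block after the buffer content lock
has been given up. The sketch below illustrates that ordering for a single heap buffer; the helper name
is hypothetical and not part of the patch, and WAL logging, critical sections, and pin management are
omitted, so treat it as an illustration of the ordering rather than code from heapam.c.

#include "postgres.h"

#include "access/visibilitymap.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "utils/rel.h"

/*
 * Hypothetical helper (illustration only): clear PD_ALL_VISIBLE on a heap
 * page under the buffer content lock, then clear the visibility map bit
 * once the lock has been dropped.  WAL logging and critical sections are
 * omitted for brevity; the caller is assumed to hold an exclusive content
 * lock and a pin on buf.
 */
static void
clear_all_visible_sketch(Relation rel, Buffer buf)
{
	Page		page = BufferGetPage(buf);
	bool		cleared = false;

	if (PageIsAllVisible(page))
	{
		PageClearAllVisible(page);
		MarkBufferDirty(buf);
		cleared = true;
	}

	/* Drop the content lock; the pin keeps the block number stable. */
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);

	/* Clear the map bit only after the heap page flag itself. */
	if (cleared)
		visibilitymap_clear(rel, BufferGetBlockNumber(buf));
}

In the redo functions the same rule is applied unconditionally: as the added comments say, the
visibility map always needs updating even when the heap page change is skipped because a full-page
image (XLR_BKP_BLOCK_*) already restored it, since that backup block covers only the heap page, not
the visibility map page.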