*** pgsql/src/backend/utils/cache/inval.c 2009/06/11 14:49:05 1.89 --- pgsql/src/backend/utils/cache/inval.c 2009/12/19 01:32:36 1.90 *************** *** 80,86 **** * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION ! * $PostgreSQL: pgsql/src/backend/utils/cache/inval.c,v 1.88 2009/01/01 17:23:50 momjian Exp $ * *------------------------------------------------------------------------- */ --- 80,86 ---- * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION ! * $PostgreSQL: pgsql/src/backend/utils/cache/inval.c,v 1.89 2009/06/11 14:49:05 momjian Exp $ * *------------------------------------------------------------------------- */ *************** typedef struct TransInvalidationInfo *** 155,160 **** --- 155,165 ---- static TransInvalidationInfo *transInvalInfo = NULL; + static SharedInvalidationMessage *SharedInvalidMessagesArray; + static int numSharedInvalidMessagesArray; + static int maxSharedInvalidMessagesArray; + + /* * Dynamically-registered callback functions. Current implementation * assumes there won't be very many of these at once; could improve if needed. *************** static struct RELCACHECALLBACK *** 180,193 **** static int relcache_callback_count = 0; - /* info values for 2PC callback */ - #define TWOPHASE_INFO_MSG 0 /* SharedInvalidationMessage */ - #define TWOPHASE_INFO_FILE_BEFORE 1 /* relcache file inval */ - #define TWOPHASE_INFO_FILE_AFTER 2 /* relcache file inval */ - - static void PersistInvalidationMessage(SharedInvalidationMessage *msg); - - /* ---------------------------------------------------------------- * Invalidation list support functions * --- 185,190 ---- *************** AtStart_Inval(void) *** 741,778 **** MemoryContextAllocZero(TopTransactionContext, sizeof(TransInvalidationInfo)); transInvalInfo->my_level = GetCurrentTransactionNestLevel(); ! } ! ! /* ! * AtPrepare_Inval ! * Save the inval lists state at 2PC transaction prepare. ! * ! * In this phase we just generate 2PC records for all the pending invalidation ! * work. ! */ ! void ! AtPrepare_Inval(void) ! { ! /* Must be at top of stack */ ! Assert(transInvalInfo != NULL && transInvalInfo->parent == NULL); ! ! /* ! * Relcache init file invalidation requires processing both before and ! * after we send the SI messages. ! */ ! if (transInvalInfo->RelcacheInitFileInval) ! RegisterTwoPhaseRecord(TWOPHASE_RM_INVAL_ID, TWOPHASE_INFO_FILE_BEFORE, ! NULL, 0); ! ! AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs, ! &transInvalInfo->CurrentCmdInvalidMsgs); ! ! ProcessInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs, ! PersistInvalidationMessage); ! ! if (transInvalInfo->RelcacheInitFileInval) ! RegisterTwoPhaseRecord(TWOPHASE_RM_INVAL_ID, TWOPHASE_INFO_FILE_AFTER, ! NULL, 0); } /* --- 738,745 ---- MemoryContextAllocZero(TopTransactionContext, sizeof(TransInvalidationInfo)); transInvalInfo->my_level = GetCurrentTransactionNestLevel(); ! SharedInvalidMessagesArray = NULL; ! numSharedInvalidMessagesArray = 0; } /* *************** AtSubStart_Inval(void) *** 812,856 **** } /* ! * PersistInvalidationMessage ! * Write an invalidation message to the 2PC state file. */ static void ! PersistInvalidationMessage(SharedInvalidationMessage *msg) { ! RegisterTwoPhaseRecord(TWOPHASE_RM_INVAL_ID, TWOPHASE_INFO_MSG, ! msg, sizeof(SharedInvalidationMessage)); } /* ! * inval_twophase_postcommit ! * Process an invalidation message from the 2PC state file. */ ! void ! inval_twophase_postcommit(TransactionId xid, uint16 info, ! void *recdata, uint32 len) { ! SharedInvalidationMessage *msg; ! switch (info) ! { ! case TWOPHASE_INFO_MSG: ! msg = (SharedInvalidationMessage *) recdata; ! Assert(len == sizeof(SharedInvalidationMessage)); ! SendSharedInvalidMessages(msg, 1); ! break; ! case TWOPHASE_INFO_FILE_BEFORE: ! RelationCacheInitFileInvalidate(true); ! break; ! case TWOPHASE_INFO_FILE_AFTER: ! RelationCacheInitFileInvalidate(false); ! break; ! default: ! Assert(false); ! break; ! } ! } /* * AtEOXact_Inval --- 779,875 ---- } /* ! * Collect invalidation messages into SharedInvalidMessagesArray array. */ static void ! MakeSharedInvalidMessagesArray(const SharedInvalidationMessage *msgs, int n) { ! /* ! * Initialise array first time through in each commit ! */ ! if (SharedInvalidMessagesArray == NULL) ! { ! maxSharedInvalidMessagesArray = FIRSTCHUNKSIZE; ! numSharedInvalidMessagesArray = 0; ! ! /* ! * Although this is being palloc'd we don't actually free it directly. ! * We're so close to EOXact that we now we're going to lose it anyhow. ! */ ! SharedInvalidMessagesArray = palloc(maxSharedInvalidMessagesArray ! * sizeof(SharedInvalidationMessage)); ! } ! ! if ((numSharedInvalidMessagesArray + n) > maxSharedInvalidMessagesArray) ! { ! while ((numSharedInvalidMessagesArray + n) > maxSharedInvalidMessagesArray) ! maxSharedInvalidMessagesArray *= 2; ! ! SharedInvalidMessagesArray = repalloc(SharedInvalidMessagesArray, ! maxSharedInvalidMessagesArray ! * sizeof(SharedInvalidationMessage)); ! } ! ! /* ! * Append the next chunk onto the array ! */ ! memcpy(SharedInvalidMessagesArray + numSharedInvalidMessagesArray, ! msgs, n * sizeof(SharedInvalidationMessage)); ! numSharedInvalidMessagesArray += n; } /* ! * xactGetCommittedInvalidationMessages() is executed by ! * RecordTransactionCommit() to add invalidation messages onto the ! * commit record. This applies only to commit message types, never to ! * abort records. Must always run before AtEOXact_Inval(), since that ! * removes the data we need to see. ! * ! * Remember that this runs before we have officially committed, so we ! * must not do anything here to change what might occur *if* we should ! * fail between here and the actual commit. ! * ! * see also xact_redo_commit() and xact_desc_commit() */ ! int ! xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs, ! bool *RelcacheInitFileInval) { ! MemoryContext oldcontext; ! /* Must be at top of stack */ ! Assert(transInvalInfo != NULL && transInvalInfo->parent == NULL); ! ! /* ! * Relcache init file invalidation requires processing both before and ! * after we send the SI messages. However, we need not do anything ! * unless we committed. ! */ ! *RelcacheInitFileInval = transInvalInfo->RelcacheInitFileInval; + /* + * Walk through TransInvalidationInfo to collect all the messages + * into a single contiguous array of invalidation messages. It must + * be contiguous so we can copy directly into WAL message. Maintain the + * order that they would be processed in by AtEOXact_Inval(), to ensure + * emulated behaviour in redo is as similar as possible to original. + * We want the same bugs, if any, not new ones. + */ + oldcontext = MemoryContextSwitchTo(CurTransactionContext); + + ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs, + MakeSharedInvalidMessagesArray); + ProcessInvalidationMessagesMulti(&transInvalInfo->PriorCmdInvalidMsgs, + MakeSharedInvalidMessagesArray); + MemoryContextSwitchTo(oldcontext); + + Assert(!(numSharedInvalidMessagesArray > 0 && + SharedInvalidMessagesArray == NULL)); + + *msgs = SharedInvalidMessagesArray; + + return numSharedInvalidMessagesArray; + } /* * AtEOXact_Inval *************** CommandEndInvalidationMessages(void) *** 1028,1033 **** --- 1047,1054 ---- * no need to worry about cleaning up if there's an elog(ERROR) before * reaching EndNonTransactionalInvalidation (the invals will just be thrown * away if that happens). + * + * Note that these are not replayed in standby mode. */ void BeginNonTransactionalInvalidation(void) *************** BeginNonTransactionalInvalidation(void) *** 1041,1046 **** --- 1062,1070 ---- Assert(transInvalInfo->CurrentCmdInvalidMsgs.cclist == NULL); Assert(transInvalInfo->CurrentCmdInvalidMsgs.rclist == NULL); Assert(transInvalInfo->RelcacheInitFileInval == false); + + SharedInvalidMessagesArray = NULL; + numSharedInvalidMessagesArray = 0; } /*