/*-------------------------------------------------------------------------
 *
 * sinvaladt.c
 *	  POSTGRES shared cache invalidation segment definitions.
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.23 1999/07/17 20:17:44 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
#include <signal.h>
#include <unistd.h>

#include "postgres.h"

#include "storage/backendid.h"
#include "storage/lmgr.h"
#include "utils/trace.h"

/* ----------------
 *		global variable notes
 *
 *		SharedInvalidationSemaphore
 *
 *		shmInvalBuffer
 *				the shared buffer segment, set by SISegmentAttach()
 *
 *		MyBackendId
 *				might be removed later, used only for
 *				debugging in debug routines (end of file)
 *
 *		SIDbId
 *				identification of buffer (disappears)
 *
 *		SIRelId			\
 *		SIDummyOid		 \	identification of buffer
 *		SIXidData		 /
 *		SIXid			/
 *
 *	XXX This file really needs to be cleaned up.  We switched to using
 *		spinlocks to protect critical sections (as opposed to using fake
 *		relations and going through the lock manager) and some of the old
 *		cruft was 'ifdef'ed out, while other parts (now unused) are still
 *		compiled into the system. -mer 5/24/92
 * ----------------
 */
#ifdef HAS_TEST_AND_SET
int			SharedInvalidationLockId;

#else
IpcSemaphoreId SharedInvalidationSemaphore;

#endif

SISeg	   *shmInvalBuffer;
extern BackendId MyBackendId;

static void CleanupInvalidationState(int status, SISeg *segInOutP);
static BackendId SIAssignBackendId(SISeg *segInOutP, BackendTag backendTag);
static int	SIGetNumEntries(SISeg *segP);

/************************************************************************/
/* SISetActiveProcess(segP, backendId)	set the backend status active	*/
/*		should be called only by the postmaster when creating a backend */
/************************************************************************/
/* XXX I suspect that the segP parameter is extraneous. -hirohama */
static void
SISetActiveProcess(SISeg *segInOutP, BackendId backendId)
{
	/* mark all messages as read */

	/* Assert(segP->procState[backendId - 1].tag == MyBackendTag); */

	segInOutP->procState[backendId - 1].resetState = false;
	segInOutP->procState[backendId - 1].limit = SIGetNumEntries(segInOutP);
}

/****************************************************************************/
/* SIBackendInit()	initializes a backend to operate on the buffer			*/
/****************************************************************************/
int
SIBackendInit(SISeg *segInOutP)
{
	LockRelId	LtCreateRelId();
	TransactionId LMITransactionIdCopy();

	Assert(MyBackendTag > 0);

	MyBackendId = SIAssignBackendId(segInOutP, MyBackendTag);
	if (MyBackendId == InvalidBackendTag)
		return 0;

#ifdef	INVALIDDEBUG
	elog(DEBUG, "SIBackendInit: backend tag %d; backend id %d.",
		 MyBackendTag, MyBackendId);
#endif	 /* INVALIDDEBUG */

	SISetActiveProcess(segInOutP, MyBackendId);
	on_shmem_exit(CleanupInvalidationState, (caddr_t) segInOutP);
	return 1;
}

/* ----------------
 *		SIAssignBackendId
 * ----------------
 */
static BackendId
SIAssignBackendId(SISeg *segInOutP, BackendTag backendTag)
{
	Index		index;
	ProcState  *stateP = NULL;

	for (index = 0; index < segInOutP->maxBackends; index++)
	{
		if (segInOutP->procState[index].tag == InvalidBackendTag ||
			segInOutP->procState[index].tag == backendTag)
		{
			stateP = &segInOutP->procState[index];
			break;
		}

		if (!PointerIsValid(stateP) ||
			(segInOutP->procState[index].resetState &&
			 (!stateP->resetState ||
			  stateP->tag < backendTag)) ||
			(!stateP->resetState &&
			 (segInOutP->procState[index].limit <
			  stateP->limit ||
			  stateP->tag < backendTag)))
			stateP = &segInOutP->procState[index];
	}

	/* verify that all "procState" entries checked for matching tags */

	for (index++; index < segInOutP->maxBackends; index++)
	{
		if (segInOutP->procState[index].tag == backendTag)
			elog(FATAL, "SIAssignBackendId: tag %d found twice", backendTag);
	}

	Assert(stateP);

	if (stateP->tag != InvalidBackendTag)
	{
		if (stateP->tag == backendTag)
			elog(NOTICE, "SIAssignBackendId: reusing tag %d", backendTag);
		else
		{
			elog(NOTICE, "SIAssignBackendId: discarding tag %d", stateP->tag);
			return InvalidBackendTag;
		}
	}

	stateP->tag = backendTag;

	return 1 + stateP - &segInOutP->procState[0];
}


/************************************************************************/
/* The following function should be called only by the postmaster !!	*/
/************************************************************************/

/************************************************************************/
/* SISetDeadProcess(segP, backendId)  set the backend status DEAD		*/
/*		should be called only by the postmaster when a backend died		*/
/************************************************************************/
static void
SISetDeadProcess(SISeg *segP, int backendId)
{
	/* XXX call me.... */

	segP->procState[backendId - 1].resetState = false;
	segP->procState[backendId - 1].limit = -1;
	segP->procState[backendId - 1].tag = InvalidBackendTag;
}

/*
 * CleanupInvalidationState
 * Note:
 *		This is a temporary hack.  ExitBackend should call this instead
 *		of exit (via on_shmem_exit).
 */
static void
CleanupInvalidationState(int status,	/* XXX */
						 SISeg *segInOutP)		/* XXX style */
{
	Assert(PointerIsValid(segInOutP));

	SISetDeadProcess(segInOutP, MyBackendId);
}


/************************************************************************/
/* SIComputeSize()	- compute size and offsets for SI segment			*/
/************************************************************************/
static void
SIComputeSize(SISegOffsets *oP, int maxBackends)
{
	int			A,
				B,
				a,
				b,
				totalSize;

	A = 0;
	/* sizeof(SISeg) includes the first ProcState entry */
	a = sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1);
	a = MAXALIGN(a);			/* offset to first data entry */
	b = sizeof(SISegEntry) * MAXNUMMESSAGES;
	B = A + a + b;
	B = MAXALIGN(B);
	totalSize = B - A;

	oP->startSegment = A;
	oP->offsetToFirstEntry = a; /* relative to A */
	oP->offsetToEndOfSegment = totalSize;		/* relative to A */
}


/************************************************************************/
/* SISetStartEntrySection(segP, offset)		- sets the offset			*/
/************************************************************************/
static void
SISetStartEntrySection(SISeg *segP, Offset offset)
{
	segP->startEntrySection = offset;
}

/************************************************************************/
/* SIGetStartEntrySection(segP)		- returnss the offset				*/
/************************************************************************/
static Offset
SIGetStartEntrySection(SISeg *segP)
{
	return segP->startEntrySection;
}


/************************************************************************/
/* SISetEndEntrySection(segP, offset)	- sets the offset				*/
/************************************************************************/
static void
SISetEndEntrySection(SISeg *segP, Offset offset)
{
	segP->endEntrySection = offset;
}

/************************************************************************/
/* SISetEndEntryChain(segP, offset)		- sets the offset				*/
/************************************************************************/
static void
SISetEndEntryChain(SISeg *segP, Offset offset)
{
	segP->endEntryChain = offset;
}

/************************************************************************/
/* SIGetEndEntryChain(segP)		- returnss the offset					*/
/************************************************************************/
static Offset
SIGetEndEntryChain(SISeg *segP)
{
	return segP->endEntryChain;
}

/************************************************************************/
/* SISetStartEntryChain(segP, offset)	- sets the offset				*/
/************************************************************************/
static void
SISetStartEntryChain(SISeg *segP, Offset offset)
{
	segP->startEntryChain = offset;
}

/************************************************************************/
/* SIGetStartEntryChain(segP)	- returns  the offset					*/
/************************************************************************/
static Offset
SIGetStartEntryChain(SISeg *segP)
{
	return segP->startEntryChain;
}

/************************************************************************/
/* SISetNumEntries(segP, num)	sets the current nuber of entries		*/
/************************************************************************/
static bool
SISetNumEntries(SISeg *segP, int num)
{
	if (num <= MAXNUMMESSAGES)
	{
		segP->numEntries = num;
		return true;
	}
	else
	{
		return false;			/* table full */
	}
}

/************************************************************************/
/* SIGetNumEntries(segP)	- returns the current nuber of entries		*/
/************************************************************************/
static int
SIGetNumEntries(SISeg *segP)
{
	return segP->numEntries;
}


/************************************************************************/
/* SISetMaxNumEntries(segP, num)	sets the maximal number of entries	*/
/************************************************************************/
static bool
SISetMaxNumEntries(SISeg *segP, int num)
{
	if (num <= MAXNUMMESSAGES)
	{
		segP->maxNumEntries = num;
		return true;
	}
	else
	{
		return false;			/* wrong number */
	}
}


/************************************************************************/
/* SIGetProcStateLimit(segP, i) returns the limit of read messages		*/
/************************************************************************/

#define SIGetProcStateLimit(segP,i) \
		((segP)->procState[i].limit)

/************************************************************************/
/* SIIncNumEntries(segP, num)	increments the current nuber of entries */
/************************************************************************/
static bool
SIIncNumEntries(SISeg *segP, int num)
{

	/*
	 * Try to prevent table overflow. When the table is 70% full send a
	 * SIGUSR2 to the postmaster which will send it back to all the
	 * backends. This will be handled by Async_NotifyHandler() with a
	 * StartTransactionCommand() which will flush unread SI entries for
	 * each backend.									dz - 27 Jan 1998
	 */
	if (segP->numEntries == (MAXNUMMESSAGES * 70 / 100))
	{
		TPRINTF(TRACE_VERBOSE,
			"SIIncNumEntries: table is 70%% full, signaling postmaster");
		kill(getppid(), SIGUSR2);
	}

	if ((segP->numEntries + num) <= MAXNUMMESSAGES)
	{
		segP->numEntries = segP->numEntries + num;
		return true;
	}
	else
	{
		return false;			/* table full */
	}
}

/************************************************************************/
/* SIDecNumEntries(segP, num)	decrements the current nuber of entries */
/************************************************************************/
static bool
SIDecNumEntries(SISeg *segP, int num)
{
	if ((segP->numEntries - num) >= 0)
	{
		segP->numEntries = segP->numEntries - num;
		return true;
	}
	else
	{
		return false;			/* not enough entries in table */
	}
}

/************************************************************************/
/* SISetStartFreeSpace(segP, offset)  - sets the offset					*/
/************************************************************************/
static void
SISetStartFreeSpace(SISeg *segP, Offset offset)
{
	segP->startFreeSpace = offset;
}

/************************************************************************/
/* SIGetStartFreeSpace(segP)  - returns the offset						*/
/************************************************************************/
static Offset
SIGetStartFreeSpace(SISeg *segP)
{
	return segP->startFreeSpace;
}



/************************************************************************/
/* SIGetFirstDataEntry(segP)  returns first data entry					*/
/************************************************************************/
static SISegEntry *
SIGetFirstDataEntry(SISeg *segP)
{
	SISegEntry *eP;
	Offset		startChain;

	startChain = SIGetStartEntryChain(segP);

	if (startChain == InvalidOffset)
		return NULL;

	eP = (SISegEntry *) ((Pointer) segP +
						 SIGetStartEntrySection(segP) +
						 startChain);
	return eP;
}


/************************************************************************/
/* SIGetLastDataEntry(segP)  returns last data entry in the chain		*/
/************************************************************************/
static SISegEntry *
SIGetLastDataEntry(SISeg *segP)
{
	SISegEntry *eP;
	Offset		endChain;

	endChain = SIGetEndEntryChain(segP);

	if (endChain == InvalidOffset)
		return NULL;

	eP = (SISegEntry *) ((Pointer) segP +
						 SIGetStartEntrySection(segP) +
						 endChain);
	return eP;
}

/************************************************************************/
/* SIGetNextDataEntry(segP, offset)  returns next data entry			*/
/************************************************************************/
static SISegEntry *
SIGetNextDataEntry(SISeg *segP, Offset offset)
{
	SISegEntry *eP;

	if (offset == InvalidOffset)
		return NULL;

	eP = (SISegEntry *) ((Pointer) segP +
						 SIGetStartEntrySection(segP) +
						 offset);
	return eP;
}


/************************************************************************/
/* SIGetNthDataEntry(segP, n)	returns the n-th data entry in chain	*/
/************************************************************************/
static SISegEntry *
SIGetNthDataEntry(SISeg *segP,
				  int n)		/* must range from 1 to MaxMessages */
{
	SISegEntry *eP;
	int			i;

	if (n <= 0)
		return NULL;

	eP = SIGetFirstDataEntry(segP);
	for (i = 1; i < n; i++)
	{
		/* skip one and get the next	*/
		eP = SIGetNextDataEntry(segP, eP->next);
	}

	return eP;
}

/************************************************************************/
/* SIEntryOffset(segP, entryP)	 returns the offset for an pointer		*/
/************************************************************************/
static Offset
SIEntryOffset(SISeg *segP, SISegEntry *entryP)
{
	/* relative to B !! */
	return ((Offset) ((Pointer) entryP -
					  (Pointer) segP -
					  SIGetStartEntrySection(segP)));
}


/************************************************************************/
/* SISetDataEntry(segP, data)  - sets a message in the segemnt			*/
/************************************************************************/
bool
SISetDataEntry(SISeg *segP, SharedInvalidData *data)
{
	Offset		offsetToNewData;
	SISegEntry *eP,
			   *lastP;

	if (!SIIncNumEntries(segP, 1))
		return false;			/* no space */

	/* get a free entry */
	offsetToNewData = SIGetStartFreeSpace(segP);
	eP = SIGetNextDataEntry(segP, offsetToNewData);		/* it's a free one */
	SISetStartFreeSpace(segP, eP->next);
	/* fill it up */
	eP->entryData = *data;
	eP->isfree = false;
	eP->next = InvalidOffset;

	/* handle insertion point at the end of the chain !! */
	lastP = SIGetLastDataEntry(segP);
	if (lastP == NULL)
	{
		/* there is no chain, insert the first entry */
		SISetStartEntryChain(segP, SIEntryOffset(segP, eP));
	}
	else
	{
		/* there is a last entry in the chain */
		lastP->next = SIEntryOffset(segP, eP);
	}
	SISetEndEntryChain(segP, SIEntryOffset(segP, eP));
	return true;
}


/************************************************************************/
/* SIDecProcLimit(segP, num)  decrements all process limits				*/
/************************************************************************/
static void
SIDecProcLimit(SISeg *segP, int num)
{
	int			i;

	for (i = 0; i < segP->maxBackends; i++)
	{
		/* decrement only, if there is a limit > 0	*/
		if (segP->procState[i].limit > 0)
		{
			segP->procState[i].limit = segP->procState[i].limit - num;
			if (segP->procState[i].limit < 0)
			{
				/* limit was not high enough, reset to zero */
				/* negative means it's a dead backend	    */
				segP->procState[i].limit = 0;
			}
		}
	}
}


/************************************************************************/
/* SIDelDataEntry(segP)		- free the FIRST entry						*/
/************************************************************************/
bool
SIDelDataEntry(SISeg *segP)
{
	SISegEntry *e1P;

	if (!SIDecNumEntries(segP, 1))
	{
		/* no entries in buffer */
		return false;
	}

	e1P = SIGetFirstDataEntry(segP);
	SISetStartEntryChain(segP, e1P->next);
	if (SIGetStartEntryChain(segP) == InvalidOffset)
	{
		/* it was the last entry */
		SISetEndEntryChain(segP, InvalidOffset);
	}
	/* free the entry */
	e1P->isfree = true;
	e1P->next = SIGetStartFreeSpace(segP);
	SISetStartFreeSpace(segP, SIEntryOffset(segP, e1P));
	SIDecProcLimit(segP, 1);
	return true;
}



/************************************************************************/
/* SISetProcStateInvalid(segP)	checks and marks a backends state as	*/
/*									invalid								*/
/************************************************************************/
void
SISetProcStateInvalid(SISeg *segP)
{
	int			i;

	for (i = 0; i < segP->maxBackends; i++)
	{
		if (segP->procState[i].limit == 0)
		{
			/* backend i didn't read any message    	    	    	 */
			segP->procState[i].resetState = true;

			/*
			 * XXX signal backend that it has to reset its internal cache
			 * ?
			 */
		}
	}
}

/************************************************************************/
/* SIReadEntryData(segP, backendId, function)							*/
/*						- marks messages to be read by id				*/
/*						  and executes function							*/
/************************************************************************/
void
SIReadEntryData(SISeg *segP,
				int backendId,
				void (*invalFunction) (),
				void (*resetFunction) ())
{
	int			i = 0;
	SISegEntry *data;

	Assert(segP->procState[backendId - 1].tag == MyBackendTag);

	if (!segP->procState[backendId - 1].resetState)
	{
		/* invalidate data, but only those, you have not seen yet !! */
		/* therefore skip read messages */
		data = SIGetNthDataEntry(segP,
						   SIGetProcStateLimit(segP, backendId - 1) + 1);
		while (data != NULL)
		{
			i++;
			segP->procState[backendId - 1].limit++;		/* one more message read */
			invalFunction(data->entryData.cacheId,
						  data->entryData.hashIndex,
						  &data->entryData.pointerData);
			data = SIGetNextDataEntry(segP, data->next);
		}
		/* SIDelExpiredDataEntries(segP); */
	}
	else
	{
		/* backend must not read messages, its own state has to be reset	 */
		elog(NOTICE, "SIReadEntryData: cache state reset");
		resetFunction();		/* XXXX call it here, parameters? */

		/* new valid state--mark all messages "read" */
		segP->procState[backendId - 1].resetState = false;
		segP->procState[backendId - 1].limit = SIGetNumEntries(segP);
	}
	/* check whether we can remove dead messages							*/
	if (i > MAXNUMMESSAGES)
		elog(FATAL, "SIReadEntryData: Invalid segment state");
}

/************************************************************************/
/* SIDelExpiredDataEntries	(segP)	- removes irrelevant messages		*/
/************************************************************************/
void
SIDelExpiredDataEntries(SISeg *segP)
{
	int			min,
				i,
				h;

	min = 9999999;
	for (i = 0; i < segP->maxBackends; i++)
	{
		h = SIGetProcStateLimit(segP, i);
		if (h >= 0)
		{						/* backend active */
			if (h < min)
				min = h;
		}
	}
	if (min != 9999999)
	{
		/* we can remove min messages */
		for (i = 1; i <= min; i++)
		{
			/* this  adjusts also the state limits! */
			if (!SIDelDataEntry(segP))
				elog(FATAL, "SIDelExpiredDataEntries: Invalid segment state");
		}
	}
}



/************************************************************************/
/* SISegInit(segP)	- initializes the segment							*/
/************************************************************************/
static void
SISegInit(SISeg *segP, SISegOffsets *oP, int maxBackends)
{
	int			i;
	SISegEntry *eP;

	/* set semaphore ids in the segment */
	/* XXX */
	SISetStartEntrySection(segP, oP->offsetToFirstEntry);
	SISetEndEntrySection(segP, oP->offsetToEndOfSegment);
	SISetStartFreeSpace(segP, 0);
	SISetStartEntryChain(segP, InvalidOffset);
	SISetEndEntryChain(segP, InvalidOffset);
	SISetNumEntries(segP, 0);
	SISetMaxNumEntries(segP, MAXNUMMESSAGES);
	segP->maxBackends = maxBackends;
	for (i = 0; i < segP->maxBackends; i++)
	{
		segP->procState[i].limit = -1;	/* no backend active  !! */
		segP->procState[i].resetState = false;
		segP->procState[i].tag = InvalidBackendTag;
	}
	/* construct a chain of free entries							*/
	for (i = 1; i < MAXNUMMESSAGES; i++)
	{
		eP = (SISegEntry *) ((Pointer) segP +
							 SIGetStartEntrySection(segP) +
							 (i - 1) * sizeof(SISegEntry));
		eP->isfree = true;
		eP->next = i * sizeof(SISegEntry);		/* relative to B */
	}
	/* handle the last free entry separate							*/
	eP = (SISegEntry *) ((Pointer) segP +
						 SIGetStartEntrySection(segP) +
						 (MAXNUMMESSAGES - 1) * sizeof(SISegEntry));
	eP->isfree = true;
	eP->next = InvalidOffset;	/* it's the end of the chain !! */
}



/************************************************************************/
/* SISegmentKill(key)	- kill any segment								*/
/************************************************************************/
static void
SISegmentKill(int key)			/* the corresponding key for the segment */
{
	IpcMemoryKill(key);
}


/************************************************************************/
/* SISegmentGet(key, size)	- get a shared segment of size <size>		*/
/*				  returns a segment id									*/
/************************************************************************/
static IpcMemoryId
SISegmentGet(int key,			/* the corresponding key for the segment */
			 int size,			/* size of segment in bytes				 */
			 bool create)
{
	IpcMemoryId shmid;

	if (create)
		shmid = IpcMemoryCreate(key, size, IPCProtection);
	else
		shmid = IpcMemoryIdGet(key, size);
	return shmid;
}

/************************************************************************/
/* SISegmentAttach(shmid)	- attach a shared segment with id shmid		*/
/************************************************************************/
static void
SISegmentAttach(IpcMemoryId shmid)
{
	shmInvalBuffer = (struct SISeg *) IpcMemoryAttach(shmid);
	if (shmInvalBuffer == IpcMemAttachFailed)
	{
		/* XXX use validity function */
		elog(NOTICE, "SISegmentAttach: Could not attach segment");
		elog(FATAL, "SISegmentAttach: %m");
	}
}


/************************************************************************/
/* SISegmentInit()			initialize SI segment						*/
/*																		*/
/* NB: maxBackends param is only valid when killExistingSegment is true	*/
/************************************************************************/
int
SISegmentInit(bool killExistingSegment, IPCKey key, int maxBackends)
{
	SISegOffsets offsets;
	IpcMemoryId shmId;
	bool		create;

	if (killExistingSegment)
	{
		/* Kill existing segment */
		/* set semaphore */
		SISegmentKill(key);

		/* Get a shared segment */
		SIComputeSize(&offsets, maxBackends);
		create = true;
		shmId = SISegmentGet(key, offsets.offsetToEndOfSegment, create);
		if (shmId < 0)
		{
			perror("SISegmentGet: failed");
			return -1;			/* an error */
		}

		/* Attach the shared cache invalidation  segment */
		/* sets the global variable shmInvalBuffer */
		SISegmentAttach(shmId);

		/* Init shared memory table */
		SISegInit(shmInvalBuffer, &offsets, maxBackends);
	}
	else
	{
		/* use an existing segment */
		create = false;
		shmId = SISegmentGet(key, 0, create);
		if (shmId < 0)
		{
			perror("SISegmentGet: getting an existent segment failed");
			return -1;			/* an error */
		}
		/* Attach the shared cache invalidation segment */
		SISegmentAttach(shmId);
	}
	return 1;
}