summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/snapbuild.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/snapbuild.c')
-rw-r--r--src/backend/replication/logical/snapbuild.c138
1 files changed, 70 insertions, 68 deletions
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 36034dbec9d..cb45f906fc1 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -57,27 +57,27 @@
*
* The snapbuild machinery is starting up in several stages, as illustrated
* by the following graph:
- * +-------------------------+
- * +----|SNAPBUILD_START |-------------+
- * | +-------------------------+ |
- * | | |
- * | | |
- * | running_xacts with running xacts |
- * | | |
- * | | |
- * | v |
- * | +-------------------------+ v
- * | |SNAPBUILD_FULL_SNAPSHOT |------------>|
- * | +-------------------------+ |
- * running_xacts | saved snapshot
- * with zero xacts | at running_xacts's lsn
- * | | |
- * | all running toplevel TXNs finished |
- * | | |
- * | v |
- * | +-------------------------+ |
- * +--->|SNAPBUILD_CONSISTENT |<------------+
- * +-------------------------+
+ * +-------------------------+
+ * +----|SNAPBUILD_START |-------------+
+ * | +-------------------------+ |
+ * | | |
+ * | | |
+ * | running_xacts with running xacts |
+ * | | |
+ * | | |
+ * | v |
+ * | +-------------------------+ v
+ * | |SNAPBUILD_FULL_SNAPSHOT |------------>|
+ * | +-------------------------+ |
+ * running_xacts | saved snapshot
+ * with zero xacts | at running_xacts's lsn
+ * | | |
+ * | all running toplevel TXNs finished |
+ * | | |
+ * | v |
+ * | +-------------------------+ |
+ * +--->|SNAPBUILD_CONSISTENT |<------------+
+ * +-------------------------+
*
* Initially the machinery is in the START stage. When a xl_running_xacts
* record is read that is sufficiently new (above the safe xmin horizon),
@@ -184,7 +184,7 @@ struct SnapBuild
* Information about initially running transactions
*
* When we start building a snapshot there already may be transactions in
- * progress. Those are stored in running.xip. We don't have enough
+ * progress. Those are stored in running.xip. We don't have enough
* information about those to decode their contents, so until they are
* finished (xcnt=0) we cannot switch to a CONSISTENT state.
*/
@@ -244,7 +244,7 @@ struct SnapBuild
* removes knowledge about the previously used resowner, so we save it here.
*/
ResourceOwner SavedResourceOwnerDuringExport = NULL;
-bool ExportInProgress = false;
+bool ExportInProgress = false;
/* transaction state manipulation functions */
static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
@@ -496,7 +496,7 @@ SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
snapshot->copied = false;
snapshot->curcid = FirstCommandId;
snapshot->active_count = 0;
- snapshot->regd_count = 1; /* mark as registered so nobody frees it */
+ snapshot->regd_count = 1; /* mark as registered so nobody frees it */
return snapshot;
}
@@ -635,7 +635,7 @@ SnapBuildClearExportedSnapshot()
bool
SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
{
- bool is_old_tx;
+ bool is_old_tx;
/*
* We can't handle data in transactions if we haven't built a snapshot
@@ -692,10 +692,10 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
CommandId cid;
/*
- * we only log new_cid's if a catalog tuple was modified, so mark
- * the transaction as containing catalog modifications
+ * we only log new_cid's if a catalog tuple was modified, so mark the
+ * transaction as containing catalog modifications
*/
- ReorderBufferXidSetCatalogChanges(builder->reorder, xid,lsn);
+ ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
xlrec->target.node, xlrec->target.tid,
@@ -712,7 +712,7 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
cid = xlrec->cmin;
else
{
- cid = InvalidCommandId; /* silence compiler */
+ cid = InvalidCommandId; /* silence compiler */
elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
}
@@ -818,7 +818,7 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
(uint32) builder->committed.xcnt_space);
builder->committed.xip = repalloc(builder->committed.xip,
- builder->committed.xcnt_space * sizeof(TransactionId));
+ builder->committed.xcnt_space * sizeof(TransactionId));
}
/*
@@ -900,10 +900,10 @@ SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
* so our incrementaly built snapshot now is consistent.
*/
ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- (uint32)(lsn >> 32), (uint32)lsn),
- errdetail("xid %u finished, no running transactions anymore",
- xid)));
+ (errmsg("logical decoding found consistent point at %X/%X",
+ (uint32) (lsn >> 32), (uint32) lsn),
+ errdetail("xid %u finished, no running transactions anymore",
+ xid)));
builder->state = SNAPBUILD_CONSISTENT;
}
}
@@ -1170,15 +1170,16 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
*/
if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+
/*
* No in-progress transaction, can reuse the last serialized snapshot if
* we have one.
*/
else if (txn == NULL &&
- builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
+ builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
builder->last_serialized_snapshot != InvalidXLogRecPtr)
LogicalIncreaseRestartDecodingForSlot(lsn,
- builder->last_serialized_snapshot);
+ builder->last_serialized_snapshot);
}
@@ -1199,23 +1200,23 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
* the currently running transactions. There are several ways to do that:
*
* a) There were no running transactions when the xl_running_xacts record
- * was inserted, jump to CONSISTENT immediately. We might find such a
- * state we were waiting for b) and c).
+ * was inserted, jump to CONSISTENT immediately. We might find such a
+ * state we were waiting for b) and c).
*
* b) Wait for all toplevel transactions that were running to end. We
- * simply track the number of in-progress toplevel transactions and
- * lower it whenever one commits or aborts. When that number
- * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
- * to CONSISTENT.
+ * simply track the number of in-progress toplevel transactions and
+ * lower it whenever one commits or aborts. When that number
+ * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
+ * to CONSISTENT.
* NB: We need to search running.xip when seeing a transaction's end to
- * make sure it's a toplevel transaction and it's been one of the
- * intially running ones.
+ * make sure it's a toplevel transaction and it's been one of the
+ * intially running ones.
* Interestingly, in contrast to HS, this allows us not to care about
* subtransactions - and by extension suboverflowed xl_running_xacts -
* at all.
*
* c) This (in a previous run) or another decoding slot serialized a
- * snapshot to disk that we can use.
+ * snapshot to disk that we can use.
* ---
*/
@@ -1231,7 +1232,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
(errmsg("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
(uint32) (lsn >> 32), (uint32) lsn),
errdetail("initial xmin horizon of %u vs the snapshot's %u",
- builder->initial_xmin_horizon, running->oldestRunningXid)));
+ builder->initial_xmin_horizon, running->oldestRunningXid)));
return true;
}
@@ -1263,7 +1264,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
ereport(LOG,
(errmsg("logical decoding found consistent point at %X/%X",
- (uint32)(lsn >> 32), (uint32)lsn),
+ (uint32) (lsn >> 32), (uint32) lsn),
errdetail("running xacts with xcnt == 0")));
return false;
@@ -1274,15 +1275,16 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
/* there won't be any state to cleanup */
return false;
}
+
/*
* b) first encounter of a useable xl_running_xacts record. If we had
- * found one earlier we would either track running transactions
- * (i.e. builder->running.xcnt != 0) or be consistent (this function
- * wouldn't get called).
+ * found one earlier we would either track running transactions (i.e.
+ * builder->running.xcnt != 0) or be consistent (this function wouldn't
+ * get called).
*/
else if (!builder->running.xcnt)
{
- int off;
+ int off;
/*
* We only care about toplevel xids as those are the ones we
@@ -1302,7 +1304,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->running.xcnt_space = running->xcnt;
builder->running.xip =
MemoryContextAlloc(builder->context,
- builder->running.xcnt * sizeof(TransactionId));
+ builder->running.xcnt * sizeof(TransactionId));
memcpy(builder->running.xip, running->xids,
builder->running.xcnt * sizeof(TransactionId));
@@ -1320,9 +1322,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->state = SNAPBUILD_FULL_SNAPSHOT;
ereport(LOG,
- (errmsg("logical decoding found initial starting point at %X/%X",
- (uint32)(lsn >> 32), (uint32)lsn),
- errdetail("%u xacts need to finish", (uint32) builder->running.xcnt)));
+ (errmsg("logical decoding found initial starting point at %X/%X",
+ (uint32) (lsn >> 32), (uint32) lsn),
+ errdetail("%u xacts need to finish", (uint32) builder->running.xcnt)));
/*
* Iterate through all xids, wait for them to finish.
@@ -1331,7 +1333,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
* isolationtester to notice that we're currently waiting for
* something.
*/
- for(off = 0; off < builder->running.xcnt; off++)
+ for (off = 0; off < builder->running.xcnt; off++)
{
TransactionId xid = builder->running.xip[off];
@@ -1471,9 +1473,9 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
* but remember location, so we don't need to read old data again.
*
* To be sure it has been synced to disk after the rename() from the
- * tempfile filename to the real filename, we just repeat the
- * fsync. That ought to be cheap because in most scenarios it should
- * already be safely on disk.
+ * tempfile filename to the real filename, we just repeat the fsync.
+ * That ought to be cheap because in most scenarios it should already
+ * be safely on disk.
*/
fsync_fname(path, false);
fsync_fname("pg_llog/snapshots", true);
@@ -1504,7 +1506,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
if (unlink(tmppath) != 0 && errno != ENOENT)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not unlink file \"%s\": %m", path)));
+ errmsg("could not unlink file \"%s\": %m", path)));
needed_length = sizeof(SnapBuildOnDisk) +
sizeof(TransactionId) * builder->running.xcnt_space +
@@ -1518,7 +1520,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
INIT_CRC32(ondisk->checksum);
COMP_CRC32(ondisk->checksum,
((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
- SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
+ SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
ondisk_c += sizeof(SnapBuildOnDisk);
memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
@@ -1597,8 +1599,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
fsync_fname("pg_llog/snapshots", true);
/*
- * Now there's no way we can loose the dumped state anymore, remember
- * this as a serialization point.
+ * Now there's no way we can loose the dumped state anymore, remember this
+ * as a serialization point.
*/
builder->last_serialized_snapshot = lsn;
@@ -1673,7 +1675,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
INIT_CRC32(checksum);
COMP_CRC32(checksum,
((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
- SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
+ SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
/* read SnapBuild */
readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
@@ -1781,7 +1783,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
ereport(LOG,
(errmsg("logical decoding found consistent point at %X/%X",
- (uint32)(lsn >> 32), (uint32)lsn),
+ (uint32) (lsn >> 32), (uint32) lsn),
errdetail("found initial snapshot in snapbuild file")));
return true;
@@ -1829,7 +1831,7 @@ CheckPointSnapBuild(void)
uint32 hi;
uint32 lo;
XLogRecPtr lsn;
- struct stat statbuf;
+ struct stat statbuf;
if (strcmp(snap_de->d_name, ".") == 0 ||
strcmp(snap_de->d_name, "..") == 0)
@@ -1846,8 +1848,8 @@ CheckPointSnapBuild(void)
/*
* temporary filenames from SnapBuildSerialize() include the LSN and
* everything but are postfixed by .$pid.tmp. We can just remove them
- * the same as other files because there can be none that are currently
- * being written that are older than cutoff.
+ * the same as other files because there can be none that are
+ * currently being written that are older than cutoff.
*
* We just log a message if a file doesn't fit the pattern, it's
* probably some editors lock/state file or similar...