diff options
Diffstat (limited to 'src/backend/access')
-rw-r--r-- | src/backend/access/transam/twophase.c | 14 | ||||
-rw-r--r-- | src/backend/access/transam/xlog.c | 126 | ||||
-rw-r--r-- | src/backend/access/transam/xlogreader.c | 931 | ||||
-rw-r--r-- | src/backend/access/transam/xlogutils.c | 24 |
4 files changed, 644 insertions, 451 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 89335b64a24..3137cb3ecc1 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1330,11 +1330,8 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) char *errormsg; TimeLineID save_currtli = ThisTimeLineID; - xlogreader = XLogReaderAllocate(wal_segment_size, NULL, - XL_ROUTINE(.page_read = &read_local_xlog_page, - .segment_open = &wal_segment_open, - .segment_close = &wal_segment_close), - NULL); + xlogreader = XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close); + if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -1342,7 +1339,12 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) errdetail("Failed while allocating a WAL reading processor."))); XLogBeginRead(xlogreader, lsn); - record = XLogReadRecord(xlogreader, &errormsg); + while (XLogReadRecord(xlogreader, &record, &errormsg) == + XLREAD_NEED_DATA) + { + if (!read_local_xlog_page(xlogreader)) + break; + } /* * Restore immediately the timeline where it was previously, as diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index c1d4415a433..7faac01bf24 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -811,17 +811,13 @@ static XLogSegNo openLogSegNo = 0; * These variables are used similarly to the ones above, but for reading * the XLOG. Note, however, that readOff generally represents the offset * of the page just read, not the seek position of the FD itself, which - * will be just past that page. readLen indicates how much of the current - * page has been read into readBuf, and readSource indicates where we got - * the currently open file from. + * will be just past that page. readSource indicates where we got the + * currently open file from. 
* Note: we could use Reserve/ReleaseExternalFD to track consumption of * this FD too; but it doesn't currently seem worthwhile, since the XLOG is * not read by general-purpose sessions. */ static int readFile = -1; -static XLogSegNo readSegNo = 0; -static uint32 readOff = 0; -static uint32 readLen = 0; static XLogSource readSource = XLOG_FROM_ANY; /* @@ -838,13 +834,6 @@ static XLogSource currentSource = XLOG_FROM_ANY; static bool lastSourceFailed = false; static bool pendingWalRcvRestart = false; -typedef struct XLogPageReadPrivate -{ - int emode; - bool fetching_ckpt; /* are we fetching a checkpoint record? */ - bool randAccess; -} XLogPageReadPrivate; - /* * These variables track when we last obtained some WAL data to process, * and where we got it from. (XLogReceiptSource is initially the same as @@ -920,10 +909,12 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); -static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *readBuf); +static bool XLogPageRead(XLogReaderState *state, + bool fetching_ckpt, int emode, bool randAccess); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, - bool fetching_ckpt, XLogRecPtr tliRecPtr); + bool fetching_ckpt, + XLogRecPtr tliRecPtr, + XLogSegNo readSegNo); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); static void XLogFileClose(void); static void PreallocXlogFiles(XLogRecPtr endptr); @@ -1234,8 +1225,7 @@ XLogInsertRecord(XLogRecData *rdata, appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len); if (!debug_reader) - debug_reader = XLogReaderAllocate(wal_segment_size, NULL, - XL_ROUTINE(), NULL); + debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL); if (!debug_reader) { @@ -4373,12 +4363,7 @@ 
ReadRecord(XLogReaderState *xlogreader, int emode, bool fetching_ckpt) { XLogRecord *record; - XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; - - /* Pass through parameters to XLogPageRead */ - private->fetching_ckpt = fetching_ckpt; - private->emode = emode; - private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); + bool randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); /* This is the first attempt to read this page. */ lastSourceFailed = false; @@ -4386,8 +4371,16 @@ ReadRecord(XLogReaderState *xlogreader, int emode, for (;;) { char *errormsg; + XLogReadRecordResult result; + + while ((result = XLogReadRecord(xlogreader, &record, &errormsg)) + == XLREAD_NEED_DATA) + { + if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess)) + break; + + } - record = XLogReadRecord(xlogreader, &errormsg); ReadRecPtr = xlogreader->ReadRecPtr; EndRecPtr = xlogreader->EndRecPtr; if (record == NULL) @@ -6457,7 +6450,6 @@ StartupXLOG(void) bool backupFromStandby = false; DBState dbstate_at_startup; XLogReaderState *xlogreader; - XLogPageReadPrivate private; bool promoted = false; struct stat st; @@ -6616,13 +6608,9 @@ StartupXLOG(void) OwnLatch(&XLogCtl->recoveryWakeupLatch); /* Set up XLOG reader facility */ - MemSet(&private, 0, sizeof(XLogPageReadPrivate)); xlogreader = - XLogReaderAllocate(wal_segment_size, NULL, - XL_ROUTINE(.page_read = &XLogPageRead, - .segment_open = NULL, - .segment_close = wal_segment_close), - &private); + XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close); + if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -7819,7 +7807,8 @@ StartupXLOG(void) XLogRecPtr pageBeginPtr; pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ); - Assert(readOff == XLogSegmentOffset(pageBeginPtr, wal_segment_size)); + Assert(XLogSegmentOffset(xlogreader->readPagePtr, wal_segment_size) == + XLogSegmentOffset(pageBeginPtr, wal_segment_size)); firstIdx = XLogRecPtrToBufIdx(EndOfLog); @@ 
-12107,13 +12096,15 @@ CancelBackup(void) * XLogPageRead() to try fetching the record from another source, or to * sleep and retry. */ -static int -XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *readBuf) -{ - XLogPageReadPrivate *private = - (XLogPageReadPrivate *) xlogreader->private_data; - int emode = private->emode; +static bool +XLogPageRead(XLogReaderState *state, + bool fetching_ckpt, int emode, bool randAccess) +{ + char *readBuf = state->readBuf; + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->reqLen; + int readLen = 0; + XLogRecPtr targetRecPtr = state->ReadRecPtr; uint32 targetPageOff; XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; int r; @@ -12126,7 +12117,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, * is not in the currently open one. */ if (readFile >= 0 && - !XLByteInSeg(targetPagePtr, readSegNo, wal_segment_size)) + !XLByteInSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size)) { /* * Request a restartpoint if we've replayed too much xlog since the @@ -12134,10 +12125,10 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, */ if (bgwriterLaunched) { - if (XLogCheckpointNeeded(readSegNo)) + if (XLogCheckpointNeeded(state->seg.ws_segno)) { (void) GetRedoRecPtr(); - if (XLogCheckpointNeeded(readSegNo)) + if (XLogCheckpointNeeded(state->seg.ws_segno)) RequestCheckpoint(CHECKPOINT_CAUSE_XLOG); } } @@ -12147,7 +12138,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, readSource = XLOG_FROM_ANY; } - XLByteToSeg(targetPagePtr, readSegNo, wal_segment_size); + XLByteToSeg(targetPagePtr, state->seg.ws_segno, wal_segment_size); retry: /* See if we need to retrieve more data */ @@ -12156,17 +12147,15 @@ retry: flushedUpto < targetPagePtr + reqLen)) { if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, - private->randAccess, - private->fetching_ckpt, - 
targetRecPtr)) + randAccess, fetching_ckpt, + targetRecPtr, state->seg.ws_segno)) { if (readFile >= 0) close(readFile); readFile = -1; - readLen = 0; readSource = XLOG_FROM_ANY; - - return -1; + XLogReaderSetInputData(state, -1); + return false; } } @@ -12193,40 +12182,36 @@ retry: else readLen = XLOG_BLCKSZ; - /* Read the requested page */ - readOff = targetPageOff; - pgstat_report_wait_start(WAIT_EVENT_WAL_READ); - r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff); + r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) targetPageOff); if (r != XLOG_BLCKSZ) { char fname[MAXFNAMELEN]; int save_errno = errno; pgstat_report_wait_end(); - XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size); + XLogFileName(fname, curFileTLI, state->seg.ws_segno, wal_segment_size); if (r < 0) { errno = save_errno; ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode_for_file_access(), errmsg("could not read from log segment %s, offset %u: %m", - fname, readOff))); + fname, targetPageOff))); } else ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode(ERRCODE_DATA_CORRUPTED), errmsg("could not read from log segment %s, offset %u: read %d of %zu", - fname, readOff, r, (Size) XLOG_BLCKSZ))); + fname, targetPageOff, r, (Size) XLOG_BLCKSZ))); goto next_record_is_invalid; } pgstat_report_wait_end(); - Assert(targetSegNo == readSegNo); - Assert(targetPageOff == readOff); - Assert(reqLen <= readLen); + Assert(targetSegNo == state->seg.ws_segno); + Assert(readLen >= reqLen); - xlogreader->seg.ws_tli = curFileTLI; + state->seg.ws_tli = curFileTLI; /* * Check the page header immediately, so that we can retry immediately if @@ -12254,14 +12239,16 @@ retry: * Validating the page header is cheap enough that doing it twice * shouldn't be a big deal from a performance point of view. 
*/ - if (!XLogReaderValidatePageHeader(xlogreader, targetPagePtr, readBuf)) + if (!XLogReaderValidatePageHeader(state, targetPagePtr, readBuf)) { - /* reset any error XLogReaderValidatePageHeader() might have set */ - xlogreader->errormsg_buf[0] = '\0'; + /* reset any error XLogReaderValidatePageHeader() might have set */ + state->errormsg_buf[0] = '\0'; goto next_record_is_invalid; } - return readLen; + Assert(state->readPagePtr == targetPagePtr); + XLogReaderSetInputData(state, readLen); + return true; next_record_is_invalid: lastSourceFailed = true; @@ -12269,14 +12256,14 @@ next_record_is_invalid: if (readFile >= 0) close(readFile); readFile = -1; - readLen = 0; readSource = XLOG_FROM_ANY; /* In standby-mode, keep trying */ if (StandbyMode) goto retry; - else - return -1; + + XLogReaderSetInputData(state, -1); + return false; } /* @@ -12307,7 +12294,8 @@ next_record_is_invalid: */ static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, - bool fetching_ckpt, XLogRecPtr tliRecPtr) + bool fetching_ckpt, XLogRecPtr tliRecPtr, + XLogSegNo readSegNo) { static TimestampTz last_fail_time = 0; TimestampTz now; diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 42738eb940c..02257768ec8 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -36,11 +36,11 @@ static void report_invalid_record(XLogReaderState *state, const char *fmt,...) 
pg_attribute_printf(2, 3); static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); -static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, - int reqLen); +static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, + int reqLen, bool header_inclusive); static void XLogReaderInvalReadState(XLogReaderState *state); static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, - XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess); + XLogRecPtr PrevRecPtr, XLogRecord *record); static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr); static void ResetDecoder(XLogReaderState *state); @@ -73,7 +73,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) */ XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, - XLogReaderRoutine *routine, void *private_data) + WALSegmentCleanupCB cleanup_cb) { XLogReaderState *state; @@ -84,7 +84,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, return NULL; /* initialize caller-provided support functions */ - state->routine = *routine; + state->cleanup_cb = cleanup_cb; state->max_block_id = -1; @@ -107,9 +107,7 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size, waldir); - /* system_identifier initialized to zeroes above */ - state->private_data = private_data; - /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */ + /* ReadRecPtr, EndRecPtr, reqLen and readLen initialized to zeroes above */ state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1, MCXT_ALLOC_NO_OOM); if (!state->errormsg_buf) @@ -140,8 +138,8 @@ XLogReaderFree(XLogReaderState *state) { int block_id; - if (state->seg.ws_file != -1) - state->routine.segment_close(state); + if (state->seg.ws_file >= 0) + state->cleanup_cb(state); for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++) { @@ -246,6 +244,7 @@ 
XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) /* Begin at the passed-in record pointer. */ state->EndRecPtr = RecPtr; state->ReadRecPtr = InvalidXLogRecPtr; + state->readRecordState = XLREAD_NEXT_RECORD; } /* @@ -254,303 +253,456 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * XLogBeginRead() or XLogFindNextRecord() must be called before the first call * to XLogReadRecord(). * - * If the page_read callback fails to read the requested data, NULL is - * returned. The callback is expected to have reported the error; errormsg - * is set to NULL. + * This function may return XLREAD_NEED_DATA several times before returning a + * result record. The caller shall read in some new data then call this + * function again with the same parameters. * - * If the reading fails for some other reason, NULL is also returned, and - * *errormsg is set to a string with details of the failure. + * When a record is successfully read, returns XLREAD_SUCCESS with result + * record being stored in *record. Otherwise *record is NULL. * - * The returned pointer (or *errormsg) points to an internal buffer that's - * valid until the next call to XLogReadRecord. + * Returns XLREAD_NEED_DATA if more data is needed to finish decoding the + * current record. In that case, state->readPagePtr and state->reqLen inform + * the desired position and minimum length of data needed. The caller shall + * read in the requested data and set state->readBuf to point to a buffer + * containing it. The caller must also set state->seg.ws_tli and + * state->readLen to indicate the timeline that it was read from, and the + * length of data that is now available (which must be >= given reqLen), + * respectively. + * + * If invalid data is encountered, returns XLREAD_FAIL and sets *record to + * NULL. *errormsg is set to a string with details of the failure. The + * returned pointer (or *errormsg) points to an internal buffer that's valid + * until the next call to XLogReadRecord. 
+ * + * + * This function runs a state machine consisting of the following states. + * + * XLREAD_NEXT_RECORD: + * The initial state. If a start position was just set by XLogBeginRead(), + * try to read a record at that position. Otherwise try to read the record + * just after the last one read. The next state is XLREAD_TOT_LEN. + * + * XLREAD_TOT_LEN: + * Examining record header. Ends after reading record length. + * recordRemainLen and recordGotLen are initialized. The next state is + * XLREAD_FIRST_FRAGMENT. + * + * XLREAD_FIRST_FRAGMENT: + * Reading the first fragment. Goes to XLREAD_NEXT_RECORD if that's all or + * XLREAD_CONTINUATION if we need more data. + * + * XLREAD_CONTINUATION: + * Reading continuation of record. If the whole record is now decoded, goes + * to XLREAD_NEXT_RECORD. During this state, recordRemainLen indicates how + * much is left. + * + * If invalid data is found in any state, the state machine stays at the + * current state. This behavior allows us to continue reading a record + * after switching to a different source, during streaming replication. */ -XLogRecord * -XLogReadRecord(XLogReaderState *state, char **errormsg) +XLogReadRecordResult +XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) { - XLogRecPtr RecPtr; - XLogRecord *record; - XLogRecPtr targetPagePtr; - bool randAccess; - uint32 len, - total_len; - uint32 targetRecOff; - uint32 pageHeaderSize; - bool gotheader; - int readOff; + XLogRecord *prec; - /* - * randAccess indicates whether to verify the previous-record pointer of - * the record we're reading. We only do this if we're reading - * sequentially, which is what we initially assume. 
- */ - randAccess = false; + *record = NULL; /* reset error state */ *errormsg = NULL; state->errormsg_buf[0] = '\0'; - ResetDecoder(state); + switch (state->readRecordState) + { + case XLREAD_NEXT_RECORD: + ResetDecoder(state); - RecPtr = state->EndRecPtr; + if (state->ReadRecPtr != InvalidXLogRecPtr) + { + /* read the record after the one we just read */ - if (state->ReadRecPtr != InvalidXLogRecPtr) - { - /* read the record after the one we just read */ + /* + * EndRecPtr is pointing to end+1 of the previous WAL record. + * If we're at a page boundary, no more records can fit on the + * current page. We must skip over the page header, but we + * can't do that until we've read in the page, since the + * header size is variable. + */ + state->PrevRecPtr = state->ReadRecPtr; + state->ReadRecPtr = state->EndRecPtr; + } + else + { + /* + * Caller supplied a position to start at. + * + * In this case, EndRecPtr should already be pointing to a + * valid record starting position. + */ + Assert(XRecOffIsValid(state->EndRecPtr)); + state->ReadRecPtr = state->EndRecPtr; - /* - * EndRecPtr is pointing to end+1 of the previous WAL record. If - * we're at a page boundary, no more records can fit on the current - * page. We must skip over the page header, but we can't do that until - * we've read in the page, since the header size is variable. - */ - } - else - { - /* - * Caller supplied a position to start at. - * - * In this case, EndRecPtr should already be pointing to a valid - * record starting position. - */ - Assert(XRecOffIsValid(RecPtr)); - randAccess = true; - } + /* + * We cannot verify the previous-record pointer when we're + * seeking to a particular record. Reset PrevRecPtr so that we + * won't try doing that. 
+ */ + state->PrevRecPtr = InvalidXLogRecPtr; + state->EndRecPtr = InvalidXLogRecPtr; /* to be tidy */ + } - state->currRecPtr = RecPtr; + state->record_verified = false; + state->readRecordState = XLREAD_TOT_LEN; + /* fall through */ - targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ); - targetRecOff = RecPtr % XLOG_BLCKSZ; + case XLREAD_TOT_LEN: + { + uint32 total_len; + uint32 pageHeaderSize; + XLogRecPtr targetPagePtr; + uint32 targetRecOff; + XLogPageHeader pageHeader; - /* - * Read the page containing the record into state->readBuf. Request enough - * byte to cover the whole record header, or at least the part of it that - * fits on the same page. - */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); - if (readOff < 0) - goto err; + targetPagePtr = + state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ); + targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ; - /* - * ReadPageInternal always returns at least the page header, so we can - * examine it now. - */ - pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - if (targetRecOff == 0) - { - /* - * At page start, so skip over page header. - */ - RecPtr += pageHeaderSize; - targetRecOff = pageHeaderSize; - } - else if (targetRecOff < pageHeaderSize) - { - report_invalid_record(state, "invalid record offset at %X/%X", - LSN_FORMAT_ARGS(RecPtr)); - goto err; - } + /* + * Check if we have enough data. For the first record in the + * page, the requesting length doesn't contain page header. 
+ */ + if (XLogNeedData(state, targetPagePtr, + Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ), + targetRecOff != 0)) + return XLREAD_NEED_DATA; - if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && - targetRecOff == pageHeaderSize) - { - report_invalid_record(state, "contrecord is requested by %X/%X", - LSN_FORMAT_ARGS(RecPtr)); - goto err; - } + /* error out if caller supplied bogus page */ + if (!state->page_verified) + goto err; - /* ReadPageInternal has verified the page header */ - Assert(pageHeaderSize <= readOff); + /* examine page header now. */ + pageHeaderSize = + XLogPageHeaderSize((XLogPageHeader) state->readBuf); + if (targetRecOff == 0) + { + /* At page start, so skip over page header. */ + state->ReadRecPtr += pageHeaderSize; + targetRecOff = pageHeaderSize; + } + else if (targetRecOff < pageHeaderSize) + { + report_invalid_record(state, "invalid record offset at %X/%X", + LSN_FORMAT_ARGS(state->ReadRecPtr)); + goto err; + } - /* - * Read the record length. - * - * NB: Even though we use an XLogRecord pointer here, the whole record - * header might not fit on this page. xl_tot_len is the first field of the - * struct, so it must be on this page (the records are MAXALIGNed), but we - * cannot access any other fields until we've verified that we got the - * whole header. - */ - record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ); - total_len = record->xl_tot_len; + pageHeader = (XLogPageHeader) state->readBuf; + if ((pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD) && + targetRecOff == pageHeaderSize) + { + report_invalid_record(state, "contrecord is requested by %X/%X", + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); + goto err; + } - /* - * If the whole record header is on this page, validate it immediately. - * Otherwise do just a basic sanity check on xl_tot_len, and validate the - * rest of the header after reading it from the next page. 
The xl_tot_len - * check is necessary here to ensure that we enter the "Need to reassemble - * record" code path below; otherwise we might fail to apply - * ValidXLogRecordHeader at all. - */ - if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) - { - if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record, - randAccess)) - goto err; - gotheader = true; - } - else - { - /* XXX: more validation should be done here */ - if (total_len < SizeOfXLogRecord) - { - report_invalid_record(state, - "invalid record length at %X/%X: wanted %u, got %u", - LSN_FORMAT_ARGS(RecPtr), - (uint32) SizeOfXLogRecord, total_len); - goto err; - } - gotheader = false; - } + /* XLogNeedData has verified the page header */ + Assert(pageHeaderSize <= state->readLen); - len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ; - if (total_len > len) - { - /* Need to reassemble record */ - char *contdata; - XLogPageHeader pageHeader; - char *buffer; - uint32 gotlen; + /* + * Read the record length. + * + * NB: Even though we use an XLogRecord pointer here, the + * whole record header might not fit on this page. xl_tot_len + * is the first field of the struct, so it must be on this + * page (the records are MAXALIGNed), but we cannot access any + * other fields until we've verified that we got the whole + * header. + */ + prec = (XLogRecord *) (state->readBuf + + state->ReadRecPtr % XLOG_BLCKSZ); + total_len = prec->xl_tot_len; - /* - * Enlarge readRecordBuf as needed. - */ - if (total_len > state->readRecordBufSize && - !allocate_recordbuf(state, total_len)) - { - /* We treat this as a "bogus data" condition */ - report_invalid_record(state, "record length %u at %X/%X too long", - total_len, LSN_FORMAT_ARGS(RecPtr)); - goto err; - } + /* + * If the whole record header is on this page, validate it + * immediately. Otherwise do just a basic sanity check on + * xl_tot_len, and validate the rest of the header after + * reading it from the next page. 
The xl_tot_len check is + * necessary here to ensure that we enter the + * XLREAD_CONTINUATION state below; otherwise we might fail to + * apply ValidXLogRecordHeader at all. + */ + if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) + { + if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + state->PrevRecPtr, prec)) + goto err; - /* Copy the first fragment of the record from the first page. */ - memcpy(state->readRecordBuf, - state->readBuf + RecPtr % XLOG_BLCKSZ, len); - buffer = state->readRecordBuf + len; - gotlen = len; + state->record_verified = true; + } + else + { + /* XXX: more validation should be done here */ + if (total_len < SizeOfXLogRecord) + { + report_invalid_record(state, + "invalid record length at %X/%X: wanted %u, got %u", + LSN_FORMAT_ARGS(state->ReadRecPtr), + (uint32) SizeOfXLogRecord, total_len); + goto err; + } + } - do - { - /* Calculate pointer to beginning of next page */ - targetPagePtr += XLOG_BLCKSZ; + /* + * Wait for the rest of the record, or the part of the record + * that fit on the first page if crossed a page boundary, to + * become available. 
+ */ + state->recordGotLen = 0; + state->recordRemainLen = total_len; + state->readRecordState = XLREAD_FIRST_FRAGMENT; + } + /* fall through */ - /* Wait for the next page to become available */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(total_len - gotlen + SizeOfXLogShortPHD, - XLOG_BLCKSZ)); + case XLREAD_FIRST_FRAGMENT: + { + uint32 total_len = state->recordRemainLen; + uint32 request_len; + uint32 record_len; + XLogRecPtr targetPagePtr; + uint32 targetRecOff; - if (readOff < 0) - goto err; + /* + * Wait for the rest of the record on the first page to become + * available + */ + targetPagePtr = + state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ); + targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ; - Assert(SizeOfXLogShortPHD <= readOff); + request_len = Min(targetRecOff + total_len, XLOG_BLCKSZ); + record_len = request_len - targetRecOff; - /* Check that the continuation on next page looks valid */ - pageHeader = (XLogPageHeader) state->readBuf; - if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) - { - report_invalid_record(state, - "there is no contrecord flag at %X/%X", - LSN_FORMAT_ARGS(RecPtr)); - goto err; - } + /* ReadRecPtr contains page header */ + Assert(targetRecOff != 0); + if (XLogNeedData(state, targetPagePtr, request_len, true)) + return XLREAD_NEED_DATA; - /* - * Cross-check that xlp_rem_len agrees with how much of the record - * we expect there to be left. 
- */ - if (pageHeader->xlp_rem_len == 0 || - total_len != (pageHeader->xlp_rem_len + gotlen)) - { - report_invalid_record(state, - "invalid contrecord length %u (expected %lld) at %X/%X", - pageHeader->xlp_rem_len, - ((long long) total_len) - gotlen, - LSN_FORMAT_ARGS(RecPtr)); - goto err; - } + /* error out if caller supplied bogus page */ + if (!state->page_verified) + goto err; - /* Append the continuation from this page to the buffer */ - pageHeaderSize = XLogPageHeaderSize(pageHeader); + prec = (XLogRecord *) (state->readBuf + targetRecOff); - if (readOff < pageHeaderSize) - readOff = ReadPageInternal(state, targetPagePtr, - pageHeaderSize); + /* validate record header if not yet */ + if (!state->record_verified && record_len >= SizeOfXLogRecord) + { + if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + state->PrevRecPtr, prec)) + goto err; - Assert(pageHeaderSize <= readOff); + state->record_verified = true; + } - contdata = (char *) state->readBuf + pageHeaderSize; - len = XLOG_BLCKSZ - pageHeaderSize; - if (pageHeader->xlp_rem_len < len) - len = pageHeader->xlp_rem_len; - if (readOff < pageHeaderSize + len) - readOff = ReadPageInternal(state, targetPagePtr, - pageHeaderSize + len); + if (total_len == record_len) + { + /* Record does not cross a page boundary */ + Assert(state->record_verified); - memcpy(buffer, (char *) contdata, len); - buffer += len; - gotlen += len; + if (!ValidXLogRecord(state, prec, state->ReadRecPtr)) + goto err; - /* If we just reassembled the record header, validate it. */ - if (!gotheader) - { - record = (XLogRecord *) state->readRecordBuf; - if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, - record, randAccess)) + state->record_verified = true; /* to be tidy */ + + /* We already checked the header earlier */ + state->EndRecPtr = state->ReadRecPtr + MAXALIGN(record_len); + + *record = prec; + state->readRecordState = XLREAD_NEXT_RECORD; + break; + } + + /* + * The record continues on the next page. 
Need to reassemble + * record + */ + Assert(total_len > record_len); + + /* Enlarge readRecordBuf as needed. */ + if (total_len > state->readRecordBufSize && + !allocate_recordbuf(state, total_len)) + { + /* We treat this as a "bogus data" condition */ + report_invalid_record(state, + "record length %u at %X/%X too long", + total_len, + LSN_FORMAT_ARGS(state->ReadRecPtr)); goto err; - gotheader = true; + } + + /* Copy the first fragment of the record from the first page. */ + memcpy(state->readRecordBuf, state->readBuf + targetRecOff, + record_len); + state->recordGotLen += record_len; + state->recordRemainLen -= record_len; + + /* Calculate pointer to beginning of next page */ + state->recordContRecPtr = state->ReadRecPtr + record_len; + Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0); + + state->readRecordState = XLREAD_CONTINUATION; } - } while (gotlen < total_len); + /* fall through */ - Assert(gotheader); + case XLREAD_CONTINUATION: + { + XLogPageHeader pageHeader = NULL; + uint32 pageHeaderSize; + XLogRecPtr targetPagePtr = InvalidXLogRecPtr; - record = (XLogRecord *) state->readRecordBuf; - if (!ValidXLogRecord(state, record, RecPtr)) - goto err; + /* + * we enter this state only if we haven't read the whole + * record. 
+ */ + Assert(state->recordRemainLen > 0); - pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - state->ReadRecPtr = RecPtr; - state->EndRecPtr = targetPagePtr + pageHeaderSize - + MAXALIGN(pageHeader->xlp_rem_len); - } - else - { - /* Wait for the record data to become available */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(targetRecOff + total_len, XLOG_BLCKSZ)); - if (readOff < 0) - goto err; + while (state->recordRemainLen > 0) + { + char *contdata; + uint32 request_len PG_USED_FOR_ASSERTS_ONLY; + uint32 record_len; + + /* Wait for the next page to become available */ + targetPagePtr = state->recordContRecPtr; + + /* this request contains page header */ + Assert(targetPagePtr != 0); + if (XLogNeedData(state, targetPagePtr, + Min(state->recordRemainLen, XLOG_BLCKSZ), + false)) + return XLREAD_NEED_DATA; + + if (!state->page_verified) + goto err; + + Assert(SizeOfXLogShortPHD <= state->readLen); + + /* Check that the continuation on next page looks valid */ + pageHeader = (XLogPageHeader) state->readBuf; + if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) + { + report_invalid_record( + state, + "there is no contrecord flag at %X/%X reading %X/%X", + (uint32) (state->recordContRecPtr >> 32), + (uint32) state->recordContRecPtr, + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); + goto err; + } + + /* + * Cross-check that xlp_rem_len agrees with how much of + * the record we expect there to be left. 
+ */ + if (pageHeader->xlp_rem_len == 0 || + pageHeader->xlp_rem_len != state->recordRemainLen) + { + report_invalid_record( + state, + "invalid contrecord length %u at %X/%X reading %X/%X, expected %u", + pageHeader->xlp_rem_len, + (uint32) (state->recordContRecPtr >> 32), + (uint32) state->recordContRecPtr, + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr, + state->recordRemainLen); + goto err; + } + + /* Append the continuation from this page to the buffer */ + pageHeaderSize = XLogPageHeaderSize(pageHeader); + + /* + * XLogNeedData should have ensured that the whole page + * header was read + */ + Assert(pageHeaderSize <= state->readLen); + + contdata = (char *) state->readBuf + pageHeaderSize; + record_len = XLOG_BLCKSZ - pageHeaderSize; + if (pageHeader->xlp_rem_len < record_len) + record_len = pageHeader->xlp_rem_len; + + request_len = record_len + pageHeaderSize; + + /* + * XLogNeedData should have ensured all needed data was + * read + */ + Assert(request_len <= state->readLen); + + memcpy(state->readRecordBuf + state->recordGotLen, + (char *) contdata, record_len); + state->recordGotLen += record_len; + state->recordRemainLen -= record_len; + + /* If we just reassembled the record header, validate it. 
*/ + if (!state->record_verified) + { + Assert(state->recordGotLen >= SizeOfXLogRecord); + if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + state->PrevRecPtr, + (XLogRecord *) state->readRecordBuf)) + goto err; + + state->record_verified = true; + } + + /* + * Calculate pointer to beginning of next page, and + * continue + */ + state->recordContRecPtr += XLOG_BLCKSZ; + } - /* Record does not cross a page boundary */ - if (!ValidXLogRecord(state, record, RecPtr)) - goto err; + /* targetPagePtr is pointing the last-read page here */ + prec = (XLogRecord *) state->readRecordBuf; + if (!ValidXLogRecord(state, prec, state->ReadRecPtr)) + goto err; - state->EndRecPtr = RecPtr + MAXALIGN(total_len); + pageHeaderSize = + XLogPageHeaderSize((XLogPageHeader) state->readBuf); + state->EndRecPtr = targetPagePtr + pageHeaderSize + + MAXALIGN(pageHeader->xlp_rem_len); - state->ReadRecPtr = RecPtr; + *record = prec; + state->readRecordState = XLREAD_NEXT_RECORD; + break; + } } /* * Special processing if it's an XLOG SWITCH record */ - if (record->xl_rmid == RM_XLOG_ID && - (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) + if ((*record)->xl_rmid == RM_XLOG_ID && + ((*record)->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) { /* Pretend it extends to end of segment */ state->EndRecPtr += state->segcxt.ws_segsize - 1; state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize); } - if (DecodeXLogRecord(state, record, errormsg)) - return record; - else - return NULL; + if (DecodeXLogRecord(state, *record, errormsg)) + return XLREAD_SUCCESS; + + *record = NULL; + return XLREAD_FAIL; err: /* - * Invalidate the read state. We might read from a different source after + * Invalidate the read page. We might read from a different source after * failure. 
*/ XLogReaderInvalReadState(state); @@ -558,113 +710,141 @@ err: if (state->errormsg_buf[0] != '\0') *errormsg = state->errormsg_buf; - return NULL; + *record = NULL; + return XLREAD_FAIL; } /* - * Read a single xlog page including at least [pageptr, reqLen] of valid data - * via the page_read() callback. + * Checks that an xlog page loaded in state->readBuf is including at least + * [pageptr, reqLen] and the page is valid. header_inclusive indicates that + * reqLen is calculated including page header length. + * + * Returns false if the buffer already contains the requested data, or found + * error. state->page_verified is set to true for the former and false for the + * latter. * - * Returns -1 if the required page cannot be read for some reason; errormsg_buf - * is set in that case (unless the error occurs in the page_read callback). + * Otherwise returns true and requests data loaded onto state->readBuf by + * state->readPagePtr and state->readLen. The caller shall call this function + * again after filling the buffer at least with that portion of data and set + * state->readLen to the length of actually loaded data. * - * We fetch the page from a reader-local cache if we know we have the required - * data and if there hasn't been any error since caching the data. + * If header_inclusive is false, corrects reqLen internally by adding the + * actual page header length and may request caller for new data. */ -static int -ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) +static bool +XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen, + bool header_inclusive) { - int readLen; uint32 targetPageOff; XLogSegNo targetSegNo; - XLogPageHeader hdr; + uint32 addLen = 0; - Assert((pageptr % XLOG_BLCKSZ) == 0); + /* Some data is loaded, but page header is not verified yet. 
*/ + if (!state->page_verified && + !XLogRecPtrIsInvalid(state->readPagePtr) && state->readLen >= 0) + { + uint32 pageHeaderSize; - XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize); - targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize); + /* just loaded new data so needs to verify page header */ - /* check whether we have all the requested data already */ - if (targetSegNo == state->seg.ws_segno && - targetPageOff == state->segoff && reqLen <= state->readLen) - return state->readLen; + /* The caller must have loaded at least page header */ + Assert(state->readLen >= SizeOfXLogShortPHD); - /* - * Data is not in our buffer. - * - * Every time we actually read the segment, even if we looked at parts of - * it before, we need to do verification as the page_read callback might - * now be rereading data from a different source. - * - * Whenever switching to a new WAL segment, we read the first page of the - * file and validate its header, even if that's not where the target - * record is. This is so that we can check the additional identification - * info that is present in the first page's "long" header. - */ - if (targetSegNo != state->seg.ws_segno && targetPageOff != 0) - { - XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; + /* + * We have enough data to check the header length. Recheck the loaded + * length against the actual header length. + */ + pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ, - state->currRecPtr, - state->readBuf); - if (readLen < 0) - goto err; + /* Request more data if we don't have the full header. */ + if (state->readLen < pageHeaderSize) + { + state->reqLen = pageHeaderSize; + return true; + } - /* we can be sure to have enough WAL available, we scrolled back */ - Assert(readLen == XLOG_BLCKSZ); + /* Now that we know we have the full header, validate it. 
*/ + if (!XLogReaderValidatePageHeader(state, state->readPagePtr, + (char *) state->readBuf)) + { + /* That's bad. Force reading the page again. */ + XLogReaderInvalReadState(state); - if (!XLogReaderValidatePageHeader(state, targetSegmentPtr, - state->readBuf)) - goto err; + return false; + } + + state->page_verified = true; + + XLByteToSeg(state->readPagePtr, state->seg.ws_segno, + state->segcxt.ws_segsize); } /* - * First, read the requested data length, but at least a short page header - * so that we can validate it. + * The loaded page may not be the one caller is supposing to read when we + * are verifying the first page of new segment. In that case, skip further + * verification and immediately load the target page. */ - readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), - state->currRecPtr, - state->readBuf); - if (readLen < 0) - goto err; + if (state->page_verified && pageptr == state->readPagePtr) + { + /* + * calculate additional length for page header keeping the total + * length within the block size. + */ + if (!header_inclusive) + { + uint32 pageHeaderSize = + XLogPageHeaderSize((XLogPageHeader) state->readBuf); - Assert(readLen <= XLOG_BLCKSZ); + addLen = pageHeaderSize; + if (reqLen + pageHeaderSize <= XLOG_BLCKSZ) + addLen = pageHeaderSize; + else + addLen = XLOG_BLCKSZ - reqLen; + } - /* Do we have enough data to check the header length? */ - if (readLen <= SizeOfXLogShortPHD) - goto err; + /* Return if we already have it. */ + if (reqLen + addLen <= state->readLen) + return false; + } - Assert(readLen >= reqLen); + /* Data is not in our buffer, request the caller for it. 
*/ + XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize); + targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize); + Assert((pageptr % XLOG_BLCKSZ) == 0); - hdr = (XLogPageHeader) state->readBuf; + /* + * Every time we request to load new data of a page to the caller, even if + * we looked at a part of it before, we need to do verification on the + * next invocation as the caller might now be rereading data from a + * different source. + */ + state->page_verified = false; - /* still not enough */ - if (readLen < XLogPageHeaderSize(hdr)) + /* + * Whenever switching to a new WAL segment, we read the first page of the + * file and validate its header, even if that's not where the target + * record is. This is so that we can check the additional identification + * info that is present in the first page's "long" header. Don't do this + * if the caller requested the first page in the segment. + */ + if (targetSegNo != state->seg.ws_segno && targetPageOff != 0) { - readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr), - state->currRecPtr, - state->readBuf); - if (readLen < 0) - goto err; + /* + * Then we'll see that the targetSegNo now matches the ws_segno, and + * will not come back here, but will request the actual target page. + */ + state->readPagePtr = pageptr - targetPageOff; + state->reqLen = XLOG_BLCKSZ; + return true; } /* - * Now that we know we have the full header, validate it. + * Request the caller to load the page. We need at least a short page + * header so that we can validate it. 
*/ - if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr)) - goto err; - - /* update read state information */ - state->seg.ws_segno = targetSegNo; - state->segoff = targetPageOff; - state->readLen = readLen; - - return readLen; - -err: - XLogReaderInvalReadState(state); - return -1; + state->readPagePtr = pageptr; + state->reqLen = Max(reqLen + addLen, SizeOfXLogShortPHD); + return true; } /* @@ -673,9 +853,7 @@ err: static void XLogReaderInvalReadState(XLogReaderState *state) { - state->seg.ws_segno = 0; - state->segoff = 0; - state->readLen = 0; + state->readPagePtr = InvalidXLogRecPtr; } /* @@ -683,11 +861,12 @@ XLogReaderInvalReadState(XLogReaderState *state) * * This is just a convenience subroutine to avoid duplicated code in * XLogReadRecord. It's not intended for use from anywhere else. + * + * If PrevRecPtr is valid, the xl_prev is cross-checked with it. */ static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, - XLogRecPtr PrevRecPtr, XLogRecord *record, - bool randAccess) + XLogRecPtr PrevRecPtr, XLogRecord *record) { if (record->xl_tot_len < SizeOfXLogRecord) { @@ -704,7 +883,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, record->xl_rmid, LSN_FORMAT_ARGS(RecPtr)); return false; } - if (randAccess) + if (PrevRecPtr == InvalidXLogRecPtr) { /* * We can't exactly verify the prev-link, but surely it should be less @@ -922,6 +1101,22 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, * here. */ +XLogFindNextRecordState * +InitXLogFindNextRecord(XLogReaderState *reader_state, XLogRecPtr start_ptr) +{ + XLogFindNextRecordState *state = (XLogFindNextRecordState *) + palloc_extended(sizeof(XLogFindNextRecordState), + MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO); + if (!state) + return NULL; + + state->reader_state = reader_state; + state->targetRecPtr = start_ptr; + state->currRecPtr = start_ptr; + + return state; +} + /* * Find the first record with an lsn >= RecPtr. 
* @@ -933,27 +1128,25 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, * This positions the reader, like XLogBeginRead(), so that the next call to * XLogReadRecord() will read the next valid record. */ -XLogRecPtr -XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) +bool +XLogFindNextRecord(XLogFindNextRecordState *state) { - XLogRecPtr tmpRecPtr; - XLogRecPtr found = InvalidXLogRecPtr; XLogPageHeader header; + XLogRecord *record; + XLogReadRecordResult result; char *errormsg; - Assert(!XLogRecPtrIsInvalid(RecPtr)); + Assert(!XLogRecPtrIsInvalid(state->currRecPtr)); /* * skip over potential continuation data, keeping in mind that it may span * multiple pages */ - tmpRecPtr = RecPtr; while (true) { XLogRecPtr targetPagePtr; int targetRecOff; uint32 pageHeaderSize; - int readLen; /* * Compute targetRecOff. It should typically be equal or greater than @@ -961,27 +1154,27 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * that, except when caller has explicitly specified the offset that * falls somewhere there or when we are skipping multi-page * continuation record. 
It doesn't matter though because - * ReadPageInternal() is prepared to handle that and will read at - * least short page-header worth of data + * XLogNeedData() is prepared to handle that and will read at least + * short page-header worth of data */ - targetRecOff = tmpRecPtr % XLOG_BLCKSZ; + targetRecOff = state->currRecPtr % XLOG_BLCKSZ; /* scroll back to page boundary */ - targetPagePtr = tmpRecPtr - targetRecOff; + targetPagePtr = state->currRecPtr - targetRecOff; - /* Read the page containing the record */ - readLen = ReadPageInternal(state, targetPagePtr, targetRecOff); - if (readLen < 0) + if (XLogNeedData(state->reader_state, targetPagePtr, targetRecOff, + targetRecOff != 0)) + return true; + + if (!state->reader_state->page_verified) goto err; - header = (XLogPageHeader) state->readBuf; + header = (XLogPageHeader) state->reader_state->readBuf; pageHeaderSize = XLogPageHeaderSize(header); - /* make sure we have enough data for the page header */ - readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize); - if (readLen < 0) - goto err; + /* we should have read the page header */ + Assert(state->reader_state->readLen >= pageHeaderSize); /* skip over potential continuation data */ if (header->xlp_info & XLP_FIRST_IS_CONTRECORD) @@ -996,21 +1189,21 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * Note that record headers are MAXALIGN'ed */ if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize)) - tmpRecPtr = targetPagePtr + XLOG_BLCKSZ; + state->currRecPtr = targetPagePtr + XLOG_BLCKSZ; else { /* * The previous continuation record ends in this page. 
Set - * tmpRecPtr to point to the first valid record + * state->currRecPtr to point to the first valid record */ - tmpRecPtr = targetPagePtr + pageHeaderSize + state->currRecPtr = targetPagePtr + pageHeaderSize + MAXALIGN(header->xlp_rem_len); break; } } else { - tmpRecPtr = targetPagePtr + pageHeaderSize; + state->currRecPtr = targetPagePtr + pageHeaderSize; break; } } @@ -1020,31 +1213,36 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * because either we're at the first record after the beginning of a page * or we just jumped over the remaining data of a continuation. */ - XLogBeginRead(state, tmpRecPtr); - while (XLogReadRecord(state, &errormsg) != NULL) + XLogBeginRead(state->reader_state, state->currRecPtr); + while ((result = XLogReadRecord(state->reader_state, &record, &errormsg)) != + XLREAD_FAIL) { + if (result == XLREAD_NEED_DATA) + return true; + /* past the record we've found, break out */ - if (RecPtr <= state->ReadRecPtr) + if (state->targetRecPtr <= state->reader_state->ReadRecPtr) { /* Rewind the reader to the beginning of the last record. */ - found = state->ReadRecPtr; - XLogBeginRead(state, found); - return found; + state->currRecPtr = state->reader_state->ReadRecPtr; + XLogBeginRead(state->reader_state, state->currRecPtr); + return false; } } err: - XLogReaderInvalReadState(state); + XLogReaderInvalReadState(state->reader_state); - return InvalidXLogRecPtr; + state->currRecPtr = InvalidXLogRecPtr;; + return false; } #endif /* FRONTEND */ /* - * Helper function to ease writing of XLogRoutine->page_read callbacks. - * If this function is used, caller must supply a segment_open callback in - * 'state', as that is used here. + * Helper function to ease writing of routines that read raw WAL data. + * If this function is used, caller must supply a segment_open callback and + * segment_close callback as that is used here. * * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL * fetched from timeline 'tli'. 
@@ -1057,6 +1255,7 @@ err: */ bool WALRead(XLogReaderState *state, + WALSegmentOpenCB segopenfn, WALSegmentCloseCB segclosefn, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo) { @@ -1088,10 +1287,10 @@ WALRead(XLogReaderState *state, XLogSegNo nextSegNo; if (state->seg.ws_file >= 0) - state->routine.segment_close(state); + segclosefn(state); XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize); - state->routine.segment_open(state, nextSegNo, &tli); + segopenfn(state, nextSegNo, &tli); /* This shouldn't happen -- indicates a bug in segment_open */ Assert(state->seg.ws_file >= 0); diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index d17d660f460..e5de26dce54 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -686,8 +686,7 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) { - const XLogRecPtr lastReadPage = (state->seg.ws_segno * - state->segcxt.ws_segsize + state->segoff); + const XLogRecPtr lastReadPage = state->readPagePtr; Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); Assert(wantLength <= XLOG_BLCKSZ); @@ -702,7 +701,7 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa * current TLI has since become historical. */ if (lastReadPage == wantPage && - state->readLen != 0 && + state->page_verified && lastReadPage + state->readLen >= wantPage + Min(wantLength, XLOG_BLCKSZ - 1)) return; @@ -824,10 +823,12 @@ wal_segment_close(XLogReaderState *state) * exists for normal backends, so we have to do a check/sleep/repeat style of * loop for now. 
*/ -int -read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, - int reqLen, XLogRecPtr targetRecPtr, char *cur_page) +bool +read_local_xlog_page(XLogReaderState *state) { + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->reqLen; + char *cur_page = state->readBuf; XLogRecPtr read_upto, loc; TimeLineID tli; @@ -926,7 +927,8 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, else if (targetPagePtr + reqLen > read_upto) { /* not enough data there */ - return -1; + XLogReaderSetInputData(state, -1); + return false; } else { @@ -939,12 +941,14 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * as 'count', read the whole page anyway. It's guaranteed to be * zero-padded up to the page boundary if it's incomplete. */ - if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli, - &errinfo)) + if (!WALRead(state, wal_segment_open, wal_segment_close, + cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &errinfo)) WALReadRaiseError(&errinfo); /* number of valid bytes in the buffer */ - return count; + state->readPagePtr = targetPagePtr; + XLogReaderSetInputData(state, count); + return true; } /* |