diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 42 |
1 files changed, 24 insertions, 18 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4bf8a18e01e..52fe9aba660 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -580,10 +580,7 @@ StartReplication(StartReplicationCmd *cmd) /* create xlogreader for physical replication */ xlogreader = - XLogReaderAllocate(wal_segment_size, NULL, - XL_ROUTINE(.segment_open = WalSndSegmentOpen, - .segment_close = wal_segment_close), - NULL); + XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close); if (!xlogreader) ereport(ERROR, @@ -806,10 +803,12 @@ StartReplication(StartReplicationCmd *cmd) * which has to do a plain sleep/busy loop, because the walsender's latch gets * set every time WAL is flushed. */ -static int -logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char *cur_page) +static bool +logical_read_xlog_page(XLogReaderState *state) { + XLogRecPtr targetPagePtr = state->readPagePtr; + int reqLen = state->reqLen; + char *cur_page = state->readBuf; XLogRecPtr flushptr; int count; WALReadError errinfo; @@ -826,7 +825,10 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) - return -1; + { + XLogReaderSetInputData(state, -1); + return false; + } if (targetPagePtr + XLOG_BLCKSZ <= flushptr) count = XLOG_BLCKSZ; /* more than one block available */ @@ -834,7 +836,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req count = flushptr - targetPagePtr; /* part of the page available */ /* now actually read the data, we know it's there */ - if (!WALRead(state, + if (!WALRead(state, WalSndSegmentOpen, wal_segment_close, cur_page, targetPagePtr, XLOG_BLCKSZ, @@ -854,7 +856,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize); CheckXLogRemoved(segno, state->seg.ws_tli); - return count; + XLogReaderSetInputData(state, count); + return true; } /* @@ -1007,9 +1010,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, InvalidXLogRecPtr, - XL_ROUTINE(.page_read = logical_read_xlog_page, - .segment_open = WalSndSegmentOpen, - .segment_close = wal_segment_close), + logical_read_xlog_page, + wal_segment_close, WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); @@ -1167,9 +1169,8 @@ StartLogicalReplication(StartReplicationCmd *cmd) */ logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options, false, - XL_ROUTINE(.page_read = logical_read_xlog_page, - .segment_open = WalSndSegmentOpen, - .segment_close = wal_segment_close), + logical_read_xlog_page, + wal_segment_close, WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); xlogreader = logical_decoding_ctx->reader; @@ -2745,7 +2746,7 @@ XLogSendPhysical(void) enlargeStringInfo(&output_message, nbytes); retry: - if (!WALRead(xlogreader, + if (!WALRead(xlogreader, WalSndSegmentOpen, wal_segment_close, &output_message.data[output_message.len], startptr, nbytes, @@ -2843,7 +2844,12 @@ XLogSendLogical(void) */ WalSndCaughtUp = false; - record = XLogReadRecord(logical_decoding_ctx->reader, &errm); + while (XLogReadRecord(logical_decoding_ctx->reader, &record, &errm) == + XLREAD_NEED_DATA) + { + if (!logical_decoding_ctx->page_read(logical_decoding_ctx->reader)) + break; + } /* xlog record was invalid */ if (errm != NULL) |