summaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
authorThomas Munro2021-04-08 11:03:23 +0000
committerThomas Munro2021-04-08 11:20:42 +0000
commit323cbe7c7ddcf18aaf24b7f6d682a45a61d4e31b (patch)
tree5290af3834511b9bd1773841b1068e485ba52fe6 /src/backend/replication/walsender.c
parent5ac9c4307337313bedeafc21dbbab93ba809241c (diff)
Remove read_page callback from XLogReader.
Previously, the XLogReader module would fetch new input data using a callback function. Redesign the interface so that it tells the caller to insert more data with a special return value instead. This API suits later patches for prefetching, encryption and maybe other future projects that would otherwise require continually extending the callback interface. As incidental cleanup work, move global variables readOff, readLen and readSegNo inside XlogReaderState. Author: Kyotaro HORIGUCHI <[email protected]> Author: Heikki Linnakangas <[email protected]> (parts of earlier version) Reviewed-by: Antonin Houska <[email protected]> Reviewed-by: Alvaro Herrera <[email protected]> Reviewed-by: Takashi Menjo <[email protected]> Reviewed-by: Andres Freund <[email protected]> Reviewed-by: Thomas Munro <[email protected]> Discussion: https://postgr.es/m/20190418.210257.43726183.horiguchi.kyotaro%40lab.ntt.co.jp
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c42
1 files changed, 24 insertions, 18 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4bf8a18e01e..52fe9aba660 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -580,10 +580,7 @@ StartReplication(StartReplicationCmd *cmd)
/* create xlogreader for physical replication */
xlogreader =
- XLogReaderAllocate(wal_segment_size, NULL,
- XL_ROUTINE(.segment_open = WalSndSegmentOpen,
- .segment_close = wal_segment_close),
- NULL);
+ XLogReaderAllocate(wal_segment_size, NULL, wal_segment_close);
if (!xlogreader)
ereport(ERROR,
@@ -806,10 +803,12 @@ StartReplication(StartReplicationCmd *cmd)
* which has to do a plain sleep/busy loop, because the walsender's latch gets
* set every time WAL is flushed.
*/
-static int
-logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
- XLogRecPtr targetRecPtr, char *cur_page)
+static bool
+logical_read_xlog_page(XLogReaderState *state)
{
+ XLogRecPtr targetPagePtr = state->readPagePtr;
+ int reqLen = state->reqLen;
+ char *cur_page = state->readBuf;
XLogRecPtr flushptr;
int count;
WALReadError errinfo;
@@ -826,7 +825,10 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
/* fail if not (implies we are going to shut down) */
if (flushptr < targetPagePtr + reqLen)
- return -1;
+ {
+ XLogReaderSetInputData(state, -1);
+ return false;
+ }
if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
count = XLOG_BLCKSZ; /* more than one block available */
@@ -834,7 +836,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
count = flushptr - targetPagePtr; /* part of the page available */
/* now actually read the data, we know it's there */
- if (!WALRead(state,
+ if (!WALRead(state, WalSndSegmentOpen, wal_segment_close,
cur_page,
targetPagePtr,
XLOG_BLCKSZ,
@@ -854,7 +856,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
CheckXLogRemoved(segno, state->seg.ws_tli);
- return count;
+ XLogReaderSetInputData(state, count);
+ return true;
}
/*
@@ -1007,9 +1010,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
InvalidXLogRecPtr,
- XL_ROUTINE(.page_read = logical_read_xlog_page,
- .segment_open = WalSndSegmentOpen,
- .segment_close = wal_segment_close),
+ logical_read_xlog_page,
+ wal_segment_close,
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
@@ -1167,9 +1169,8 @@ StartLogicalReplication(StartReplicationCmd *cmd)
*/
logical_decoding_ctx =
CreateDecodingContext(cmd->startpoint, cmd->options, false,
- XL_ROUTINE(.page_read = logical_read_xlog_page,
- .segment_open = WalSndSegmentOpen,
- .segment_close = wal_segment_close),
+ logical_read_xlog_page,
+ wal_segment_close,
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
xlogreader = logical_decoding_ctx->reader;
@@ -2745,7 +2746,7 @@ XLogSendPhysical(void)
enlargeStringInfo(&output_message, nbytes);
retry:
- if (!WALRead(xlogreader,
+ if (!WALRead(xlogreader, WalSndSegmentOpen, wal_segment_close,
&output_message.data[output_message.len],
startptr,
nbytes,
@@ -2843,7 +2844,12 @@ XLogSendLogical(void)
*/
WalSndCaughtUp = false;
- record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
+ while (XLogReadRecord(logical_decoding_ctx->reader, &record, &errm) ==
+ XLREAD_NEED_DATA)
+ {
+ if (!logical_decoding_ctx->page_read(logical_decoding_ctx->reader))
+ break;
+ }
/* xlog record was invalid */
if (errm != NULL)