summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
authorThomas Munro2021-04-08 11:03:23 +0000
committerThomas Munro2021-04-08 11:20:42 +0000
commit323cbe7c7ddcf18aaf24b7f6d682a45a61d4e31b (patch)
tree5290af3834511b9bd1773841b1068e485ba52fe6 /src/backend/replication/logical
parent5ac9c4307337313bedeafc21dbbab93ba809241c (diff)
Remove read_page callback from XLogReader.
Previously, the XLogReader module would fetch new input data using a callback function. Redesign the interface so that it tells the caller to insert more data with a special return value instead. This API suits later patches for prefetching, encryption and maybe other future projects that would otherwise require continually extending the callback interface. As incidental cleanup work, move global variables readOff, readLen and readSegNo inside XlogReaderState. Author: Kyotaro HORIGUCHI <[email protected]> Author: Heikki Linnakangas <[email protected]> (parts of earlier version) Reviewed-by: Antonin Houska <[email protected]> Reviewed-by: Alvaro Herrera <[email protected]> Reviewed-by: Takashi Menjo <[email protected]> Reviewed-by: Andres Freund <[email protected]> Reviewed-by: Thomas Munro <[email protected]> Discussion: https://postgr.es/m/20190418.210257.43726183.horiguchi.kyotaro%40lab.ntt.co.jp
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--src/backend/replication/logical/logical.c26
-rw-r--r--src/backend/replication/logical/logicalfuncs.c13
2 files changed, 27 insertions, 12 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2f6803637bf..4f6e87f18d3 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -148,7 +148,8 @@ StartupDecodingContext(List *output_plugin_options,
TransactionId xmin_horizon,
bool need_full_snapshot,
bool fast_forward,
- XLogReaderRoutine *xl_routine,
+ LogicalDecodingXLogPageReadCB page_read,
+ WALSegmentCleanupCB cleanup_cb,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -198,11 +199,12 @@ StartupDecodingContext(List *output_plugin_options,
ctx->slot = slot;
- ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
+ ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, cleanup_cb);
if (!ctx->reader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
+ ctx->page_read = page_read;
ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder =
@@ -319,7 +321,8 @@ CreateInitDecodingContext(const char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
- XLogReaderRoutine *xl_routine,
+ LogicalDecodingXLogPageReadCB page_read,
+ WALSegmentCleanupCB cleanup_cb,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -422,7 +425,7 @@ CreateInitDecodingContext(const char *plugin,
ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
need_full_snapshot, false,
- xl_routine, prepare_write, do_write,
+ page_read, cleanup_cb, prepare_write, do_write,
update_progress);
/* call output plugin initialization callback */
@@ -476,7 +479,8 @@ LogicalDecodingContext *
CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
bool fast_forward,
- XLogReaderRoutine *xl_routine,
+ LogicalDecodingXLogPageReadCB page_read,
+ WALSegmentCleanupCB cleanup_cb,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -528,8 +532,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
- fast_forward, xl_routine, prepare_write,
- do_write, update_progress);
+ fast_forward, page_read, cleanup_cb,
+ prepare_write, do_write, update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@@ -585,7 +589,13 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
char *err = NULL;
/* the read_page callback waits for new WAL */
- record = XLogReadRecord(ctx->reader, &err);
+ while (XLogReadRecord(ctx->reader, &record, &err) ==
+ XLREAD_NEED_DATA)
+ {
+ if (!ctx->page_read(ctx->reader))
+ break;
+ }
+
if (err)
elog(ERROR, "%s", err);
if (!record)
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 01d354829b9..8f8c129620f 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -233,9 +233,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
ctx = CreateDecodingContext(InvalidXLogRecPtr,
options,
false,
- XL_ROUTINE(.page_read = read_local_xlog_page,
- .segment_open = wal_segment_open,
- .segment_close = wal_segment_close),
+ read_local_xlog_page,
+ wal_segment_close,
LogicalOutputPrepareWrite,
LogicalOutputWrite, NULL);
@@ -284,7 +283,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
XLogRecord *record;
char *errm = NULL;
- record = XLogReadRecord(ctx->reader, &errm);
+ while (XLogReadRecord(ctx->reader, &record, &errm) ==
+ XLREAD_NEED_DATA)
+ {
+ if (!ctx->page_read(ctx->reader))
+ break;
+ }
+
if (errm)
elog(ERROR, "%s", errm);