diff options
author | Amit Kapila | 2020-08-08 02:04:39 +0000 |
---|---|---|
committer | Amit Kapila | 2020-08-08 02:17:06 +0000 |
commit | 7259736a6e5b7c7588fff9578370736a6648acbb (patch) | |
tree | a2261d4ed09124a00d9ed8c0082f22256364aa77 /contrib/test_decoding | |
parent | 0a7d771f0f63eb120e7f0a60aecd543ab25ba197 (diff) |
Implement streaming mode in ReorderBuffer.
Instead of serializing the transaction to disk after reaching the
logical_decoding_work_mem limit in memory, we consume the changes we have
in memory and invoke stream API methods added by commit 45fdc9738b.
However, sometimes if we have incomplete toast or speculative insert we
spill to the disk because we can't generate the complete tuple and stream.
And, as soon as we get the complete tuple we stream the transaction
including the serialized changes.
We can do this incremental processing thanks to having assignments
(associating subxact with toplevel xacts) in WAL right away, and
thanks to logging the invalidation messages at each command end. These
features are added by commits 0bead9af48 and c55040ccd0 respectively.
Now that we can stream in-progress transactions, the concurrent aborts
may cause failures when the output plugin consults catalogs (both system
and user-defined).
We handle such failures by returning ERRCODE_TRANSACTION_ROLLBACK
sqlerrcode from system table scan APIs to the backend or WALSender
decoding a specific uncommitted transaction. The decoding logic on the
receipt of such a sqlerrcode aborts the decoding of the current
transaction and continue with the decoding of other transactions.
We have ReorderBufferTXN pointer in each ReorderBufferChange by which we
know which xact it belongs to. The output plugin can use this to decide
which changes to discard in case of stream_abort_cb (e.g. when a subxact
gets discarded).
We also provide a new option via SQL APIs to fetch the changes being
streamed.
Author: Dilip Kumar, Tomas Vondra, Amit Kapila, Nikhil Sontakke
Reviewed-by: Amit Kapila, Kuntal Ghosh, Ajin Cherian
Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian
Discussion: https://postgr.es/m/[email protected]
Diffstat (limited to 'contrib/test_decoding')
-rw-r--r-- | contrib/test_decoding/Makefile | 2 | ||||
-rw-r--r-- | contrib/test_decoding/expected/stream.out | 94 | ||||
-rw-r--r-- | contrib/test_decoding/expected/truncate.out | 6 | ||||
-rw-r--r-- | contrib/test_decoding/sql/stream.sql | 30 | ||||
-rw-r--r-- | contrib/test_decoding/sql/truncate.sql | 1 | ||||
-rw-r--r-- | contrib/test_decoding/test_decoding.c | 13 |
6 files changed, 145 insertions, 1 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index f439c582a5f..ed9a3d6c0ed 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin" REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill slot truncate + spill slot truncate stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out new file mode 100644 index 00000000000..9a5d7e7c439 --- /dev/null +++ b/contrib/test_decoding/expected/stream.out @@ -0,0 +1,94 @@ +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +CREATE TABLE stream_test(data text); +-- consume DDL +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +-- streaming test with sub-transaction +BEGIN; +savepoint s1; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); + ?column? +---------- + msg5 +(1 row) + +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); +TRUNCATE table stream_test; +rollback to s1; +INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +COMMIT; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1'); + data +---------------------------------------------------------- + opening a streamed block for transaction + streaming message: transactional: 1 prefix: test, sz: 50 + closing a streamed block for transaction + aborting streamed (sub)transaction + opening a streamed block for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + closing a streamed block for transaction + committing streamed transaction +(27 rows) + +-- streaming test for toast changes +ALTER TABLE stream_test ALTER COLUMN data set storage external; +-- consume DDL +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 10) g(i); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1'); + data +------------------------------------------ + opening a streamed block for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + streaming change for transaction + closing a streamed block for transaction + committing streamed transaction +(13 rows) + +DROP TABLE stream_test; +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/expected/truncate.out b/contrib/test_decoding/expected/truncate.out index 1cf2ae835c8..e64d377214a 100644 --- a/contrib/test_decoding/expected/truncate.out +++ b/contrib/test_decoding/expected/truncate.out @@ -25,3 +25,9 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc COMMIT (9 rows) +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql new file mode 100644 index 00000000000..8abc30de0af --- /dev/null +++ b/contrib/test_decoding/sql/stream.sql @@ -0,0 +1,30 @@ +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +CREATE TABLE stream_test(data text); + +-- consume DDL +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- streaming test with sub-transaction +BEGIN; +savepoint s1; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50)); +INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i); +TRUNCATE table stream_test; +rollback to s1; +INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); +COMMIT; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1'); + +-- streaming test for toast changes +ALTER TABLE stream_test ALTER COLUMN data set storage external; +-- consume DDL +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 10) g(i); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1'); + +DROP TABLE stream_test; +SELECT pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/sql/truncate.sql b/contrib/test_decoding/sql/truncate.sql index 5aecdf0881f..5633854e0df 100644 --- a/contrib/test_decoding/sql/truncate.sql +++ b/contrib/test_decoding/sql/truncate.sql @@ -11,3 +11,4 @@ TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE; TRUNCATE tab1, tab2; SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index dbef52a3af4..34745150e9b 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -122,6 +122,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, { ListCell *option; TestDecodingData *data; + bool enable_streaming = false; data = palloc0(sizeof(TestDecodingData)); data->context = AllocSetContextCreate(ctx->context, @@ -212,6 +213,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "stream-changes") == 0) + { + if (elem->arg == NULL) + continue; + else if (!parse_bool(strVal(elem->arg), &enable_streaming)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -221,6 +232,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, elem->arg ? strVal(elem->arg) : "(null)"))); } } + + ctx->streaming &= enable_streaming; } /* cleanup this plugin's resources */ |