author     Etsuro Fujita  2021-03-31 09:45:00 +0000
committer  Etsuro Fujita  2021-03-31 09:45:00 +0000
commit     27e1f14563cf982f1f4d71e21ef247866662a052
tree       4e1a17a61abbfc67a471760cc84e1e46182bfb9d /src
parent     66392d396508c91c2ec07a61568bf96acb663ad8
Add support for asynchronous execution.
This implements asynchronous execution, which runs multiple parts of a
non-parallel-aware Append concurrently rather than serially to improve
performance when possible.  Currently, the only node type that can be run
concurrently is a ForeignScan that is an immediate child of such an Append.
Where such ForeignScans access data on different remote servers, they are
now run concurrently, overlapping the remote operations, which improves
performance especially when those operations are time-consuming ones such
as remote joins and remote aggregations.

We may extend this to other node types, such as joins or aggregates over
ForeignScans, in the future.

This also adds support for asynchronous execution to postgres_fdw, enabled
by the table-level/server-level option "async_capable".  The default is
false.

Robert Haas, Kyotaro Horiguchi, Thomas Munro, and myself.  This commit is
mostly based on the patch proposed by Robert Haas, but also uses material
from the patches proposed by Kyotaro Horiguchi and Thomas Munro.

Reviewed by Kyotaro Horiguchi, Konstantin Knizhnik, Andrey Lepikhov,
Movead Li, Thomas Munro, Justin Pryzby, and others.

Discussion: https://postgr.es/m/CA%2BTgmoaXQEt4tZ03FtQhnzeDEMzBck%2BLrni0UWHVVgOTnA6C1w%40mail.gmail.com
Discussion: https://postgr.es/m/CA%2BhUKGLBRyu0rHrDCMC4%3DRn3252gogyp1SjOgG8SEKKZv%3DFwfQ%40mail.gmail.com
Discussion: https://postgr.es/m/20200228.170650.667613673625155850.horikyota.ntt%40gmail.com
Diffstat (limited to 'src')
-rw-r--r--  src/backend/commands/explain.c                  |   3
-rw-r--r--  src/backend/executor/Makefile                   |   1
-rw-r--r--  src/backend/executor/README                     |  40
-rw-r--r--  src/backend/executor/execAmi.c                  |   4
-rw-r--r--  src/backend/executor/execAsync.c                | 124
-rw-r--r--  src/backend/executor/nodeAppend.c               | 461
-rw-r--r--  src/backend/executor/nodeForeignscan.c          |  48
-rw-r--r--  src/backend/nodes/copyfuncs.c                   |   2
-rw-r--r--  src/backend/nodes/outfuncs.c                    |   2
-rw-r--r--  src/backend/nodes/readfuncs.c                   |   2
-rw-r--r--  src/backend/optimizer/path/costsize.c           |   1
-rw-r--r--  src/backend/optimizer/plan/createplan.c         |  41
-rw-r--r--  src/backend/postmaster/pgstat.c                 |   3
-rw-r--r--  src/backend/storage/ipc/latch.c                 |   9
-rw-r--r--  src/backend/utils/misc/guc.c                    |  10
-rw-r--r--  src/backend/utils/misc/postgresql.conf.sample   |   1
-rw-r--r--  src/include/executor/execAsync.h                |  25
-rw-r--r--  src/include/executor/nodeAppend.h               |   2
-rw-r--r--  src/include/executor/nodeForeignscan.h          |   4
-rw-r--r--  src/include/foreign/fdwapi.h                    |  14
-rw-r--r--  src/include/nodes/execnodes.h                   |  37
-rw-r--r--  src/include/nodes/plannodes.h                   |   6
-rw-r--r--  src/include/optimizer/cost.h                    |   1
-rw-r--r--  src/include/pgstat.h                            |   3
-rw-r--r--  src/include/storage/latch.h                     |   1
-rw-r--r--  src/test/regress/expected/explain.out           |   7
-rw-r--r--  src/test/regress/expected/incremental_sort.out  |   2
-rw-r--r--  src/test/regress/expected/insert_conflict.out   |   4
-rw-r--r--  src/test/regress/expected/sysviews.out          |   3
29 files changed, 841 insertions, 20 deletions
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index afc45429ba4..fe75cabdcc0 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1394,6 +1394,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
}
if (plan->parallel_aware)
appendStringInfoString(es->str, "Parallel ");
+ if (plan->async_capable)
+ appendStringInfoString(es->str, "Async ");
appendStringInfoString(es->str, pname);
es->indent++;
}
@@ -1413,6 +1415,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
if (custom_name)
ExplainPropertyText("Custom Plan Provider", custom_name, es);
ExplainPropertyBool("Parallel Aware", plan->parallel_aware, es);
+ ExplainPropertyBool("Async Capable", plan->async_capable, es);
}
switch (nodeTag(plan))
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 74ac59faa13..680fd69151b 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = \
execAmi.o \
+ execAsync.o \
execCurrent.o \
execExpr.o \
execExprInterp.o \
diff --git a/src/backend/executor/README b/src/backend/executor/README
index 18b2ac18659..3726048c4a7 100644
--- a/src/backend/executor/README
+++ b/src/backend/executor/README
@@ -359,3 +359,43 @@ query returning the same set of scan tuples multiple times. Likewise,
SRFs are disallowed in an UPDATE's targetlist. There, they would have the
effect of the same row being updated multiple times, which is not very
useful --- and updates after the first would have no effect anyway.
+
+
+Asynchronous Execution
+----------------------
+
+In cases where a node is waiting on an event external to the database system,
+such as a ForeignScan awaiting network I/O, it's desirable for the node to
+indicate that it cannot return any tuple immediately but may be able to do so
+at a later time. A process which discovers this type of situation can always
+handle it simply by blocking, but this may waste time that could be spent
+executing some other part of the plan tree where progress could be made
+immediately. This is particularly likely to occur when the plan tree contains
+an Append node. Asynchronous execution runs multiple parts of an Append node
+concurrently rather than serially to improve performance.
+
+For asynchronous execution, an Append node must first request a tuple from an
+async-capable child node using ExecAsyncRequest. Next, it must execute the
+asynchronous event loop using ExecAppendAsyncEventWait. Eventually, when a
+child node to which an asynchronous request has been made produces a tuple,
+the Append node will receive it from the event loop via ExecAsyncResponse. In
+the current implementation of asynchronous execution, the only node type that
+requests tuples from an async-capable child node is an Append, while the only
+node type that might be async-capable is a ForeignScan.
+
+Typically, the ExecAsyncResponse callback is the only one required for nodes
+that wish to request tuples asynchronously. On the other hand, async-capable
+nodes generally need to implement three methods:
+
+1. When an asynchronous request is made, the node's ExecAsyncRequest callback
+ will be invoked; it should use ExecAsyncRequestPending to indicate that the
+ request is pending for a callback described below. Alternatively, it can
+ instead use ExecAsyncRequestDone if a result is available immediately.
+
+2. When the event loop wishes to wait or poll for file descriptor events, the
+ node's ExecAsyncConfigureWait callback will be invoked to configure the
+ file descriptor event for which the node wishes to wait.
+
+3. When the file descriptor becomes ready, the node's ExecAsyncNotify callback
+ will be invoked; like #1, it should use ExecAsyncRequestPending for another
+ callback or ExecAsyncRequestDone to return a result immediately.
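
To make the protocol above concrete, here is a purely illustrative sketch of
step 1 as an FDW-backed ForeignScan might implement it.  MyFdwScanState, its
fields, and the my_fdw_* names are hypothetical and not part of this commit;
the sketch only shows how ExecAsyncRequestDone and ExecAsyncRequestPending
divide the two possible outcomes.

#include "postgres.h"

#include "executor/execAsync.h"
#include "nodes/execnodes.h"

/* Hypothetical FDW-private scan state; not part of this commit. */
typedef struct MyFdwScanState
{
	pgsocket	sockfd;			/* socket of the remote connection */
	TupleTableSlot *buffered;	/* a row already received, or NULL */
} MyFdwScanState;

/* Step 1: reached via ExecAsyncRequest -> ExecAsyncForeignScanRequest. */
static void
my_fdw_async_request(AsyncRequest *areq)
{
	ForeignScanState *node = (ForeignScanState *) areq->requestee;
	MyFdwScanState *fsstate = (MyFdwScanState *) node->fdw_state;

	if (fsstate->buffered != NULL)
	{
		/* A row is already available, so complete the request at once. */
		TupleTableSlot *slot = fsstate->buffered;

		fsstate->buffered = NULL;
		ExecAsyncRequestDone(areq, slot);
		return;
	}

	/*
	 * Nothing is available yet; the remote request is still in flight.
	 * Mark the request pending so the Append node waits on our socket via
	 * ExecAppendAsyncEventWait (steps 2 and 3).
	 */
	ExecAsyncRequestPending(areq);
}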
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 4543ac79edf..58a8aa5ab75 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -531,6 +531,10 @@ ExecSupportsBackwardScan(Plan *node)
{
ListCell *l;
+ /* With async, tuples may be interleaved, so can't back up. */
+ if (((Append *) node)->nasyncplans > 0)
+ return false;
+
foreach(l, ((Append *) node)->appendplans)
{
if (!ExecSupportsBackwardScan((Plan *) lfirst(l)))
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644
index 00000000000..f1985e658c4
--- /dev/null
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,124 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ * Support routines for asynchronous execution
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+
+/*
+ * Asynchronously request a tuple from a designed async-capable node.
+ */
+void
+ExecAsyncRequest(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestee))
+ {
+ case T_ForeignScanState:
+ ExecAsyncForeignScanRequest(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestee));
+ }
+
+ ExecAsyncResponse(areq);
+}
+
+/*
+ * Give the asynchronous node a chance to configure the file descriptor event
+ * for which it wishes to wait. We expect the node-type specific callback to
+ * make a single call of the following form:
+ *
+ * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
+ */
+void
+ExecAsyncConfigureWait(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestee))
+ {
+ case T_ForeignScanState:
+ ExecAsyncForeignScanConfigureWait(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestee));
+ }
+}
+
+/*
+ * Call the asynchronous node back when a relevant event has occurred.
+ */
+void
+ExecAsyncNotify(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestee))
+ {
+ case T_ForeignScanState:
+ ExecAsyncForeignScanNotify(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestee));
+ }
+
+ ExecAsyncResponse(areq);
+}
+
+/*
+ * Call the requestor back when an asynchronous node has produced a result.
+ */
+void
+ExecAsyncResponse(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestor))
+ {
+ case T_AppendState:
+ ExecAsyncAppendResponse(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestor));
+ }
+}
+
+/*
+ * A requestee node should call this function to deliver the tuple to its
+ * requestor node. The requestee node can call this from its ExecAsyncRequest
+ * or ExecAsyncNotify callback.
+ */
+void
+ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
+{
+ areq->request_complete = true;
+ areq->result = result;
+}
+
+/*
+ * A requestee node should call this function to indicate that it is pending
+ * for a callback. The requestee node can call this from its ExecAsyncRequest
+ * or ExecAsyncNotify callback.
+ */
+void
+ExecAsyncRequestPending(AsyncRequest *areq)
+{
+ areq->callback_pending = true;
+ areq->request_complete = false;
+ areq->result = NULL;
+}
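
Continuing the hypothetical FDW sketch from the executor README above, step 2
could look roughly as follows.  Per the comment on ExecAsyncConfigureWait, the
callback makes a single AddWaitEventToSet() call, passing the AsyncRequest as
user_data so the event loop can find it again; MyFdwScanState and the my_fdw_*
names remain illustrative only.

#include "postgres.h"

#include "nodes/execnodes.h"
#include "storage/latch.h"

/* Step 2: reached via ExecAsyncConfigureWait -> ExecAsyncForeignScanConfigureWait. */
static void
my_fdw_async_configure_wait(AsyncRequest *areq)
{
	ForeignScanState *node = (ForeignScanState *) areq->requestee;
	MyFdwScanState *fsstate = (MyFdwScanState *) node->fdw_state;	/* from the earlier sketch */
	AppendState *requestor = (AppendState *) areq->requestor;
	WaitEventSet *set = requestor->as_eventset;

	/* Wake us up when the remote side's socket becomes readable. */
	AddWaitEventToSet(set, WL_SOCKET_READABLE, fsstate->sockfd, NULL, areq);
}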
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 15e4115bd6d..7da8ffe0652 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -57,10 +57,13 @@
#include "postgres.h"
+#include "executor/execAsync.h"
#include "executor/execdebug.h"
#include "executor/execPartition.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
/* Shared state for parallel-aware Append. */
struct ParallelAppendState
@@ -78,12 +81,18 @@ struct ParallelAppendState
};
#define INVALID_SUBPLAN_INDEX -1
+#define EVENT_BUFFER_SIZE 16
static TupleTableSlot *ExecAppend(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);
static void mark_invalid_subplans_as_finished(AppendState *node);
+static void ExecAppendAsyncBegin(AppendState *node);
+static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
+static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
+static void ExecAppendAsyncEventWait(AppendState *node);
+static void classify_matching_subplans(AppendState *node);
/* ----------------------------------------------------------------
* ExecInitAppend
@@ -102,7 +111,9 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
AppendState *appendstate = makeNode(AppendState);
PlanState **appendplanstates;
Bitmapset *validsubplans;
+ Bitmapset *asyncplans;
int nplans;
+ int nasyncplans;
int firstvalid;
int i,
j;
@@ -119,6 +130,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
/* Let choose_next_subplan_* function handle setting the first subplan */
appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
+ appendstate->as_syncdone = false;
+ appendstate->as_begun = false;
/* If run-time partition pruning is enabled, then set that up now */
if (node->part_prune_info != NULL)
@@ -191,6 +204,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
* While at it, find out the first valid partial plan.
*/
j = 0;
+ asyncplans = NULL;
+ nasyncplans = 0;
firstvalid = nplans;
i = -1;
while ((i = bms_next_member(validsubplans, i)) >= 0)
@@ -198,6 +213,17 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
Plan *initNode = (Plan *) list_nth(node->appendplans, i);
/*
+ * Record async subplans. When executing EvalPlanQual, we treat them
+ * as sync ones; don't do this when initializing an EvalPlanQual plan
+ * tree.
+ */
+ if (initNode->async_capable && estate->es_epq_active == NULL)
+ {
+ asyncplans = bms_add_member(asyncplans, j);
+ nasyncplans++;
+ }
+
+ /*
* Record the lowest appendplans index which is a valid partial plan.
*/
if (i >= node->first_partial_plan && j < firstvalid)
@@ -210,6 +236,37 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
appendstate->appendplans = appendplanstates;
appendstate->as_nplans = nplans;
+ /* Initialize async state */
+ appendstate->as_asyncplans = asyncplans;
+ appendstate->as_nasyncplans = nasyncplans;
+ appendstate->as_asyncrequests = NULL;
+ appendstate->as_asyncresults = (TupleTableSlot **)
+ palloc0(nasyncplans * sizeof(TupleTableSlot *));
+ appendstate->as_needrequest = NULL;
+ appendstate->as_eventset = NULL;
+
+ if (nasyncplans > 0)
+ {
+ appendstate->as_asyncrequests = (AsyncRequest **)
+ palloc0(nplans * sizeof(AsyncRequest *));
+
+ i = -1;
+ while ((i = bms_next_member(asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq;
+
+ areq = palloc(sizeof(AsyncRequest));
+ areq->requestor = (PlanState *) appendstate;
+ areq->requestee = appendplanstates[i];
+ areq->request_index = i;
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+
+ appendstate->as_asyncrequests[i] = areq;
+ }
+ }
+
/*
* Miscellaneous initialization
*/
@@ -232,31 +289,59 @@ static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
AppendState *node = castNode(AppendState, pstate);
+ TupleTableSlot *result;
- if (node->as_whichplan < 0)
+ /*
+ * If this is the first call after Init or ReScan, we need to do the
+ * initialization work.
+ */
+ if (!node->as_begun)
{
+ Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
+ Assert(!node->as_syncdone);
+
/* Nothing to do if there are no subplans */
if (node->as_nplans == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ /* If there are any async subplans, begin executing them. */
+ if (node->as_nasyncplans > 0)
+ ExecAppendAsyncBegin(node);
+
/*
- * If no subplan has been chosen, we must choose one before
+ * If no sync subplan has been chosen, we must choose one before
* proceeding.
*/
- if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
- !node->choose_next_subplan(node))
+ if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+
+ Assert(node->as_syncdone ||
+ (node->as_whichplan >= 0 &&
+ node->as_whichplan < node->as_nplans));
+
+ /* And we're initialized. */
+ node->as_begun = true;
}
for (;;)
{
PlanState *subnode;
- TupleTableSlot *result;
CHECK_FOR_INTERRUPTS();
/*
- * figure out which subplan we are currently processing
+ * try to get a tuple from an async subplan if any
+ */
+ if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
+ {
+ if (ExecAppendAsyncGetNext(node, &result))
+ return result;
+ Assert(!node->as_syncdone);
+ Assert(bms_is_empty(node->as_needrequest));
+ }
+
+ /*
+ * figure out which sync subplan we are currently processing
*/
Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
subnode = node->appendplans[node->as_whichplan];
@@ -276,8 +361,16 @@ ExecAppend(PlanState *pstate)
return result;
}
- /* choose new subplan; if none, we're done */
- if (!node->choose_next_subplan(node))
+ /*
+ * wait or poll async events if any. We do this before checking for
+ * the end of iteration, because it might drain the remaining async
+ * subplans.
+ */
+ if (node->as_nasyncremain > 0)
+ ExecAppendAsyncEventWait(node);
+
+ /* choose new sync subplan; if no sync/async subplans, we're done */
+ if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
}
}
@@ -313,6 +406,7 @@ ExecEndAppend(AppendState *node)
void
ExecReScanAppend(AppendState *node)
{
+ int nasyncplans = node->as_nasyncplans;
int i;
/*
@@ -326,6 +420,11 @@ ExecReScanAppend(AppendState *node)
{
bms_free(node->as_valid_subplans);
node->as_valid_subplans = NULL;
+ if (nasyncplans > 0)
+ {
+ bms_free(node->as_valid_asyncplans);
+ node->as_valid_asyncplans = NULL;
+ }
}
for (i = 0; i < node->as_nplans; i++)
@@ -347,8 +446,27 @@ ExecReScanAppend(AppendState *node)
ExecReScan(subnode);
}
+ /* Reset async state */
+ if (nasyncplans > 0)
+ {
+ i = -1;
+ while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+ }
+
+ bms_free(node->as_needrequest);
+ node->as_needrequest = NULL;
+ }
+
/* Let choose_next_subplan_* function handle setting the first subplan */
node->as_whichplan = INVALID_SUBPLAN_INDEX;
+ node->as_syncdone = false;
+ node->as_begun = false;
}
/* ----------------------------------------------------------------
@@ -429,7 +547,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
/* ----------------------------------------------------------------
* choose_next_subplan_locally
*
- * Choose next subplan for a non-parallel-aware Append,
+ * Choose next sync subplan for a non-parallel-aware Append,
* returning false if there are no more.
* ----------------------------------------------------------------
*/
@@ -442,16 +560,25 @@ choose_next_subplan_locally(AppendState *node)
/* We should never be called when there are no subplans */
Assert(node->as_nplans > 0);
+ /* Nothing to do if syncdone */
+ if (node->as_syncdone)
+ return false;
+
/*
* If first call then have the bms member function choose the first valid
- * subplan by initializing whichplan to -1. If there happen to be no
- * valid subplans then the bms member function will handle that by
- * returning a negative number which will allow us to exit returning a
+ * sync subplan by initializing whichplan to -1. If there happen to be
+ * no valid sync subplans then the bms member function will handle that
+ * by returning a negative number which will allow us to exit returning a
* false value.
*/
if (whichplan == INVALID_SUBPLAN_INDEX)
{
- if (node->as_valid_subplans == NULL)
+ if (node->as_nasyncplans > 0)
+ {
+ /* We'd have filled as_valid_subplans already */
+ Assert(node->as_valid_subplans);
+ }
+ else if (node->as_valid_subplans == NULL)
node->as_valid_subplans =
ExecFindMatchingSubPlans(node->as_prune_state);
@@ -467,7 +594,12 @@ choose_next_subplan_locally(AppendState *node)
nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
if (nextplan < 0)
+ {
+ /* Set as_syncdone if in async mode */
+ if (node->as_nasyncplans > 0)
+ node->as_syncdone = true;
return false;
+ }
node->as_whichplan = nextplan;
@@ -709,3 +841,306 @@ mark_invalid_subplans_as_finished(AppendState *node)
node->as_pstate->pa_finished[i] = true;
}
}
+
+/* ----------------------------------------------------------------
+ * Asynchronous Append Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncBegin
+ *
+ * Begin executing designed async-capable subplans.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncBegin(AppendState *node)
+{
+ int i;
+
+ /* Backward scan is not supported by async-aware Appends. */
+ Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+ /* We should never be called when there are no async subplans. */
+ Assert(node->as_nasyncplans > 0);
+
+ /* If we've yet to determine the valid subplans then do so now. */
+ if (node->as_valid_subplans == NULL)
+ node->as_valid_subplans =
+ ExecFindMatchingSubPlans(node->as_prune_state);
+
+ classify_matching_subplans(node);
+
+ /* Nothing to do if there are no valid async subplans. */
+ if (node->as_nasyncremain == 0)
+ return;
+
+ /* Make a request for each of the valid async subplans. */
+ i = -1;
+ while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ Assert(areq->request_index == i);
+ Assert(!areq->callback_pending);
+
+ /* Do the actual work. */
+ ExecAsyncRequest(areq);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncGetNext
+ *
+ * Get the next tuple from any of the asynchronous subplans.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
+{
+ *result = NULL;
+
+ /* We should never be called when there are no valid async subplans. */
+ Assert(node->as_nasyncremain > 0);
+
+ /* Request a tuple asynchronously. */
+ if (ExecAppendAsyncRequest(node, result))
+ return true;
+
+ while (node->as_nasyncremain > 0)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ /* Wait or poll async events. */
+ ExecAppendAsyncEventWait(node);
+
+ /* Request a tuple asynchronously. */
+ if (ExecAppendAsyncRequest(node, result))
+ return true;
+
+ /* Break from loop if there's any sync subplan that isn't complete. */
+ if (!node->as_syncdone)
+ break;
+ }
+
+ /*
+ * If all sync subplans are complete, we're totally done scanning the
+ * given node. Otherwise, we're done with the asynchronous stuff but
+ * must continue scanning the sync subplans.
+ */
+ if (node->as_syncdone)
+ {
+ Assert(node->as_nasyncremain == 0);
+ *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncRequest
+ *
+ * Request a tuple asynchronously.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
+{
+ Bitmapset *needrequest;
+ int i;
+
+ /* Nothing to do if there are no async subplans needing a new request. */
+ if (bms_is_empty(node->as_needrequest))
+ return false;
+
+ /*
+ * If there are any asynchronously-generated results that have not yet
+ * been returned, we have nothing to do; just return one of them.
+ */
+ if (node->as_nasyncresults > 0)
+ {
+ --node->as_nasyncresults;
+ *result = node->as_asyncresults[node->as_nasyncresults];
+ return true;
+ }
+
+ /* Make a new request for each of the async subplans that need it. */
+ needrequest = node->as_needrequest;
+ node->as_needrequest = NULL;
+ i = -1;
+ while ((i = bms_next_member(needrequest, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ /* Do the actual work. */
+ ExecAsyncRequest(areq);
+ }
+ bms_free(needrequest);
+
+ /* Return one of the asynchronously-generated results if any. */
+ if (node->as_nasyncresults > 0)
+ {
+ --node->as_nasyncresults;
+ *result = node->as_asyncresults[node->as_nasyncresults];
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncEventWait
+ *
+ * Wait or poll for file descriptor events and fire callbacks.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncEventWait(AppendState *node)
+{
+ long timeout = node->as_syncdone ? -1 : 0;
+ WaitEvent occurred_event[EVENT_BUFFER_SIZE];
+ int noccurred;
+ int i;
+
+ /* We should never be called when there are no valid async subplans. */
+ Assert(node->as_nasyncremain > 0);
+
+ node->as_eventset = CreateWaitEventSet(CurrentMemoryContext,
+ node->as_nasyncplans + 1);
+ AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+ NULL, NULL);
+
+ /* Give each waiting subplan a chance to add an event. */
+ i = -1;
+ while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ if (areq->callback_pending)
+ ExecAsyncConfigureWait(areq);
+ }
+
+ /* Wait for at least one event to occur. */
+ noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
+ EVENT_BUFFER_SIZE, WAIT_EVENT_APPEND_READY);
+ FreeWaitEventSet(node->as_eventset);
+ node->as_eventset = NULL;
+ if (noccurred == 0)
+ return;
+
+ /* Deliver notifications. */
+ for (i = 0; i < noccurred; i++)
+ {
+ WaitEvent *w = &occurred_event[i];
+
+ /*
+ * Each waiting subplan should have registered its wait event with
+ * user_data pointing back to its AsyncRequest.
+ */
+ if ((w->events & WL_SOCKET_READABLE) != 0)
+ {
+ AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+ /*
+ * Mark it as no longer needing a callback. We must do this
+ * before dispatching the callback in case the callback resets
+ * the flag.
+ */
+ Assert(areq->callback_pending);
+ areq->callback_pending = false;
+
+ /* Do the actual work. */
+ ExecAsyncNotify(areq);
+ }
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncAppendResponse
+ *
+ * Receive a response from an asynchronous request we made.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncAppendResponse(AsyncRequest *areq)
+{
+ AppendState *node = (AppendState *) areq->requestor;
+ TupleTableSlot *slot = areq->result;
+
+ /* The result should be a TupleTableSlot or NULL. */
+ Assert(slot == NULL || IsA(slot, TupleTableSlot));
+
+ /* Nothing to do if the request is pending. */
+ if (!areq->request_complete)
+ {
+ /* The request would have been pending for a callback */
+ Assert(areq->callback_pending);
+ return;
+ }
+
+ /* If the result is NULL or an empty slot, there's nothing more to do. */
+ if (TupIsNull(slot))
+ {
+ /* The ending subplan wouldn't have been pending for a callback. */
+ Assert(!areq->callback_pending);
+ --node->as_nasyncremain;
+ return;
+ }
+
+ /* Save result so we can return it. */
+ Assert(node->as_nasyncresults < node->as_nasyncplans);
+ node->as_asyncresults[node->as_nasyncresults++] = slot;
+
+ /*
+ * Mark the subplan that returned a result as ready for a new request. We
+ * don't launch another one here immediately because it might complete.
+ */
+ node->as_needrequest = bms_add_member(node->as_needrequest,
+ areq->request_index);
+}
+
+/* ----------------------------------------------------------------
+ * classify_matching_subplans
+ *
+ * Classify the node's as_valid_subplans into sync ones and
+ * async ones, adjust it to contain sync ones only, and save
+ * async ones in the node's as_valid_asyncplans.
+ * ----------------------------------------------------------------
+ */
+static void
+classify_matching_subplans(AppendState *node)
+{
+ Bitmapset *valid_asyncplans;
+
+ Assert(node->as_valid_asyncplans == NULL);
+
+ /* Nothing to do if there are no valid subplans. */
+ if (bms_is_empty(node->as_valid_subplans))
+ {
+ node->as_syncdone = true;
+ node->as_nasyncremain = 0;
+ return;
+ }
+
+ /* Nothing to do if there are no valid async subplans. */
+ if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
+ {
+ node->as_nasyncremain = 0;
+ return;
+ }
+
+ /* Get valid async subplans. */
+ valid_asyncplans = bms_copy(node->as_asyncplans);
+ valid_asyncplans = bms_int_members(valid_asyncplans,
+ node->as_valid_subplans);
+
+ /* Adjust the valid subplans to contain sync subplans only. */
+ node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
+ valid_asyncplans);
+ node->as_syncdone = bms_is_empty(node->as_valid_subplans);
+
+ /* Save valid async subplans. */
+ node->as_valid_asyncplans = valid_asyncplans;
+ node->as_nasyncremain = bms_num_members(valid_asyncplans);
+}
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 0969e53c3a4..898890fb08f 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -391,3 +391,51 @@ ExecShutdownForeignScan(ForeignScanState *node)
if (fdwroutine->ShutdownForeignScan)
fdwroutine->ShutdownForeignScan(node);
}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncForeignScanRequest
+ *
+ * Asynchronously request a tuple from a designed async-capable node
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanRequest(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ Assert(fdwroutine->ForeignAsyncRequest != NULL);
+ fdwroutine->ForeignAsyncRequest(areq);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncForeignScanConfigureWait
+ *
+ * In async mode, configure for a wait
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanConfigureWait(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
+ fdwroutine->ForeignAsyncConfigureWait(areq);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncForeignScanNotify
+ *
+ * Callback invoked when a relevant event has occurred
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanNotify(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ Assert(fdwroutine->ForeignAsyncNotify != NULL);
+ fdwroutine->ForeignAsyncNotify(areq);
+}
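
To round out the hypothetical FDW sketch, step 3 — the notify callback these
wrappers delegate to — might look as follows.  my_fdw_receive_row() stands in
for the FDW's real protocol code and is assumed to return true once a complete
row has been stored in the scan slot; it is not an existing function.

#include "postgres.h"

#include "executor/execAsync.h"
#include "nodes/execnodes.h"

/* Hypothetical: drain the socket; true once a whole row is in the slot. */
extern bool my_fdw_receive_row(ForeignScanState *node, TupleTableSlot *slot);

/* Step 3: reached via ExecAsyncNotify -> ExecAsyncForeignScanNotify. */
static void
my_fdw_async_notify(AsyncRequest *areq)
{
	ForeignScanState *node = (ForeignScanState *) areq->requestee;
	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;

	if (my_fdw_receive_row(node, slot))
		ExecAsyncRequestDone(areq, slot);	/* deliver the tuple */
	else
		ExecAsyncRequestPending(areq);		/* keep waiting for more data */
}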
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 1d0bb6e2e74..d58b79d525c 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -120,6 +120,7 @@ CopyPlanFields(const Plan *from, Plan *newnode)
COPY_SCALAR_FIELD(plan_width);
COPY_SCALAR_FIELD(parallel_aware);
COPY_SCALAR_FIELD(parallel_safe);
+ COPY_SCALAR_FIELD(async_capable);
COPY_SCALAR_FIELD(plan_node_id);
COPY_NODE_FIELD(targetlist);
COPY_NODE_FIELD(qual);
@@ -241,6 +242,7 @@ _copyAppend(const Append *from)
*/
COPY_BITMAPSET_FIELD(apprelids);
COPY_NODE_FIELD(appendplans);
+ COPY_SCALAR_FIELD(nasyncplans);
COPY_SCALAR_FIELD(first_partial_plan);
COPY_NODE_FIELD(part_prune_info);
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 301fa304902..ff127a19adf 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -333,6 +333,7 @@ _outPlanInfo(StringInfo str, const Plan *node)
WRITE_INT_FIELD(plan_width);
WRITE_BOOL_FIELD(parallel_aware);
WRITE_BOOL_FIELD(parallel_safe);
+ WRITE_BOOL_FIELD(async_capable);
WRITE_INT_FIELD(plan_node_id);
WRITE_NODE_FIELD(targetlist);
WRITE_NODE_FIELD(qual);
@@ -431,6 +432,7 @@ _outAppend(StringInfo str, const Append *node)
WRITE_BITMAPSET_FIELD(apprelids);
WRITE_NODE_FIELD(appendplans);
+ WRITE_INT_FIELD(nasyncplans);
WRITE_INT_FIELD(first_partial_plan);
WRITE_NODE_FIELD(part_prune_info);
}
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 377185f7c67..6a563e99033 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1615,6 +1615,7 @@ ReadCommonPlan(Plan *local_node)
READ_INT_FIELD(plan_width);
READ_BOOL_FIELD(parallel_aware);
READ_BOOL_FIELD(parallel_safe);
+ READ_BOOL_FIELD(async_capable);
READ_INT_FIELD(plan_node_id);
READ_NODE_FIELD(targetlist);
READ_NODE_FIELD(qual);
@@ -1711,6 +1712,7 @@ _readAppend(void)
READ_BITMAPSET_FIELD(apprelids);
READ_NODE_FIELD(appendplans);
+ READ_INT_FIELD(nasyncplans);
READ_INT_FIELD(first_partial_plan);
READ_NODE_FIELD(part_prune_info);
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index b92c9485882..0c016a03dd9 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -147,6 +147,7 @@ bool enable_partitionwise_aggregate = false;
bool enable_parallel_append = true;
bool enable_parallel_hash = true;
bool enable_partition_pruning = true;
+bool enable_async_append = true;
typedef struct
{
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 906cab70532..78ef068fb7b 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -81,6 +81,7 @@ static List *get_gating_quals(PlannerInfo *root, List *quals);
static Plan *create_gating_plan(PlannerInfo *root, Path *path, Plan *plan,
List *gating_quals);
static Plan *create_join_plan(PlannerInfo *root, JoinPath *best_path);
+static bool is_async_capable_path(Path *path);
static Plan *create_append_plan(PlannerInfo *root, AppendPath *best_path,
int flags);
static Plan *create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
@@ -1081,6 +1082,31 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path)
}
/*
+ * is_async_capable_path
+ * Check whether a given Path node is async-capable.
+ */
+static bool
+is_async_capable_path(Path *path)
+{
+ switch (nodeTag(path))
+ {
+ case T_ForeignPath:
+ {
+ FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+ Assert(fdwroutine != NULL);
+ if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
+ fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
+ return true;
+ }
+ break;
+ default:
+ break;
+ }
+ return false;
+}
+
+/*
* create_append_plan
* Create an Append plan for 'best_path' and (recursively) plans
* for its subpaths.
@@ -1097,6 +1123,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
List *pathkeys = best_path->path.pathkeys;
List *subplans = NIL;
ListCell *subpaths;
+ int nasyncplans = 0;
RelOptInfo *rel = best_path->path.parent;
PartitionPruneInfo *partpruneinfo = NULL;
int nodenumsortkeys = 0;
@@ -1104,6 +1131,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
Oid *nodeSortOperators = NULL;
Oid *nodeCollations = NULL;
bool *nodeNullsFirst = NULL;
+ bool consider_async = false;
/*
* The subpaths list could be empty, if every child was proven empty by
@@ -1167,6 +1195,11 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
tlist_was_changed = (orig_tlist_length != list_length(plan->plan.targetlist));
}
+ /* If appropriate, consider async append */
+ consider_async = (enable_async_append && pathkeys == NIL &&
+ !best_path->path.parallel_safe &&
+ list_length(best_path->subpaths) > 1);
+
/* Build the plan for each child */
foreach(subpaths, best_path->subpaths)
{
@@ -1234,6 +1267,13 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
}
subplans = lappend(subplans, subplan);
+
+ /* Check to see if subplan can be executed asynchronously */
+ if (consider_async && is_async_capable_path(subpath))
+ {
+ subplan->async_capable = true;
+ ++nasyncplans;
+ }
}
/*
@@ -1266,6 +1306,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
}
plan->appendplans = subplans;
+ plan->nasyncplans = nasyncplans;
plan->first_partial_plan = best_path->first_partial_path;
plan->part_prune_info = partpruneinfo;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 60f45ccc4ea..4b9bcd2b41a 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3995,6 +3995,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
switch (w)
{
+ case WAIT_EVENT_APPEND_READY:
+ event_name = "AppendReady";
+ break;
case WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE:
event_name = "BackupWaitWalArchive";
break;
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 43a5fded103..5f3318fa8f1 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -2020,6 +2020,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
}
#endif
+/*
+ * Get the number of wait events registered in a given WaitEventSet.
+ */
+int
+GetNumRegisteredWaitEvents(WaitEventSet *set)
+{
+ return set->nevents;
+}
+
#if defined(WAIT_USE_POLL)
/*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 0c5dc4d3e84..03daec9a085 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1129,6 +1129,16 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
{
+ {"enable_async_append", PGC_USERSET, QUERY_TUNING_METHOD,
+ gettext_noop("Enables the planner's use of async append plans."),
+ NULL,
+ GUC_EXPLAIN
+ },
+ &enable_async_append,
+ true,
+ NULL, NULL, NULL
+ },
+ {
{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
gettext_noop("Enables genetic query optimization."),
gettext_noop("This algorithm attempts to do planning without "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index b234a6bfe64..791d39cf078 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -371,6 +371,7 @@
#enable_partitionwise_aggregate = off
#enable_parallel_hash = on
#enable_partition_pruning = on
+#enable_async_append = on
# - Planner Cost Constants -
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644
index 00000000000..724034f2265
--- /dev/null
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,25 @@
+/*-------------------------------------------------------------------------
+ * execAsync.h
+ * Support functions for asynchronous execution
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/include/executor/execAsync.h
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+
+extern void ExecAsyncRequest(AsyncRequest *areq);
+extern void ExecAsyncConfigureWait(AsyncRequest *areq);
+extern void ExecAsyncNotify(AsyncRequest *areq);
+extern void ExecAsyncResponse(AsyncRequest *areq);
+extern void ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result);
+extern void ExecAsyncRequestPending(AsyncRequest *areq);
+
+#endif /* EXECASYNC_H */
diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h
index cafd410a5da..fa54ac6ad23 100644
--- a/src/include/executor/nodeAppend.h
+++ b/src/include/executor/nodeAppend.h
@@ -25,4 +25,6 @@ extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt);
extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt);
extern void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt);
+extern void ExecAsyncAppendResponse(AsyncRequest *areq);
+
#endif /* NODEAPPEND_H */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index 6ae7733e25b..8ffc0ca5bf3 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -31,4 +31,8 @@ extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
ParallelWorkerContext *pwcxt);
extern void ExecShutdownForeignScan(ForeignScanState *node);
+extern void ExecAsyncForeignScanRequest(AsyncRequest *areq);
+extern void ExecAsyncForeignScanConfigureWait(AsyncRequest *areq);
+extern void ExecAsyncForeignScanNotify(AsyncRequest *areq);
+
#endif /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 248f78da452..7c89d081c76 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -178,6 +178,14 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
List *fdw_private,
RelOptInfo *child_rel);
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+
+typedef void (*ForeignAsyncRequest_function) (AsyncRequest *areq);
+
+typedef void (*ForeignAsyncConfigureWait_function) (AsyncRequest *areq);
+
+typedef void (*ForeignAsyncNotify_function) (AsyncRequest *areq);
+
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
* function. It provides pointers to the callback functions needed by the
@@ -256,6 +264,12 @@ typedef struct FdwRoutine
/* Support functions for path reparameterization. */
ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild;
+
+ /* Support functions for asynchronous execution */
+ IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+ ForeignAsyncRequest_function ForeignAsyncRequest;
+ ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+ ForeignAsyncNotify_function ForeignAsyncNotify;
} FdwRoutine;
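
A hedged sketch of how an FDW's handler function might advertise the four new
callbacks.  The my_fdw_* callbacks are the hypothetical ones sketched earlier
on this page, and the always-true IsForeignPathAsyncCapable policy is
illustrative only; postgres_fdw, for instance, ties the decision to its
"async_capable" option.

#include "postgres.h"

#include "fmgr.h"
#include "foreign/fdwapi.h"

/* Hypothetical policy: every path this FDW produces may run asynchronously. */
static bool
my_fdw_is_async_capable_path(ForeignPath *path)
{
	return true;
}

PG_FUNCTION_INFO_V1(my_fdw_handler);

Datum
my_fdw_handler(PG_FUNCTION_ARGS)
{
	FdwRoutine *routine = makeNode(FdwRoutine);

	/* ... the FDW's mandatory scan callbacks would be filled in here ... */

	/* Support functions for asynchronous execution */
	routine->IsForeignPathAsyncCapable = my_fdw_is_async_capable_path;
	routine->ForeignAsyncRequest = my_fdw_async_request;
	routine->ForeignAsyncConfigureWait = my_fdw_async_configure_wait;
	routine->ForeignAsyncNotify = my_fdw_async_notify;

	PG_RETURN_POINTER(routine);
}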
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index e31ad6204e6..09ea7ef6a6b 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -516,6 +516,22 @@ typedef struct ResultRelInfo
} ResultRelInfo;
/* ----------------
+ * AsyncRequest
+ *
+ * State for an asynchronous tuple request.
+ * ----------------
+ */
+typedef struct AsyncRequest
+{
+ struct PlanState *requestor; /* Node that wants a tuple */
+ struct PlanState *requestee; /* Node from which a tuple is wanted */
+ int request_index; /* Scratch space for requestor */
+ bool callback_pending; /* Callback is needed */
+ bool request_complete; /* Request complete, result valid */
+ TupleTableSlot *result; /* Result (NULL if no more tuples) */
+} AsyncRequest;
+
+/* ----------------
* EState information
*
* Working state for an Executor invocation
@@ -1199,12 +1215,12 @@ typedef struct ModifyTableState
* AppendState information
*
* nplans how many plans are in the array
- * whichplan which plan is being executed (0 .. n-1), or a
- * special negative value. See nodeAppend.c.
+ * whichplan which synchronous plan is being executed (0 .. n-1)
+ * or a special negative value. See nodeAppend.c.
* prune_state details required to allow partitions to be
* eliminated from the scan, or NULL if not possible.
- * valid_subplans for runtime pruning, valid appendplans indexes to
- * scan.
+ * valid_subplans for runtime pruning, valid synchronous appendplans
+ * indexes to scan.
* ----------------
*/
@@ -1220,12 +1236,25 @@ struct AppendState
PlanState **appendplans; /* array of PlanStates for my inputs */
int as_nplans;
int as_whichplan;
+ bool as_begun; /* false means need to initialize */
+ Bitmapset *as_asyncplans; /* asynchronous plans indexes */
+ int as_nasyncplans; /* # of asynchronous plans */
+ AsyncRequest **as_asyncrequests; /* array of AsyncRequests */
+ TupleTableSlot **as_asyncresults; /* unreturned results of async plans */
+ int as_nasyncresults; /* # of valid entries in as_asyncresults */
+ bool as_syncdone; /* true if all synchronous plans done in
+ * asynchronous mode, else false */
+ int as_nasyncremain; /* # of remaining asynchronous plans */
+ Bitmapset *as_needrequest; /* asynchronous plans needing a new request */
+ struct WaitEventSet *as_eventset; /* WaitEventSet used to configure
+ * file descriptor wait events */
int as_first_partial_plan; /* Index of 'appendplans' containing
* the first partial plan */
ParallelAppendState *as_pstate; /* parallel coordination info */
Size pstate_len; /* size of parallel coordination info */
struct PartitionPruneState *as_prune_state;
Bitmapset *as_valid_subplans;
+ Bitmapset *as_valid_asyncplans; /* valid asynchronous plans indexes */
bool (*choose_next_subplan) (AppendState *);
};
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 6e62104d0b7..24ca616740b 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -130,6 +130,11 @@ typedef struct Plan
bool parallel_safe; /* OK to use as part of parallel plan? */
/*
+ * information needed for asynchronous execution
+ */
+ bool async_capable; /* engage asynchronous-capable logic? */
+
+ /*
* Common structural data for all Plan types.
*/
int plan_node_id; /* unique across entire final plan tree */
@@ -245,6 +250,7 @@ typedef struct Append
Plan plan;
Bitmapset *apprelids; /* RTIs of appendrel(s) formed by this node */
List *appendplans;
+ int nasyncplans; /* # of asynchronous plans */
/*
* All 'appendplans' preceding this index are non-partial plans. All
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 1be93be0983..a3fd93fe07f 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -65,6 +65,7 @@ extern PGDLLIMPORT bool enable_partitionwise_aggregate;
extern PGDLLIMPORT bool enable_parallel_append;
extern PGDLLIMPORT bool enable_parallel_hash;
extern PGDLLIMPORT bool enable_partition_pruning;
+extern PGDLLIMPORT bool enable_async_append;
extern PGDLLIMPORT int constraint_exclusion;
extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 87672e6f302..d699502cd9a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -966,7 +966,8 @@ typedef enum
*/
typedef enum
{
- WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE = PG_WAIT_IPC,
+ WAIT_EVENT_APPEND_READY = PG_WAIT_IPC,
+ WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE,
WAIT_EVENT_BGWORKER_SHUTDOWN,
WAIT_EVENT_BGWORKER_STARTUP,
WAIT_EVENT_BTREE_PAGE,
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 9e94fcaec24..44f9368c644 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -179,5 +179,6 @@ extern int WaitLatch(Latch *latch, int wakeEvents, long timeout,
extern int WaitLatchOrSocket(Latch *latch, int wakeEvents,
pgsocket sock, long timeout, uint32 wait_event_info);
extern void InitializeLatchWaitSet(void);
+extern int GetNumRegisteredWaitEvents(WaitEventSet *set);
#endif /* LATCH_H */
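
GetNumRegisteredWaitEvents() exists because struct WaitEventSet is private to
latch.c, so code outside that file otherwise has no way to see how many events
are already registered.  One hypothetical use (not something this commit
itself does): since ExecAppendAsyncEventWait registers the WL_EXIT_ON_PM_DEATH
event up front, a caller could check whether any socket events were actually
added before deciding to wait.

#include "postgres.h"

#include "storage/latch.h"

/*
 * Hypothetical helper: more than one registered event means at least one
 * subplan added a socket to wait on besides the postmaster-death event.
 */
static bool
wait_set_has_socket_events(WaitEventSet *set)
{
	return GetNumRegisteredWaitEvents(set) > 1;
}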
diff --git a/src/test/regress/expected/explain.out b/src/test/regress/expected/explain.out
index 791eba85111..b89b99fb020 100644
--- a/src/test/regress/expected/explain.out
+++ b/src/test/regress/expected/explain.out
@@ -87,6 +87,7 @@ select explain_filter('explain (analyze, buffers, format json) select * from int
"Plan": { +
"Node Type": "Seq Scan", +
"Parallel Aware": false, +
+ "Async Capable": false, +
"Relation Name": "int8_tbl",+
"Alias": "i8", +
"Startup Cost": N.N, +
@@ -136,6 +137,7 @@ select explain_filter('explain (analyze, buffers, format xml) select * from int8
<Plan> +
<Node-Type>Seq Scan</Node-Type> +
<Parallel-Aware>false</Parallel-Aware> +
+ <Async-Capable>false</Async-Capable> +
<Relation-Name>int8_tbl</Relation-Name> +
<Alias>i8</Alias> +
<Startup-Cost>N.N</Startup-Cost> +
@@ -183,6 +185,7 @@ select explain_filter('explain (analyze, buffers, format yaml) select * from int
- Plan: +
Node Type: "Seq Scan" +
Parallel Aware: false +
+ Async Capable: false +
Relation Name: "int8_tbl"+
Alias: "i8" +
Startup Cost: N.N +
@@ -233,6 +236,7 @@ select explain_filter('explain (buffers, format json) select * from int8_tbl i8'
"Plan": { +
"Node Type": "Seq Scan", +
"Parallel Aware": false, +
+ "Async Capable": false, +
"Relation Name": "int8_tbl",+
"Alias": "i8", +
"Startup Cost": N.N, +
@@ -346,6 +350,7 @@ select jsonb_pretty(
"Actual Rows": 0, +
"Actual Loops": 0, +
"Startup Cost": 0.0, +
+ "Async Capable": false, +
"Relation Name": "tenk1", +
"Parallel Aware": true, +
"Local Hit Blocks": 0, +
@@ -391,6 +396,7 @@ select jsonb_pretty(
"Actual Rows": 0, +
"Actual Loops": 0, +
"Startup Cost": 0.0, +
+ "Async Capable": false, +
"Parallel Aware": false, +
"Sort Space Used": 0, +
"Local Hit Blocks": 0, +
@@ -433,6 +439,7 @@ select jsonb_pretty(
"Actual Rows": 0, +
"Actual Loops": 0, +
"Startup Cost": 0.0, +
+ "Async Capable": false, +
"Parallel Aware": false, +
"Workers Planned": 0, +
"Local Hit Blocks": 0, +
diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out
index 68ca321163b..a417b566d95 100644
--- a/src/test/regress/expected/incremental_sort.out
+++ b/src/test/regress/expected/incremental_sort.out
@@ -558,6 +558,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from
"Node Type": "Incremental Sort", +
"Actual Rows": 55, +
"Actual Loops": 1, +
+ "Async Capable": false, +
"Presorted Key": [ +
"t.a" +
], +
@@ -760,6 +761,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from
"Node Type": "Incremental Sort", +
"Actual Rows": 70, +
"Actual Loops": 1, +
+ "Async Capable": false, +
"Presorted Key": [ +
"t.a" +
], +
diff --git a/src/test/regress/expected/insert_conflict.out b/src/test/regress/expected/insert_conflict.out
index ff157ceb1c1..499245068a5 100644
--- a/src/test/regress/expected/insert_conflict.out
+++ b/src/test/regress/expected/insert_conflict.out
@@ -204,6 +204,7 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb
"Node Type": "ModifyTable", +
"Operation": "Insert", +
"Parallel Aware": false, +
+ "Async Capable": false, +
"Relation Name": "insertconflicttest", +
"Alias": "insertconflicttest", +
"Conflict Resolution": "UPDATE", +
@@ -213,7 +214,8 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb
{ +
"Node Type": "Result", +
"Parent Relationship": "Member", +
- "Parallel Aware": false +
+ "Parallel Aware": false, +
+ "Async Capable": false +
} +
] +
} +
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 6d048e309cb..98dde452e62 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -95,6 +95,7 @@ select count(*) = 0 as ok from pg_stat_wal_receiver;
select name, setting from pg_settings where name like 'enable%';
name | setting
--------------------------------+---------
+ enable_async_append | on
enable_bitmapscan | on
enable_gathermerge | on
enable_hashagg | on
@@ -113,7 +114,7 @@ select name, setting from pg_settings where name like 'enable%';
enable_seqscan | on
enable_sort | on
enable_tidscan | on
-(18 rows)
+(19 rows)
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail