diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 0659656b6b..7b7c0db16c 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -9973,6 +9973,48 @@ SELECT * FROM join_tbl ORDER BY a1; DELETE FROM join_tbl; RESET enable_partitionwise_join; +-- Test rescan of an async Append node with do_exec_prune=false +SET enable_hashjoin TO false; +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO join_tbl SELECT * FROM async_p1 t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0; + QUERY PLAN +---------------------------------------------------------------------------------------- + Insert on public.join_tbl + -> Nested Loop + Output: t1.a, t1.b, t1.c, t2.a, t2.b, t2.c + Join Filter: ((t1.a = t2.a) AND (t1.b = t2.b)) + -> Foreign Scan on public.async_p1 t1 + Output: t1.a, t1.b, t1.c + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0)) + -> Append + -> Async Foreign Scan on public.async_p1 t2_1 + Output: t2_1.a, t2_1.b, t2_1.c + Remote SQL: SELECT a, b, c FROM public.base_tbl1 + -> Async Foreign Scan on public.async_p2 t2_2 + Output: t2_2.a, t2_2.b, t2_2.c + Remote SQL: SELECT a, b, c FROM public.base_tbl2 + -> Seq Scan on public.async_p3 t2_3 + Output: t2_3.a, t2_3.b, t2_3.c +(16 rows) + +INSERT INTO join_tbl SELECT * FROM async_p1 t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0; +SELECT * FROM join_tbl ORDER BY a1; + a1 | b1 | c1 | a2 | b2 | c2 +------+-----+------+------+-----+------ + 1000 | 0 | 0000 | 1000 | 0 | 0000 + 1100 | 100 | 0100 | 1100 | 100 | 0100 + 1200 | 200 | 0200 | 1200 | 200 | 0200 + 1300 | 300 | 0300 | 1300 | 300 | 0300 + 1400 | 400 | 0400 | 1400 | 400 | 0400 + 1500 | 500 | 0500 | 1500 | 500 | 0500 + 1600 | 600 | 0600 | 1600 | 600 | 0600 + 1700 | 700 | 0700 | 1700 | 700 | 0700 + 1800 | 800 | 0800 | 1800 | 800 | 0800 + 1900 | 900 | 0900 | 1900 | 900 | 0900 +(10 rows) + +DELETE FROM join_tbl; +RESET enable_hashjoin; -- Test interaction of async execution with plan-time partition pruning EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM async_pt WHERE a < 3000; diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 79295e996d..191efbf7c2 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -3157,6 +3157,18 @@ DELETE FROM join_tbl; RESET enable_partitionwise_join; +-- Test rescan of an async Append node with do_exec_prune=false +SET enable_hashjoin TO false; + +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO join_tbl SELECT * FROM async_p1 t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0; +INSERT INTO join_tbl SELECT * FROM async_p1 t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0; + +SELECT * FROM join_tbl ORDER BY a1; +DELETE FROM join_tbl; + +RESET enable_hashjoin; + -- Test interaction of async execution with plan-time partition pruning EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM async_pt WHERE a < 3000; diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 62335ed4c4..755c1392f0 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -240,10 +240,12 @@ ExecInitAppend(Append *node, EState *estate, int eflags) appendstate->as_asyncplans = asyncplans; appendstate->as_nasyncplans = nasyncplans; appendstate->as_asyncrequests = NULL; - appendstate->as_asyncresults = (TupleTableSlot **) - palloc0(nasyncplans * sizeof(TupleTableSlot *)); + appendstate->as_asyncresults = NULL; + appendstate->as_nasyncresults = 0; + appendstate->as_nasyncremain = 0; appendstate->as_needrequest = NULL; appendstate->as_eventset = NULL; + appendstate->as_valid_asyncplans = NULL; if (nasyncplans > 0) { @@ -265,6 +267,12 @@ ExecInitAppend(Append *node, EState *estate, int eflags) appendstate->as_asyncrequests[i] = areq; } + + appendstate->as_asyncresults = (TupleTableSlot **) + palloc0(nasyncplans * sizeof(TupleTableSlot *)); + + if (appendstate->as_valid_subplans != NULL) + classify_matching_subplans(appendstate); } /* @@ -459,6 +467,8 @@ ExecReScanAppend(AppendState *node) areq->result = NULL; } + node->as_nasyncresults = 0; + node->as_nasyncremain = 0; bms_free(node->as_needrequest); node->as_needrequest = NULL; } @@ -861,15 +871,24 @@ ExecAppendAsyncBegin(AppendState *node) /* Backward scan is not supported by async-aware Appends. */ Assert(ScanDirectionIsForward(node->ps.state->es_direction)); + /* We should never be called when there are no subplans */ + Assert(node->as_nplans > 0); + /* We should never be called when there are no async subplans. */ Assert(node->as_nasyncplans > 0); /* If we've yet to determine the valid subplans then do so now. */ if (node->as_valid_subplans == NULL) + { node->as_valid_subplans = ExecFindMatchingSubPlans(node->as_prune_state); - classify_matching_subplans(node); + classify_matching_subplans(node); + } + + /* Initialize state variables. */ + node->as_syncdone = bms_is_empty(node->as_valid_subplans); + node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans); /* Nothing to do if there are no valid async subplans. */ if (node->as_nasyncremain == 0) @@ -1148,9 +1167,7 @@ classify_matching_subplans(AppendState *node) /* Adjust the valid subplans to contain sync subplans only. */ node->as_valid_subplans = bms_del_members(node->as_valid_subplans, valid_asyncplans); - node->as_syncdone = bms_is_empty(node->as_valid_subplans); /* Save valid async subplans. */ node->as_valid_asyncplans = valid_asyncplans; - node->as_nasyncremain = bms_num_members(valid_asyncplans); }