Skip to content

ESQL: Push down filter past lookup join #118410

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Dec 13, 2024
5 changes: 5 additions & 0 deletions docs/changelog/118410.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118410
summary: Push down filter past lookup join
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static Expression combineAnd(List<Expression> exps) {
*
* using the given combiner.
*
* While a bit longer, this method creates a balanced tree as oppose to a plain
* While a bit longer, this method creates a balanced tree as opposed to a plain
* recursive approach which creates an unbalanced one (either to the left or right).
*/
private static Expression combine(List<Expression> exps, BiFunction<Expression, Expression, Expression> combiner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,3 +418,68 @@ count:long | type:keyword
3 | Success
1 | Disconnected
;

//
// Filtering tests
//

// WHERE is placed after the LOOKUP JOIN and only references emp_no, which the test name
// identifies as a left-side field (the FROM employees side of the join).
lookupWithFilterOnLeftSideField
required_capability: join_lookup_v5

FROM employees
| EVAL language_code = languages
| LOOKUP JOIN languages_lookup ON language_code
| SORT emp_no
| KEEP emp_no, language_code, language_name
| WHERE emp_no >= 10091 AND emp_no < 10094
;

emp_no:integer | language_code:integer | language_name:keyword
10091 | 3 | Spanish
10092 | 1 | English
10093 | 3 | Spanish
;

// WHERE is placed after the LOOKUP JOIN and references `type`, which the test name
// identifies as a right-side field (the message_types_lookup side of the join).
// NOTE(review): the "-Ignore" name suffix presumably disables this test - confirm against the spec runner.
lookupMessageWithFilterOnRightSideField-Ignore
required_capability: join_lookup_v5

FROM sample_data
| LOOKUP JOIN message_types_lookup ON message
| WHERE type == "Error"
| KEEP @timestamp, client_ip, event_duration, message, type
| SORT @timestamp DESC
;

@timestamp:date | client_ip:ip | event_duration:long | message:keyword | type:keyword
2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755 | Connection error | Error
2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153 | Connection error | Error
2023-10-23T13:51:54.732Z | 172.21.3.15 | 725448 | Connection error | Error
;

// The filter on `type` is applied after a STATS that groups by `type`, i.e. the WHERE
// is separated from the LOOKUP JOIN by an aggregation.
lookupWithFieldAndRightSideAfterStats
required_capability: join_lookup_v5

FROM sample_data
| LOOKUP JOIN message_types_lookup ON message
| STATS count = count(message) BY type
| WHERE type == "Error"
;

count:long | type:keyword
3 | Error
;

// WHERE is placed after the LOOKUP JOIN and combines a condition on the join key
// (language_code) with a condition on language_name.
// NOTE(review): the "-Ignore" name suffix presumably disables this test - confirm against the spec runner.
lookupWithFieldOnJoinKey-Ignore
required_capability: join_lookup_v5

FROM employees
| EVAL language_code = languages
| LOOKUP JOIN languages_lookup ON language_code
| WHERE language_code > 1 AND language_name IS NOT NULL
| KEEP emp_no, language_code, language_name
;

emp_no:integer | language_code:integer | language_name:keyword
10001 | 2 | French
10003 | 4 | German
;
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.expression.predicate.Predicates;
import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
Expand All @@ -23,6 +24,8 @@
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -76,11 +79,63 @@ protected LogicalPlan rule(Filter filter) {
} else if (child instanceof OrderBy orderBy) {
// swap the filter with its child
plan = orderBy.replaceChild(filter.with(orderBy.child(), condition));
} else if (child instanceof Join join) {
return pushDownPastJoin(filter, join);
}
// cannot push past a Limit, this could change the tailing result set returned
return plan;
}

// Result of partitioning a list of filter expressions relative to the two sides of a join:
// - leftFilters: expressions whose references are all contained in the left child's output
// - rightFilters: remaining expressions whose references are all contained in the right child's output
// - commonFilters: everything else, i.e. expressions that fit neither side on their own
private record ScopedFilter(List<Expression> commonFilters, List<Expression> leftFilters, List<Expression> rightFilters) {}

// Splits the given filter expressions into 3 parts, based on the attributes each one references:
// 1. filters scoped to the left (all references are in the left plan's output set)
// 2. filters scoped to the right (all references are in the right plan's output set)
// 3. filters that fit neither side alone and so require both sides to be evaluated
// The left side is checked first, so an expression that fits both sides ends up in the left group.
// The input list is not modified; the partitioning works on a copy.
private static ScopedFilter scopeFilter(List<Expression> filters, LogicalPlan left, LogicalPlan right) {
// working copy; whatever is still in here after both passes becomes the "common" group
List<Expression> rest = new ArrayList<>(filters);
List<Expression> leftFilters = new ArrayList<>();
List<Expression> rightFilters = new ArrayList<>();

AttributeSet leftOutput = left.outputSet();
AttributeSet rightOutput = right.outputSet();

// first remove things that are left scoped only
// (the add(...) call only runs when the subset check passes and, for an ArrayList, always
// returns true - so a matching expression is both recorded and removed in a single pass)
rest.removeIf(f -> f.references().subsetOf(leftOutput) && leftFilters.add(f));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Funky use of && :D

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// followed by right scoped only
// (same record-and-remove trick, applied to what the left pass did not claim)
rest.removeIf(f -> f.references().subsetOf(rightOutput) && rightFilters.add(f));
return new ScopedFilter(rest, leftFilters, rightFilters);
}

// Tries to move the parts of the filter that only depend on the join's left child below the join.
// Returns:
// - the original filter, untouched, when the join is not a LEFT join or no conjunct is left-scoped
// - otherwise the join rebuilt on top of a new left-side Filter, wrapped in the original filter
//   (with a reduced condition) if any right-scoped or two-sided conjuncts remain
// Right-scoped conjuncts are never pushed down here; they stay above the join.
private static LogicalPlan pushDownPastJoin(Filter filter, Join join) {
// default result: leave the plan as it is
LogicalPlan plan = filter;
// pushdown only through LEFT joins
// TODO: generalize this for other join types
if (join.config().type() == JoinTypes.LEFT) {
LogicalPlan left = join.left();
LogicalPlan right = join.right();

// split the filter condition in 3 parts:
// 1. filter scoped to the left
// 2. filter scoped to the right
// 3. filter that requires both sides to be evaluated
Comment on lines +118 to +121
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I don't think this duplicated comment here is necessary, since the method itself has the same javadoc.

// the condition is broken into its AND-ed conjuncts before scoping, so each conjunct is placed independently
ScopedFilter scoped = scopeFilter(Predicates.splitAnd(filter.condition()), left, right);
// push the left scoped filter down to the left child, keep the rest intact
if (scoped.leftFilters.size() > 0) {
// push the filter down to the left child
left = new Filter(left.source(), left, Predicates.combineAnd(scoped.leftFilters));
// update the join with the new left child
join = (Join) join.replaceLeft(left);

// keep the remaining filters in place, otherwise return the new join;
// a null combined expression is treated as "nothing remains", so the filter node is dropped
// NOTE(review): presumably combineAnd returns null for an empty list - confirm in Predicates
Expression remainingFilter = Predicates.combineAnd(CollectionUtils.combine(scoped.commonFilters, scoped.rightFilters));
plan = remainingFilter != null ? filter.with(join, remainingFilter) : join;
}
}
// ignore the rest of the join
return plan;
}

// Identity transformation: returns the expression it is given, unchanged.
// Declared final so this shared constant cannot be reassigned after class initialization.
private static final Function<Expression, Expression> NO_OP = expression -> expression;

private static LogicalPlan maybePushDownPastUnary(
Expand Down
Loading