Skip to content

ESQL: Limit memory usage of fold #118602

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
8fa64b1
WIP
nik9000 Dec 6, 2024
8ccd678
Merge branch 'main' into fold_ctx_2
nik9000 Dec 10, 2024
a126e84
WIP:
nik9000 Dec 10, 2024
bc661ce
Merge branch 'main' into fold_ctx_2
nik9000 Dec 12, 2024
28ac84f
Fixup
nik9000 Dec 12, 2024
facd394
actual tests
nik9000 Dec 12, 2024
17d2fc4
Update docs/changelog/118602.yaml
nik9000 Dec 12, 2024
faf0e5e
Merge branch 'main' into fold_ctx_2
nik9000 Dec 13, 2024
dd9eeba
Compile again
nik9000 Dec 13, 2024
f46447e
Merge remote-tracking branch 'nik9000/fold_ctx_2' into fold_ctx_2
nik9000 Dec 13, 2024
739e9bf
Merge branch 'main' into fold_ctx_2
nik9000 Dec 27, 2024
6660b88
BigArrays too
nik9000 Dec 27, 2024
b9ea47f
Merge branch 'main' into fold_ctx_2
nik9000 Jan 3, 2025
2e24625
Update x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/…
nik9000 Jan 9, 2025
5a7fe2e
Merge branch 'main' into fold_ctx_2
nik9000 Jan 9, 2025
2d063c8
Merge
nik9000 Jan 9, 2025
115477a
Merge remote-tracking branch 'nik9000/fold_ctx_2' into fold_ctx_2
nik9000 Jan 9, 2025
bfbc7eb
Update foldctx
nik9000 Jan 9, 2025
f3fd791
Review
nik9000 Jan 9, 2025
fa12629
Merge branch 'main' into fold_ctx_2
nik9000 Jan 10, 2025
d790556
Make FoldContext have equality
nik9000 Jan 10, 2025
2abbeef
Helper
nik9000 Jan 10, 2025
39440d8
Catch my bug
nik9000 Jan 10, 2025
0d5afb1
Merge branch 'main' into fold_ctx_2
nik9000 Jan 10, 2025
b20f541
Contextualizification
nik9000 Jan 10, 2025
456833d
5%
nik9000 Jan 13, 2025
573a238
Fix hash
nik9000 Jan 13, 2025
5742c23
Moar tests
nik9000 Jan 13, 2025
b43b8d8
Merge branch 'main' into fold_ctx_2
nik9000 Jan 13, 2025
0651b78
Update heap attack now
nik9000 Jan 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.predicate.regex.RLikePattern;
import org.elasticsearch.xpack.esql.core.tree.Source;
Expand Down Expand Up @@ -71,12 +72,11 @@ public class EvalBenchmark {
BigArrays.NON_RECYCLING_INSTANCE
);

private static final FoldContext FOLD_CONTEXT = FoldContext.unbounded();

private static final int BLOCK_LENGTH = 8 * 1024;

static final DriverContext driverContext = new DriverContext(
BigArrays.NON_RECYCLING_INSTANCE,
BlockFactory.getInstance(new NoopCircuitBreaker("noop"), BigArrays.NON_RECYCLING_INSTANCE)
);
static final DriverContext driverContext = new DriverContext(BigArrays.NON_RECYCLING_INSTANCE, blockFactory);

static {
// Smoke test all the expected values and force loading subclasses more like prod
Expand Down Expand Up @@ -114,18 +114,20 @@ private static EvalOperator.ExpressionEvaluator evaluator(String operation) {
return switch (operation) {
case "abs" -> {
FieldAttribute longField = longField();
yield EvalMapper.toEvaluator(new Abs(Source.EMPTY, longField), layout(longField)).get(driverContext);
yield EvalMapper.toEvaluator(FOLD_CONTEXT, new Abs(Source.EMPTY, longField), layout(longField)).get(driverContext);
}
case "add" -> {
FieldAttribute longField = longField();
yield EvalMapper.toEvaluator(
FOLD_CONTEXT,
new Add(Source.EMPTY, longField, new Literal(Source.EMPTY, 1L, DataType.LONG)),
layout(longField)
).get(driverContext);
}
case "add_double" -> {
FieldAttribute doubleField = doubleField();
yield EvalMapper.toEvaluator(
FOLD_CONTEXT,
new Add(Source.EMPTY, doubleField, new Literal(Source.EMPTY, 1D, DataType.DOUBLE)),
layout(doubleField)
).get(driverContext);
Expand All @@ -140,7 +142,8 @@ private static EvalOperator.ExpressionEvaluator evaluator(String operation) {
lhs = new Add(Source.EMPTY, lhs, new Literal(Source.EMPTY, 1L, DataType.LONG));
rhs = new Add(Source.EMPTY, rhs, new Literal(Source.EMPTY, 1L, DataType.LONG));
}
yield EvalMapper.toEvaluator(new Case(Source.EMPTY, condition, List.of(lhs, rhs)), layout(f1, f2)).get(driverContext);
yield EvalMapper.toEvaluator(FOLD_CONTEXT, new Case(Source.EMPTY, condition, List.of(lhs, rhs)), layout(f1, f2))
.get(driverContext);
}
case "date_trunc" -> {
FieldAttribute timestamp = new FieldAttribute(
Expand All @@ -149,35 +152,37 @@ private static EvalOperator.ExpressionEvaluator evaluator(String operation) {
new EsField("timestamp", DataType.DATETIME, Map.of(), true)
);
yield EvalMapper.toEvaluator(
FOLD_CONTEXT,
new DateTrunc(Source.EMPTY, new Literal(Source.EMPTY, Duration.ofHours(24), DataType.TIME_DURATION), timestamp),
layout(timestamp)
).get(driverContext);
}
case "equal_to_const" -> {
FieldAttribute longField = longField();
yield EvalMapper.toEvaluator(
FOLD_CONTEXT,
new Equals(Source.EMPTY, longField, new Literal(Source.EMPTY, 100_000L, DataType.LONG)),
layout(longField)
).get(driverContext);
}
case "long_equal_to_long" -> {
FieldAttribute lhs = longField();
FieldAttribute rhs = longField();
yield EvalMapper.toEvaluator(new Equals(Source.EMPTY, lhs, rhs), layout(lhs, rhs)).get(driverContext);
yield EvalMapper.toEvaluator(FOLD_CONTEXT, new Equals(Source.EMPTY, lhs, rhs), layout(lhs, rhs)).get(driverContext);
}
case "long_equal_to_int" -> {
FieldAttribute lhs = longField();
FieldAttribute rhs = intField();
yield EvalMapper.toEvaluator(new Equals(Source.EMPTY, lhs, rhs), layout(lhs, rhs)).get(driverContext);
yield EvalMapper.toEvaluator(FOLD_CONTEXT, new Equals(Source.EMPTY, lhs, rhs), layout(lhs, rhs)).get(driverContext);
}
case "mv_min", "mv_min_ascending" -> {
FieldAttribute longField = longField();
yield EvalMapper.toEvaluator(new MvMin(Source.EMPTY, longField), layout(longField)).get(driverContext);
yield EvalMapper.toEvaluator(FOLD_CONTEXT, new MvMin(Source.EMPTY, longField), layout(longField)).get(driverContext);
}
case "rlike" -> {
FieldAttribute keywordField = keywordField();
RLike rlike = new RLike(Source.EMPTY, keywordField, new RLikePattern(".ar"));
yield EvalMapper.toEvaluator(rlike, layout(keywordField)).get(driverContext);
yield EvalMapper.toEvaluator(FOLD_CONTEXT, rlike, layout(keywordField)).get(driverContext);
}
default -> throw new UnsupportedOperationException();
};
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/118602.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118602
summary: Limit memory usage of `fold`
area: ES|QL
type: bug
issues: []

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion docs/reference/esql/functions/kibana/docs/match_operator.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -1727,7 +1727,7 @@ public static <T extends Enum<T>> Setting<T> enumSetting(
*
* @param key the key for the setting
* @param defaultValue the default value for this setting
* @param properties properties properties for this setting like scope, filtering...
* @param properties properties for this setting like scope, filtering...
* @return the setting object
*/
public static Setting<ByteSizeValue> memorySizeSetting(String key, ByteSizeValue defaultValue, Property... properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,16 @@ private void assertCircuitBreaks(ThrowingRunnable r) throws IOException {
);
}

/**
 * Runs {@code r}, expects it to fail with a {@link ResponseException}, and
 * asserts that the error response body has {@code status} 400 and an
 * {@code error.type} of {@code ql_fold_too_much_memory_exception}.
 * The {@code error} matcher is built with {@code extraOk()}, which presumably
 * tolerates additional keys in that object - confirm against the matcher's docs.
 *
 * @param r the request expected to fail while folding
 * @throws IOException declared by the signature; presumably from reading the response body - confirm
 */
private void assertFoldCircuitBreaks(ThrowingRunnable r) throws IOException {
ResponseException e = expectThrows(ResponseException.class, r);
Map<?, ?> map = responseAsMap(e.getResponse());
// Log the whole body so a mismatch below is easy to diagnose from the test output.
logger.info("expected fold circuit breaking {}", map);
assertMap(
map,
matchesMap().entry("status", 400).entry("error", matchesMap().extraOk().entry("type", "ql_fold_too_much_memory_exception"))
);
}

private void assertParseFailure(ThrowingRunnable r) throws IOException {
ResponseException e = expectThrows(ResponseException.class, r);
Map<?, ?> map = responseAsMap(e.getResponse());
Expand Down Expand Up @@ -325,11 +335,23 @@ public void testManyConcatFromRow() throws IOException {
assertManyStrings(resp, strings);
}

/**
 * Concatenating many moderately long strings built from a single {@code ROW}
 * must fail with the fold memory error rather than succeed.
 */
public void testHugeManyConcatFromRow() throws IOException {
    String row = "ROW a=9999999999999, b=99999999999999999, c=99999999999999999, d=99999999999999999, e=99999999999999999";
    int strings = 5000;
    assertFoldCircuitBreaks(() -> manyConcat(row, strings));
}

/**
* Fails to parse a huge huge query.
*/
public void testHugeHugeManyConcatFromRow() throws IOException {
assertParseFailure(() -> manyConcat("ROW a=9999, b=9999, c=9999, d=9999, e=9999", 50000));
assertParseFailure(() -> manyConcat("ROW a=9999, b=9999, c=9999, d=9999, e=9999", 6000));
}

/**
Expand Down Expand Up @@ -387,13 +409,20 @@ public void testHugeManyRepeat() throws IOException {
* Returns many moderately long strings.
*/
public void testManyRepeatFromRow() throws IOException {
int strings = 10000;
int strings = 600;
Response resp = manyRepeat("ROW a = 99", strings);
assertManyStrings(resp, strings);
}

/**
* Fails to parse a huge huge query.
* Hits a circuit breaker by building many moderately long strings.
*/
public void testHugeManyRepeatFromRow() throws IOException {
assertFoldCircuitBreaks(() -> manyRepeat("ROW a = 99", 1000));
}

/**
* Fails to parse a huge, huge query.
*/
public void testHugeHugeManyRepeatFromRow() throws IOException {
assertParseFailure(() -> manyRepeat("ROW a = 99", 100000));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,20 @@ public Expression(Source source, List<Expression> children) {
super(source, children);
}

// whether the expression can be evaluated statically (folded) or not
/**
* Whether the expression can be evaluated statically, aka "folded", or not.
*/
public boolean foldable() {
return false;
}

public Object fold() {
/**
* Evaluate this expression statically to a constant. It is an error to call
* this if {@link #foldable} returns false.
*/
public Object fold(FoldContext ctx) {
// TODO After removing FoldContext.unbounded from non-test code examine all calls
// for places we should use instanceof Literal instead
throw new QlIllegalArgumentException("Should not fold expression");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ public static boolean foldable(List<? extends Expression> exps) {
return true;
}

public static List<Object> fold(List<? extends Expression> exps) {
public static List<Object> fold(FoldContext ctx, List<? extends Expression> exps) {
List<Object> folded = new ArrayList<>(exps.size());
for (Expression exp : exps) {
folded.add(exp.fold());
folded.add(exp.fold(ctx));
}

return folded;
Expand All @@ -135,7 +135,7 @@ public static String name(Expression e) {
/**
* Is this {@linkplain Expression} <strong>guaranteed</strong> to have
* only the {@code null} value. {@linkplain Expression}s that
* {@link Expression#fold()} to {@code null} <strong>may</strong>
* {@link Expression#fold} to {@code null} <strong>may</strong>
* return {@code false} here, but should <strong>eventually</strong> be folded
* into a {@link Literal} containing {@code null} which will return
* {@code true} from here.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.core.expression;

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.xpack.esql.core.QlClientException;
import org.elasticsearch.xpack.esql.core.tree.Source;

/**
* Context passed to {@link Expression#fold}. This is not thread safe.
*/
public class FoldContext {
/**
* {@link Expression#fold} using any amount of memory. Only safe for tests.
*/
public static FoldContext unbounded() {
return new FoldContext(Long.MAX_VALUE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are still using it in a few places, wouldn't it make sense to default it to the same 5% of the memory as the default in the query pragmas (or a slightly higher value), rather than making it really unbounded? It's a bit paranoid maybe, I guess the final goal here is safety

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hopeful we can remove the usages in a follow-up. Let's get this in and have a conversation about whether or not we should replace unbounded with small.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we want to give this the 5% limit before merging this?

}

private final long initialAllowedBytes;
private long allowedBytes;

/**
 * Build a context whose folding budget is {@code allowedBytes}.
 *
 * @param allowedBytes number of bytes folding may track; also remembered
 *                     as the initial limit
 */
public FoldContext(long allowedBytes) {
    this.allowedBytes = allowedBytes;
    this.initialAllowedBytes = allowedBytes;
}

/**
 * Charge {@code bytes} against the remaining folding budget. Prefer calling
 * this <strong>before</strong> the allocation happens; calling it afterwards
 * is acceptable when the allocation is small.
 * <p>
 * Unlike a {@link CircuitBreaker}, nothing <strong>requires</strong> the
 * bytes to be released later. That matters because the query plan does not
 * implement {@link Releasable} and so <strong>can't</strong> release
 * consistently. When folding does allocate a large chunk and knows it is
 * handing that memory back, it is kindest to call this again with a negative
 * number, which returns those bytes to the budget.
 * </p>
 *
 * @param source location reported in the failure when the budget is exceeded
 * @param bytes  bytes to charge; negative to give bytes back
 * @throws FoldTooMuchMemoryException if the remaining budget drops below zero
 */
public void trackAllocation(Source source, long bytes) {
    long remaining = allowedBytes - bytes;
    // Store the new balance first; it is kept even when we throw below.
    allowedBytes = remaining;
    assert remaining <= initialAllowedBytes : "returned more bytes than it used";
    if (remaining >= 0) {
        return;
    }
    throw new FoldTooMuchMemoryException(source, bytes, initialAllowedBytes);
}

/**
 * Adapt this into a {@link CircuitBreaker} suitable for building bounded local
 * DriverContext. This is absolutely an abuse of the {@link CircuitBreaker} contract
 * and only methods used by BlockFactory are implemented. And this'll throw a
 * {@link FoldTooMuchMemoryException} instead of the standard {@link CircuitBreakingException}.
 * This works for the common folding implementation though.
 *
 * @param source location reported if an allocation made through the view exceeds the budget
 * @return a view that charges and refunds bytes against this context's budget
 */
public CircuitBreaker circuitBreakerView(Source source) {
return new CircuitBreaker() {
@Override
public void circuitBreak(String fieldName, long bytesNeeded) {
// Not supported by this view.
throw new UnsupportedOperationException();
}

@Override
public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
// The label is ignored; failures are reported against the captured source instead.
trackAllocation(source, bytes);
}

@Override
public void addWithoutBreaking(long bytes) {
assert bytes <= 0 : "we only expect this to be used for deallocation";
// Subtracting a non-positive number gives those bytes back to the budget.
allowedBytes -= bytes;
assert allowedBytes <= initialAllowedBytes : "returned more bytes than it used";
}

@Override
public long getUsed() {
/*
* This isn't expected to be used but we can implement it so we may as
* well. Maybe it'll be useful for debugging one day.
*/
return initialAllowedBytes - allowedBytes;
}

@Override
public long getLimit() {
/*
* This isn't expected to be used but we can implement it so we may as
* well. Maybe it'll be useful for debugging one day.
*/
return initialAllowedBytes;
}

@Override
public double getOverhead() {
return 1.0;
}

@Override
public long getTrippedCount() {
// Always reports zero; this view keeps no trip counter.
return 0;
}

@Override
public String getName() {
// REQUEST is not declared in this file; presumably a constant inherited from CircuitBreaker - confirm.
return REQUEST;
}

@Override
public Durability getDurability() {
// Not supported by this view.
throw new UnsupportedOperationException();
}

@Override
public void setLimitAndOverhead(long limit, double overhead) {
// Not supported by this view.
throw new UnsupportedOperationException();
}
};
}

/**
 * Thrown by {@link FoldContext#trackAllocation} when the remaining folding
 * budget drops below zero.
 */
public static class FoldTooMuchMemoryException extends QlClientException {
/**
 * @param source              location whose line, column and text are put in the message
 * @param bytesForExpression  size of the single tracked allocation that crossed the limit
 * @param initialAllowedBytes the budget the context was created with
 */
protected FoldTooMuchMemoryException(Source source, long bytesForExpression, long initialAllowedBytes) {
super(
// The {} placeholders are presumably filled in order by the superclass - confirm against QlClientException.
"line {}:{}: Folding query used more than {}. The expression that pushed past the limit is [{}] which needed {}.",
source.source().getLineNumber(),
source.source().getColumnNumber(),
ByteSizeValue.ofBytes(initialAllowedBytes),
source.text(),
ByteSizeValue.ofBytes(bytesForExpression)
);
}
}
}
Loading