
ESQL: CATEGORIZE as a BlockHash #114317


Merged: 85 commits merged into elastic:main on Nov 27, 2024

Conversation

@nik9000 (Member) commented Oct 8, 2024

Re-implement `CATEGORIZE` in a way that works for multi-node clusters.

This requires that data is first categorized on each data node in a first pass, then the categorizers from each data node are merged on the coordinator node and previously categorized rows are re-categorized.

BlockHashes, used in HashAggregations, already work in a very similar way. E.g. for queries like `... | STATS ... BY field1, field2` they map values for `field1` and `field2` to unique integer ids that are then passed to the actual aggregate functions to identify which "bucket" a row belongs to. When passed from the data nodes to the coordinator, the BlockHashes are also merged to obtain unique ids for every value in `field1, field2` that is seen on the coordinator (not only on the local data nodes).

Therefore, we re-implement `CATEGORIZE` as a special BlockHash.

To choose the correct BlockHash when a query plan is mapped to physical operations, the `AggregateExec` query plan node needs to know that we will be categorizing the field `message` in a query containing `... | STATS ... BY c = CATEGORIZE(message)`. For this reason, _we do not extract the expression_ `c = CATEGORIZE(message)` into an `EVAL` node, in contrast to e.g. `STATS ... BY b = BUCKET(field, 10)`. The expression `c = CATEGORIZE(message)` simply remains inside the `AggregateExec`'s groupings.

**Important limitation:** For now, to use `CATEGORIZE` in a `STATS` command, there can be only 1 grouping (the `CATEGORIZE`) overall.
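To make the BlockHash idea concrete, here is a deliberately tiny sketch of the ordinal-assignment concept; `ToyBlockHash` and everything in it are illustrative stand-ins, not the real compute-engine API, which operates on Blocks and Pages.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: each distinct grouping key gets a stable integer id
// ("ordinal") that downstream aggregators use to pick a bucket.
class ToyBlockHash {
    private final Map<String, Integer> ids = new HashMap<>();

    // Returns the ordinal for a key, assigning the next free id on first sight.
    int add(String key) {
        Integer existing = ids.get(key);
        if (existing != null) {
            return existing;
        }
        int id = ids.size();
        ids.put(key, id);
        return id;
    }

    public static void main(String[] args) {
        // "Merging" on the coordinator amounts to re-adding every key seen on
        // a data node, yielding cluster-wide ids. CATEGORIZE is analogous,
        // except merged categorizers may also re-categorize earlier rows.
        ToyBlockHash coordinator = new ToyBlockHash();
        for (String key : new String[] { "err", "ok", "err", "warn" }) {
            System.out.println(key + " -> " + coordinator.add(key));
        }
    }
}
```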

@elasticsearchmachine added the v9.0.0 and needs:triage (Requires assignment of a team area label) labels on Oct 8, 2024
@elasticsearchmachine added the Team:Analytics (Meta label for analytical engine team (ESQL/Aggs/Geo)) label and removed the needs:triage label on Oct 8, 2024
@elasticsearchmachine (Collaborator)

Pinging @elastic/es-analytical-engine (Team:Analytics)

@elasticsearchmachine (Collaborator)

Hi @nik9000, I've created a changelog YAML for you.

@alex-spies (Contributor) left a comment

Heya, let's get this green and into main. The only work before merging is, IMO, getting the mutes/capabilities right and having correct expectations for the csv test with nulls - even if that means it needs muting, see below.

There is some immediate follow-up work that needs to be done, but that can be in subsequent PRs. I'm summarizing it here because the many comments are hard to navigate.

  • Remaining csv test cases:
      • STATS a = c, b = c BY c = CATEGORIZE(message)
      • from test | STATS MV_COUNT(cat), COUNT(*) BY cat = CATEGORIZE(first_name)
      • | stats mv_count(categorize(message)) by categorize(message)
  • Correct hashing of multivalues (+ test against regressions), see #114317 (comment).
  • Block hash tests.
  • FoldNull: check if this change is necessary, and add a comment as to why if that's still the case #114317 (comment)
  • Ideally:

Comment on lines 168 to 175
FROM sample_data
| EVAL x = null
| STATS COUNT() BY category=CATEGORIZE(x)
| SORT category
;

COUNT():long | category:keyword
;
@alex-spies (Contributor) commented Nov 27, 2024

Let's mark this with `-Ignore` but let's put the correct expectation here - and in the test below it.

/**
 * Base BlockHash implementation for {@code Categorize} grouping function.
 */
public abstract class AbstractCategorizeBlockHash extends BlockHash {
    // TODO: this should probably also take an emitBatchSize
Contributor

@nik9000 Some info on this?

@nik9000 (Member, Author)

TLDR: It's probably not important for single-element BlockHash implementations like this one.

So `emitBatchSize` is a request to call `AddInput#add` every `emitBatchSize` entries. It's designed to prevent building a huge page of ordinals when processing `STATS BY a, b` where `a` and `b` are not single-valued - especially if they are both multivalued. There, the contract is to assign the row an ordinal for every combination of `a` and `b` values. Since that can explode into a huge number of rows, we batch it.

This is much, much less important for single-element BlockHash implementations. They don't change the number of output rows. That's true even for CATEGORIZE. And if the incoming page already "wasn't too big" then the page of ordinals passed to the aggs can't be that big either.

When I first built this I thought I might apply this to single-valued BlockHash implementations as well. It'd be consistent. It's lame to ignore this request. But it isn't important, so I never got to it.
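To make the explosion concrete, here is a hypothetical sketch (illustrative names, not the real compute API): a row where `a` has m values and `b` has n values contributes m × n group combinations, which is what batching via `emitBatchSize` guards against.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: one row with multivalued grouping keys a and b expands
// into |a| * |b| ordinals, so pages of such rows can explode in size.
class OrdinalExplosionDemo {
    public static void main(String[] args) {
        List<String> a = List.of("a1", "a2", "a3"); // multivalued field a
        List<String> b = List.of("b1", "b2");       // multivalued field b

        List<String> combinations = new ArrayList<>();
        for (String av : a) {
            for (String bv : b) {
                combinations.add(av + "|" + bv);
            }
        }
        // 6 ordinals from a single input row; batching would flush the
        // ordinal page to AddInput#add every emitBatchSize entries.
        System.out.println(combinations.size() + " ordinals from one row");
    }
}
```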

Comment on lines +83 to +88
out.writeVInt(categorizer.getCategoryCount());
for (SerializableTokenListCategory category : categorizer.toCategoriesById()) {
    category.writeTo(out);
}
// We're returning a block with N positions just because the Page must have all blocks with the same position count!
return blockFactory.newConstantBytesRefBlockWith(out.bytes().toBytesRef(), categorizer.getCategoryCount());
Contributor

Do we really need to write the vInt and the Page positions hack? Can't we just write a position per category? To be more like ESQL.

Contributor

Not sure what exactly you mean. The number of categories is not equal to the number of input texts, meaning you still have a mismatch in the number of positions.

Contributor

Here we're building the intermediate state to pass to the CategorizeIntermediateHashBlock, with 1 row/position per category. So I imagine we can do this in 2 ways:

The current one:

Serialize into a BytesRef with:
- an int (# of categories)
- every category

and send this BytesRef in a block with N "simulated" rows.

Instead, do:

Write a block with one category (serialized in a BytesRef) per position/row.
Then we don't simulate anything, and we could even consume just 2 categories later and discard the rest (maybe? Just a _funny_ possibility, not sure if it's possible).
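A hypothetical sketch of the two layouts being compared, using plain byte arrays instead of the real BytesRef/BlockFactory machinery; `Category` bytes here stand in for `SerializableTokenListCategory`.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

// Illustrative comparison of the two intermediate-state layouts.
class IntermediateStateLayouts {
    // Layout 1 (current): one blob = count + all categories, which then has
    // to ride in a constant block with N "simulated" positions.
    static byte[] oneBlob(List<byte[]> categories) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(categories.size()); // stands in for writeVInt
        for (byte[] category : categories) {
            out.write(category);
        }
        return bytes.toByteArray();
    }

    // Layout 2 (proposed): one serialized category per block position, so the
    // position count is real and consumers could read categories selectively.
    static List<byte[]> onePerPosition(List<byte[]> categories) {
        return categories; // each entry would become one position
    }
}
```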

@nik9000 (Member, Author)

My memory was that the state was one blob of bytes and not a blob per category. There's, like, shared state. But it's been a month since I thought a lot about this. And I'm wrong about lots of stuff.

) {
if (groups.stream().anyMatch(GroupSpec::isCategorize)) {
if (groups.size() != 1) {
throw new IllegalArgumentException("only a single CATEGORIZE group can used");
Contributor

Typo. Maybe also something like:

Suggested change:
- throw new IllegalArgumentException("only a single CATEGORIZE group can used");
+ throw new IllegalArgumentException("if a CATEGORIZE group is present, no other groups are allowed");

Comment on lines 101 to 104
int end = first + count;
for (int i = first; i < end; i++) {
    result.appendInt(process(vBlock.getBytesRef(i, vScratch)));
}
Contributor

Indeed, it's broken. We didn't see it because all of our tests with multivalues use COUNT(), which just increments by 1 and doesn't use any other field 💀

Fixing now; added extra tests and functions.
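To see why COUNT()-only tests masked this, a hypothetical sketch: a counting aggregator never reads the grouped field values, so wrong values can still produce plausible-looking results.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: COUNT() just increments per ordinal and ignores the
// field values, so a hash that emits plausible ordinals but wrong values
// can slip past COUNT-only tests.
class CountMasksBugDemo {
    public static void main(String[] args) {
        int[] ordinals = { 0, 1, 0 }; // ordinals emitted for three values
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int ord : ordinals) {
            counts.merge(ord, 1, Integer::sum); // COUNT: increment by 1
        }
        System.out.println(counts); // {0=2, 1=1}
        // A value-reading aggregation (e.g. MV_COUNT or VALUES) would have
        // inspected the grouped values themselves and exposed the bug.
    }
}
```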

    List<GroupingAggregator.Factory> aggregators,
    int maxPageSize
) implements OperatorFactory {
    @Override
    public Operator get(DriverContext driverContext) {
        return new HashAggregationOperator(
            aggregators,
-           () -> BlockHash.build(groups, driverContext.blockFactory(), maxPageSize, false),
+           () -> BlockHash.build(groups, aggregatorMode, driverContext.blockFactory(), maxPageSize, false),
@ivancea (Contributor) commented Nov 27, 2024

We should probably make this change in the follow-up!

@Copilot (Copilot AI) left a comment

Copilot reviewed 20 out of 35 changed files in this pull request and generated no suggestions.

Files not reviewed (15)
  • docs/reference/esql/functions/kibana/definition/categorize.json: Language not supported
  • docs/reference/esql/functions/types/categorize.asciidoc: Language not supported
  • x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-mv_sample_data.json: Language not supported
  • x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_sample_data.csv: Language not supported
  • muted-tests.yml: Evaluated as low risk
  • x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/FoldNull.java: Evaluated as low risk
  • x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineProjections.java: Evaluated as low risk
  • x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Categorize.java: Evaluated as low risk
  • x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java: Evaluated as low risk
  • x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java: Evaluated as low risk
  • x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTestCase.java: Evaluated as low risk
  • x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java: Evaluated as low risk
  • x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java: Evaluated as low risk
  • x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java: Evaluated as low risk
  • x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java: Evaluated as low risk

@ivancea ivancea merged commit 9022ccc into elastic:main Nov 27, 2024
16 checks passed
@elasticsearchmachine (Collaborator)

💔 Backport failed

Branch | Result
8.x    | Commit could not be cherry-picked due to conflicts

You can use sqren/backport to manually backport by running `backport --upstream elastic/elasticsearch --pr 114317`.

alexey-ivanov-es pushed a commit to alexey-ivanov-es/elasticsearch that referenced this pull request Nov 28, 2024
…lastic#117367)

Set/Collection#add() is supposed to return `true` if the collection changed (i.e., if it actually added something).
In this case, it must return whether the old value was null.

Extracted from elastic#114317 (Where it's being used)
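For context, a tiny demonstration of the contract described above; the map-backed variant shows why "changed" reduces to "the previous value was null".

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Set#add must return true iff the collection changed.
class SetAddContractDemo {
    public static void main(String[] args) {
        Set<String> set = new HashSet<>();
        System.out.println(set.add("a")); // true: element was new
        System.out.println(set.add("a")); // false: set unchanged

        // For a map-backed set, "changed" == the old mapping was null.
        Map<String, Boolean> backing = new HashMap<>();
        boolean changed = backing.put("a", Boolean.TRUE) == null;
        System.out.println(changed); // true on first insert
    }
}
```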
alexey-ivanov-es pushed a commit to alexey-ivanov-es/elasticsearch that referenced this pull request Nov 28, 2024
Re-implement `CATEGORIZE` in a way that works for multi-node clusters.
Labels
  • :Analytics/ES|QL (AKA ESQL)
  • auto-backport (Automatically create backport pull requests when merged)
  • >enhancement
  • Team:Analytics (Meta label for analytical engine team (ESQL/Aggs/Geo))
  • v8.18.0
  • v9.0.0
7 participants