ESQL: CATEGORIZE as a BlockHash #114317

Merged
merged 85 commits into from Nov 27, 2024

Changes from all commits (85 commits)
a4647fc
WIP
nik9000 Oct 4, 2024
9cb4425
Merge branch 'main' into esql_blockh2
nik9000 Oct 8, 2024
95f767b
Stay plz:
nik9000 Oct 8, 2024
e4b9e4b
Update docs/changelog/114317.yaml
nik9000 Oct 8, 2024
f2d1806
Make things compile
alex-spies Oct 17, 2024
a309133
Move new block hashes to typical location
alex-spies Oct 22, 2024
e50d5b9
Add almost passing test
alex-spies Oct 22, 2024
5674046
Close stuff (makes unit test pass)
jan-elastic Oct 24, 2024
31e9e20
undo making BytesRefBlockHash public
jan-elastic Oct 24, 2024
82cc74a
Move Categorize BlockHash tests to separate file
jan-elastic Oct 28, 2024
a234326
Unit test for CategorizedIntermediateBlockHash.
jan-elastic Oct 28, 2024
3f94143
Unit test with Driver
jan-elastic Oct 31, 2024
239d159
Add aggregator to unit test
jan-elastic Nov 4, 2024
260b572
Passing unit test with driver and aggregators
jan-elastic Nov 6, 2024
975d6ef
Improve code
jan-elastic Nov 6, 2024
f44774d
Fix ES compile errors
jan-elastic Nov 6, 2024
f7264f2
Update Categorize function/evaluator (works end2end in discover)
jan-elastic Nov 7, 2024
5b1cfc1
fix CategorizeTests
jan-elastic Nov 8, 2024
f05a3c2
Fix
jan-elastic Nov 8, 2024
fe3d536
Misc fixes to make CSV test work
jan-elastic Nov 8, 2024
1b7705b
Release blocks in CategorizedIntermediateBlockHash
alex-spies Nov 8, 2024
c3c0c68
multivalued fields
nik9000 Nov 12, 2024
f207321
ROW MV
nik9000 Nov 12, 2024
790770b
More cases
nik9000 Nov 12, 2024
3f65d30
unicode
nik9000 Nov 12, 2024
28d0012
More cases
nik9000 Nov 12, 2024
1af1c68
Drop all columns
nik9000 Nov 12, 2024
2b00cc2
Decode category
jan-elastic Nov 11, 2024
cc9f84a
Add test case with expression inside CATEGORIZE
alex-spies Nov 12, 2024
8c54acd
Disallow evaluating CATEGORIZE outside aggs
alex-spies Nov 12, 2024
6040f6b
WIP: wire in CATEGORIZE without new datatype
alex-spies Nov 12, 2024
8c96df9
Wire in CATEGORIZE without needing new datatype
alex-spies Nov 12, 2024
4485b0e
ON
nik9000 Nov 13, 2024
6bf3b31
Move BlockHash creation to Grouping function
ivancea Nov 18, 2024
64ca567
Remove CATEGORY DataType and ElementType
ivancea Nov 18, 2024
b5ad72a
Format all
ivancea Nov 18, 2024
46ac649
Removed unused code and added new test using CATEGORY() return value …
ivancea Nov 19, 2024
e311014
Remove ToBlockHash interface, and simplify logic for Categorize
ivancea Nov 19, 2024
5cd145d
Merge branch 'main' into esql_blockh2
ivancea Nov 19, 2024
3da877d
Remove missing CATEGORY enum constant from Csv tests
ivancea Nov 19, 2024
1bad621
Keep alias on ReplaceAggNested Categorize grouping
ivancea Nov 19, 2024
51dc274
Do not null/propagate fold Categorize
ivancea Nov 20, 2024
d421281
Fixed Categorize layout using source field, and added failing tests w…
ivancea Nov 20, 2024
bf466f7
Added failing alias CSV tests for Categorize
ivancea Nov 21, 2024
b5b387c
Make CombineProjections work with expressions in the grouping, like C…
ivancea Nov 21, 2024
3b75c65
Added Categorize tests for FoldNull and nested aggs rules
ivancea Nov 21, 2024
006768d
Format with spotlessApply
ivancea Nov 21, 2024
30a25b9
Fix benchmark compilation
ivancea Nov 21, 2024
147023d
Fix AttributeSet add() return value
ivancea Nov 21, 2024
8cb0e26
Fix/Remove NOCOMMITs
ivancea Nov 21, 2024
043f99d
Fix Javadoc HTML p tag not allowing nested blocks
ivancea Nov 22, 2024
9dd8973
Fixed tests compilation and added more csv tests for other commands
ivancea Nov 22, 2024
df39b40
Remove remaining NOCOMMITs
ivancea Nov 22, 2024
cbffe6d
Format
ivancea Nov 22, 2024
483cbfe
Updated Categorize tests to not fail on evaluator creation
ivancea Nov 22, 2024
0aeb50c
Merge branch 'main' into esql_blockh2
ivancea Nov 22, 2024
6ad0804
Fixed union_types CSV tests, failing after new dataset addition
ivancea Nov 25, 2024
32fb924
Merge branch 'main' into esql_blockh2
ivancea Nov 25, 2024
5ad52de
Mute eval const failing tests until issue is fixed
ivancea Nov 26, 2024
27da936
Restored pre-DataType-change pieces of code (Extractors, BlockHash Ag…
ivancea Nov 26, 2024
0372701
Added javadocs and some tests with mixed cases (nulls, multivalue...)
ivancea Nov 26, 2024
c6cda24
Some docs and refactors on CategorizedIntermediateBlockHash
ivancea Nov 26, 2024
f833ba0
Make Categorize non-foldable
ivancea Nov 26, 2024
ddacdc3
Format
ivancea Nov 26, 2024
9ed6262
ReplaceNestedAgg rule cleanup
ivancea Nov 26, 2024
ea13baf
Merge branch 'main' into esql_blockh2
ivancea Nov 26, 2024
dc8ecfd
Added failing tests for empty strings throwing NPE
ivancea Nov 26, 2024
c1c70fc
Rename sample_data_mv
ivancea Nov 26, 2024
aecacde
Ignore failing empty string tests
ivancea Nov 26, 2024
7c7b604
Undo union-types tests changes
ivancea Nov 26, 2024
9dc6ad1
Format
ivancea Nov 26, 2024
e0b84f0
Fixed failing test after indices change
ivancea Nov 26, 2024
9d1bfd3
Merge branch 'main' into esql_blockh2
ivancea Nov 26, 2024
6cf6ec1
Unmute Categorize tests
ivancea Nov 26, 2024
f677b47
Added a bunch of other tests from PR suggestions
ivancea Nov 26, 2024
f37d2bd
Removed semantic text index, which is failing in cross cluster
ivancea Nov 26, 2024
d10709b
Improved optimizer tests on categorize
ivancea Nov 27, 2024
b622e6c
Unmute last categorize test
ivancea Nov 27, 2024
30412ad
Rename capability to CATEGORIZE_V2
ivancea Nov 27, 2024
74aeae3
Ignore and fix null tests
ivancea Nov 27, 2024
3771532
Fixed multivalues on Categorize and added tests using other fields
ivancea Nov 27, 2024
599b02b
Use checked circuit breaker and fix CategorizeBlockHashTests
ivancea Nov 27, 2024
3763f74
Restore Aggregate computeReferences()
ivancea Nov 27, 2024
d2325aa
Remove AggregatorMode parameter from BlockHash.build(), and format all
ivancea Nov 27, 2024
43a335f
Merge branch 'main' into esql_blockh2
ivancea Nov 27, 2024
5 changes: 5 additions & 0 deletions docs/changelog/114317.yaml
@@ -0,0 +1,5 @@
pr: 114317
summary: "ESQL: CATEGORIZE as a `BlockHash`"
area: ES|QL
type: enhancement
issues: []


4 changes: 2 additions & 2 deletions docs/reference/esql/functions/types/categorize.asciidoc

(Generated file; diff not rendered.)

18 changes: 0 additions & 18 deletions muted-tests.yml
@@ -67,9 +67,6 @@ tests:
- class: org.elasticsearch.xpack.transform.integration.TransformIT
method: testStopWaitForCheckpoint
issue: https://github.com/elastic/elasticsearch/issues/106113
- class: org.elasticsearch.xpack.esql.qa.mixed.MixedClusterEsqlSpecIT
method: test {categorize.Categorize SYNC}
issue: https://github.com/elastic/elasticsearch/issues/113722
- class: org.elasticsearch.kibana.KibanaThreadPoolIT
method: testBlockedThreadPoolsRejectUserRequests
issue: https://github.com/elastic/elasticsearch/issues/113939
@@ -126,12 +123,6 @@ tests:
- class: org.elasticsearch.xpack.ml.integration.DatafeedJobsRestIT
method: testLookbackWithIndicesOptions
issue: https://github.com/elastic/elasticsearch/issues/116127
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
method: test {categorize.Categorize SYNC}
issue: https://github.com/elastic/elasticsearch/issues/113054
- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT
method: test {categorize.Categorize ASYNC}
issue: https://github.com/elastic/elasticsearch/issues/113055
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/transforms_start_stop/Test start already started transform}
issue: https://github.com/elastic/elasticsearch/issues/98802
@@ -153,9 +144,6 @@ tests:
- class: org.elasticsearch.xpack.shutdown.NodeShutdownIT
method: testAllocationPreventedForRemoval
issue: https://github.com/elastic/elasticsearch/issues/116363
- class: org.elasticsearch.xpack.esql.qa.mixed.MixedClusterEsqlSpecIT
method: test {categorize.Categorize ASYNC}
issue: https://github.com/elastic/elasticsearch/issues/116373
- class: org.elasticsearch.threadpool.SimpleThreadPoolIT
method: testThreadPoolMetrics
issue: https://github.com/elastic/elasticsearch/issues/108320
Expand All @@ -168,9 +156,6 @@ tests:
- class: org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsCanMatchOnCoordinatorIntegTests
method: testSearchableSnapshotShardsAreSkippedBySearchRequestWithoutQueryingAnyNodeWhenTheyAreOutsideOfTheQueryRange
issue: https://github.com/elastic/elasticsearch/issues/116523
- class: org.elasticsearch.xpack.esql.ccq.MultiClusterSpecIT
method: test {categorize.Categorize}
issue: https://github.com/elastic/elasticsearch/issues/116434
- class: org.elasticsearch.upgrades.SearchStatesIT
method: testBWCSearchStates
issue: https://github.com/elastic/elasticsearch/issues/116617
@@ -229,9 +214,6 @@ tests:
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/transforms_reset/Test reset running transform}
issue: https://github.com/elastic/elasticsearch/issues/117473
- class: org.elasticsearch.xpack.esql.qa.single_node.FieldExtractorIT
method: testConstantKeywordField
issue: https://github.com/elastic/elasticsearch/issues/117524
- class: org.elasticsearch.xpack.esql.qa.multi_node.FieldExtractorIT
method: testConstantKeywordField
issue: https://github.com/elastic/elasticsearch/issues/117524
New file: org/elasticsearch/compute/aggregation/blockhash/AbstractCategorizeBlockHash.java (105 additions)
@@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.aggregation.blockhash;

import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.xpack.ml.aggs.categorization.CategorizationBytesRefHash;
import org.elasticsearch.xpack.ml.aggs.categorization.CategorizationPartOfSpeechDictionary;
import org.elasticsearch.xpack.ml.aggs.categorization.SerializableTokenListCategory;
import org.elasticsearch.xpack.ml.aggs.categorization.TokenListCategorizer;

import java.io.IOException;

/**
* Base BlockHash implementation for {@code Categorize} grouping function.
*/
public abstract class AbstractCategorizeBlockHash extends BlockHash {
// TODO: this should probably also take an emitBatchSize
Contributor: @nik9000 Some info on this?

nik9000 (Member, Author): TL;DR: it's probably not important for single-element BlockHash implementations like this one.

emitBatchSize is a request to call AddInput#add every emitBatchSize entries. It's designed to prevent building a huge page of ordinals when processing STATS BY a, b where a and b are not single valued, especially if they are both multivalued. There, the contract is to emit an ordinal for every combination of a and b values. Since that can explode into a huge number of rows, we batch it.

This is much, much less important for single-element BlockHash implementations: they don't change the number of output rows. That's true even for CATEGORIZE. And if the incoming page already "wasn't too big", then the page of ordinals passed to the aggs can't be that big either.

When I first built this I thought I might apply it to single-valued BlockHash implementations as well. It'd be consistent, and it's lame to ignore the request, but it isn't important, so I never got to it.
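
A rough sketch of the arithmetic behind that contract (hypothetical values, not code from this PR):

    // A single row of STATS BY a, b with multivalued a and b must emit one
    // ordinal per combination of values; emitBatchSize bounds how many of
    // those ordinals are handed to the aggregators at once.
    int[] aValues = {1, 2, 3};              // a has 3 values on this row
    String[] bValues = {"x", "y"};          // b has 2 values on this row
    int ordinals = aValues.length * bValues.length; // 3 * 2 = 6 ordinals from one row
    int emitBatchSize = 4;
    // ceil(6 / 4) = 2: AddInput#add would be called twice (4 ids, then 2),
    // instead of once with one unbounded page of ordinals.
    int addCalls = (ordinals + emitBatchSize - 1) / emitBatchSize;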

private final int channel;
private final boolean outputPartial;
protected final TokenListCategorizer.CloseableTokenListCategorizer categorizer;

AbstractCategorizeBlockHash(BlockFactory blockFactory, int channel, boolean outputPartial) {
super(blockFactory);
this.channel = channel;
this.outputPartial = outputPartial;
this.categorizer = new TokenListCategorizer.CloseableTokenListCategorizer(
new CategorizationBytesRefHash(new BytesRefHash(2048, blockFactory.bigArrays())),
CategorizationPartOfSpeechDictionary.getInstance(),
0.70f
);
}

protected int channel() {
return channel;
}

@Override
public Block[] getKeys() {
return new Block[] { outputPartial ? buildIntermediateBlock() : buildFinalBlock() };
}

@Override
public IntVector nonEmpty() {
return IntVector.range(0, categorizer.getCategoryCount(), blockFactory);
}

@Override
public BitArray seenGroupIds(BigArrays bigArrays) {
throw new UnsupportedOperationException();
}

@Override
public final ReleasableIterator<IntBlock> lookup(Page page, ByteSizeValue targetBlockSize) {
throw new UnsupportedOperationException();
Comment on lines +66 to +71
Contributor: nit: let's give these some useful error messages if we ever end up here, and maybe a comment explaining why it's fine that we don't support these.

Contributor: @nik9000 You may have more context about this. Is this something we didn't "want" to implement yet, or something that can't be done with this BlockHash?

nik9000 (Member, Author): This powers hash joins. Originally, when I built the infrastructure for hash joins, I thought any BlockHash could power joins via this lookup. But we've been spending a lot less time on hash join lately. Maybe we'll get back to it soon? Anyway, as it stands, only PackedValuesBlockHash actually implements lookup. I think eventually we will make the lookup method work for the other BlockHash implementations.

Except maybe the CATEGORIZE one. I don't know how it'd work or if it'd make sense. I think this method is confusing, and it'd be nice to remove it from BlockHash for now and fix the compilation errors.
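
For context, a rough sketch of how lookup is meant to be consumed by a hash join (hypothetical usage based on the description above, not code from this PR):

    // leftHash was built from the left-hand side; lookup maps each right-hand
    // row to the matching group ids. Today only PackedValuesBlockHash
    // implements this.
    try (ReleasableIterator<IntBlock> ids = leftHash.lookup(rightPage, ByteSizeValue.ofMb(1))) {
        while (ids.hasNext()) {
            try (IntBlock matches = ids.next()) {
                // emit joined rows for the group ids in `matches`
            }
        }
    }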

}

/**
* Serializes the intermediate state into a single BytesRef block, or an empty Null block if there are no categories.
*/
private Block buildIntermediateBlock() {
if (categorizer.getCategoryCount() == 0) {
return blockFactory.newConstantNullBlock(0);
}
try (BytesStreamOutput out = new BytesStreamOutput()) {
// TODO be more careful here.
nik9000 (Member, Author): I totally hacked this thing together. I haven't a clue if it's properly correct. I mean, it works, but it's worth another set of eyes on it.

jan-elastic (Contributor, Nov 13, 2024): I wasn't sure what you meant by "be more careful here", but it LGTM.

nik9000 (Member, Author): Are we worried about memory usage with this? These are untracked bytes. We're tracking the memory usage on the agg itself, but not the serialization. But if it's super small, we don't have to worry.

out.writeVInt(categorizer.getCategoryCount());
for (SerializableTokenListCategory category : categorizer.toCategoriesById()) {
category.writeTo(out);
}
// We're returning a block with N positions just because the Page must have all blocks with the same position count!
return blockFactory.newConstantBytesRefBlockWith(out.bytes().toBytesRef(), categorizer.getCategoryCount());
Comment on lines +83 to +88
Contributor: Do we really need to write the vInt and the Page positions hack? Can't we just write a position per category, to be more like ESQL?

Contributor: Not sure what exactly you mean. The number of categories is not equal to the number of input texts, meaning you still have a mismatch in the number of positions.

Contributor: We're building the intermediate state here to pass to CategorizedIntermediateBlockHash, with one row/position per category. So I imagine we can do this in two ways.

The current one:

Serialize into a BytesRef with:
- an int (# of categories)
- every category

and send this BytesRef in a block with N "simulated" rows.

Instead, do:

Write a block with one category (serialized into a BytesRef) per position/row. That way we don't simulate anything, and we could even consume just 2 categories later and discard the rest (maybe? just a _funny_ possibility, not sure if it's possible).

nik9000 (Member, Author): My memory was that the state was one blob of bytes and not a blob per category. There's, like, shared state. But it's been a month since I thought a lot about this, and I'm wrong about lots of stuff.
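
A minimal sketch of the per-position alternative discussed above (hypothetical, assumes a BytesRefBlock.Builder, and ignores whatever shared state the categorizer keeps across categories):

    // Hypothetical alternative to buildIntermediateBlock(): serialize one
    // category per position, so no vInt prefix or "simulated" position count
    // is needed.
    private Block buildIntermediatePerCategoryBlock() {
        try (BytesRefBlock.Builder result = blockFactory.newBytesRefBlockBuilder(categorizer.getCategoryCount())) {
            for (SerializableTokenListCategory category : categorizer.toCategoriesById()) {
                try (BytesStreamOutput out = new BytesStreamOutput()) {
                    category.writeTo(out);
                    result.appendBytesRef(out.bytes().toBytesRef());
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            return result.build();
        }
    }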

} catch (IOException e) {
throw new RuntimeException(e);
}
}

private Block buildFinalBlock() {
try (BytesRefVector.Builder result = blockFactory.newBytesRefVectorBuilder(categorizer.getCategoryCount())) {
BytesRefBuilder scratch = new BytesRefBuilder();
for (SerializableTokenListCategory category : categorizer.toCategoriesById()) {
scratch.copyChars(category.getRegex());
result.appendBytesRef(scratch.get());
scratch.clear();
}
return result.build().asBlock();
}
}
}
Changes to org/elasticsearch/compute/aggregation/blockhash/BlockHash.java
@@ -14,6 +14,7 @@
import org.elasticsearch.common.util.Int3Hash;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.common.util.LongLongHash;
import org.elasticsearch.compute.aggregation.AggregatorMode;
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.aggregation.SeenGroupIds;
import org.elasticsearch.compute.data.Block;
@@ -58,9 +59,7 @@
* leave a big gap, even if we never see {@code null}.
* </p>
*/
public abstract sealed class BlockHash implements Releasable, SeenGroupIds //
permits BooleanBlockHash, BytesRefBlockHash, DoubleBlockHash, IntBlockHash, LongBlockHash, BytesRef2BlockHash, BytesRef3BlockHash, //
NullBlockHash, PackedValuesBlockHash, BytesRefLongBlockHash, LongLongBlockHash, TimeSeriesBlockHash {
public abstract class BlockHash implements Releasable, SeenGroupIds {

protected final BlockFactory blockFactory;

@@ -107,7 +106,15 @@ public abstract sealed class BlockHash implements Releasable, SeenGroupIds //
@Override
public abstract BitArray seenGroupIds(BigArrays bigArrays);

public record GroupSpec(int channel, ElementType elementType) {}
/**
* @param isCategorize Whether this group is a CATEGORIZE() or not.
* May be changed in the future when more stateful grouping functions are added.
*/
public record GroupSpec(int channel, ElementType elementType, boolean isCategorize) {
public GroupSpec(int channel, ElementType elementType) {
this(channel, elementType, false);
}
}
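// Illustration (not part of this diff): the one-argument constructor keeps
// existing call sites source-compatible, while CATEGORIZE groupings opt in
// explicitly:
//   new BlockHash.GroupSpec(1, ElementType.BYTES_REF);        // ordinary group
//   new BlockHash.GroupSpec(1, ElementType.BYTES_REF, true);  // CATEGORIZE() group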

/**
* Creates a specialized hash table that maps one or more {@link Block}s to ids.
@@ -159,6 +166,19 @@ public static BlockHash buildPackedValuesBlockHash(List<GroupSpec> groups, Block
return new PackedValuesBlockHash(groups, blockFactory, emitBatchSize);
}

/**
* Builds a BlockHash for the Categorize grouping function.
*/
public static BlockHash buildCategorizeBlockHash(List<GroupSpec> groups, AggregatorMode aggregatorMode, BlockFactory blockFactory) {
if (groups.size() != 1) {
throw new IllegalArgumentException("only a single CATEGORIZE group can be used");
}

return aggregatorMode.isInputPartial()
? new CategorizedIntermediateBlockHash(groups.get(0).channel, blockFactory, aggregatorMode.isOutputPartial())
: new CategorizeRawBlockHash(groups.get(0).channel, blockFactory, aggregatorMode.isOutputPartial());
}

/**
* Creates a specialized hash table that maps a {@link Block} of the given input element type to ids.
*/
New file: org/elasticsearch/compute/aggregation/blockhash/CategorizeRawBlockHash.java (137 additions)
@@ -0,0 +1,137 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.aggregation.blockhash;

import org.apache.lucene.analysis.core.WhitespaceTokenizer;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.BytesRefVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.analysis.CharFilterFactory;
import org.elasticsearch.index.analysis.CustomAnalyzer;
import org.elasticsearch.index.analysis.TokenFilterFactory;
import org.elasticsearch.index.analysis.TokenizerFactory;
import org.elasticsearch.xpack.ml.aggs.categorization.TokenListCategorizer;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;

/**
* BlockHash implementation for {@code Categorize} grouping function.
* <p>
* This implementation expects rows, and can't deserialize intermediate states coming from other nodes.
* </p>
*/
public class CategorizeRawBlockHash extends AbstractCategorizeBlockHash {
private final CategorizeEvaluator evaluator;

CategorizeRawBlockHash(int channel, BlockFactory blockFactory, boolean outputPartial) {
super(blockFactory, channel, outputPartial);
CategorizationAnalyzer analyzer = new CategorizationAnalyzer(
// TODO: should be the same analyzer as used in Production
new CustomAnalyzer(
TokenizerFactory.newFactory("whitespace", WhitespaceTokenizer::new),
new CharFilterFactory[0],
new TokenFilterFactory[0]
),
true
);
this.evaluator = new CategorizeEvaluator(analyzer, categorizer, blockFactory);
}

@Override
public void add(Page page, GroupingAggregatorFunction.AddInput addInput) {
try (IntBlock result = (IntBlock) evaluator.eval(page.getBlock(channel()))) {
addInput.add(0, result);
}
}

@Override
public void close() {
evaluator.close();
}

/**
* Similar implementation to an Evaluator.
*/
public static final class CategorizeEvaluator implements Releasable {
private final CategorizationAnalyzer analyzer;

private final TokenListCategorizer.CloseableTokenListCategorizer categorizer;

private final BlockFactory blockFactory;

public CategorizeEvaluator(
CategorizationAnalyzer analyzer,
TokenListCategorizer.CloseableTokenListCategorizer categorizer,
BlockFactory blockFactory
) {
this.analyzer = analyzer;
this.categorizer = categorizer;
this.blockFactory = blockFactory;
}

public Block eval(BytesRefBlock vBlock) {
BytesRefVector vVector = vBlock.asVector();
if (vVector == null) {
return eval(vBlock.getPositionCount(), vBlock);
}
IntVector vector = eval(vBlock.getPositionCount(), vVector);
return vector.asBlock();
}

public IntBlock eval(int positionCount, BytesRefBlock vBlock) {
try (IntBlock.Builder result = blockFactory.newIntBlockBuilder(positionCount)) {
BytesRef vScratch = new BytesRef();
for (int p = 0; p < positionCount; p++) {
if (vBlock.isNull(p)) {
result.appendNull();
continue;
}
int first = vBlock.getFirstValueIndex(p);
int count = vBlock.getValueCount(p);
if (count == 1) {
result.appendInt(process(vBlock.getBytesRef(first, vScratch)));
continue;
}
int end = first + count;
result.beginPositionEntry();
for (int i = first; i < end; i++) {
result.appendInt(process(vBlock.getBytesRef(i, vScratch)));
}
result.endPositionEntry();
}
return result.build();
}
}

public IntVector eval(int positionCount, BytesRefVector vVector) {
try (IntVector.FixedBuilder result = blockFactory.newIntVectorFixedBuilder(positionCount)) {
BytesRef vScratch = new BytesRef();
for (int p = 0; p < positionCount; p++) {
result.appendInt(p, process(vVector.getBytesRef(p, vScratch)));
}
return result.build();
}
}

private int process(BytesRef v) {
return categorizer.computeCategory(v.utf8ToString(), analyzer).getId();
}

@Override
public void close() {
Releasables.closeExpectNoException(analyzer, categorizer);
}
}
}
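
For reference, a minimal sketch of how the two hashes pair up across aggregation phases via buildCategorizeBlockHash above (the blockFactory is assumed to exist; variable names are illustrative):

    // One CATEGORIZE group over BytesRef messages in channel 0.
    List<BlockHash.GroupSpec> groups = List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF, true));

    // Data node, AggregatorMode.INITIAL: isInputPartial() == false, so this
    // builds a CategorizeRawBlockHash that tokenizes and categorizes raw rows
    // and emits the serialized partial state (isOutputPartial() == true).
    BlockHash raw = BlockHash.buildCategorizeBlockHash(groups, AggregatorMode.INITIAL, blockFactory);

    // Coordinator, AggregatorMode.FINAL: isInputPartial() == true, so this
    // builds a CategorizedIntermediateBlockHash that merges the partial states
    // and emits the final category regexes as keys.
    BlockHash merged = BlockHash.buildCategorizeBlockHash(groups, AggregatorMode.FINAL, blockFactory);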