Skip to content

Optimize IngestCtxMap construction #120833

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/120833.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120833
summary: Optimize `IngestCtxMap` construction
area: Ingest Node
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ public void testAutoConvertNotString() throws Exception {
}
default -> throw new UnsupportedOperationException();
}
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Map.of("field", randomValue));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(Map.of("field", randomValue)));
Processor processor = new ConvertProcessor(randomAlphaOfLength(10), null, "field", "field", Type.AUTO, false);
processor.execute(ingestDocument);
Object convertedValue = ingestDocument.getFieldValue("field", Object.class);
Expand All @@ -536,7 +536,7 @@ public void testAutoConvertNotString() throws Exception {

public void testAutoConvertStringNotMatched() throws Exception {
String value = "notAnIntFloatOrBool";
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Map.of("field", value));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(Map.of("field", value)));
Processor processor = new ConvertProcessor(randomAlphaOfLength(10), null, "field", "field", Type.AUTO, false);
processor.execute(ingestDocument);
Object convertedValue = ingestDocument.getFieldValue("field", Object.class);
Expand All @@ -546,7 +546,7 @@ public void testAutoConvertStringNotMatched() throws Exception {
public void testAutoConvertMatchBoolean() throws Exception {
boolean randomBoolean = randomBoolean();
String booleanString = Boolean.toString(randomBoolean);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Map.of("field", booleanString));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(Map.of("field", booleanString)));
Processor processor = new ConvertProcessor(randomAlphaOfLength(10), null, "field", "field", Type.AUTO, false);
processor.execute(ingestDocument);
Object convertedValue = ingestDocument.getFieldValue("field", Object.class);
Expand All @@ -556,7 +556,7 @@ public void testAutoConvertMatchBoolean() throws Exception {
public void testAutoConvertMatchInteger() throws Exception {
int randomInt = randomInt();
String randomString = Integer.toString(randomInt);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Map.of("field", randomString));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(Map.of("field", randomString)));
Processor processor = new ConvertProcessor(randomAlphaOfLength(10), null, "field", "field", Type.AUTO, false);
processor.execute(ingestDocument);
Object convertedValue = ingestDocument.getFieldValue("field", Object.class);
Expand All @@ -566,7 +566,7 @@ public void testAutoConvertMatchInteger() throws Exception {
public void testAutoConvertMatchLong() throws Exception {
long randomLong = randomLong();
String randomString = Long.toString(randomLong);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Map.of("field", randomString));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(Map.of("field", randomString)));
Processor processor = new ConvertProcessor(randomAlphaOfLength(10), null, "field", "field", Type.AUTO, false);
processor.execute(ingestDocument);
Object convertedValue = ingestDocument.getFieldValue("field", Object.class);
Expand All @@ -577,7 +577,7 @@ public void testAutoConvertDoubleNotMatched() throws Exception {
double randomDouble = randomDouble();
String randomString = Double.toString(randomDouble);
float randomFloat = Float.parseFloat(randomString);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Map.of("field", randomString));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(Map.of("field", randomString)));
Processor processor = new ConvertProcessor(randomAlphaOfLength(10), null, "field", "field", Type.AUTO, false);
processor.execute(ingestDocument);
Object convertedValue = ingestDocument.getFieldValue("field", Object.class);
Expand All @@ -588,7 +588,7 @@ public void testAutoConvertDoubleNotMatched() throws Exception {
public void testAutoConvertMatchFloat() throws Exception {
float randomFloat = randomFloat();
String randomString = Float.toString(randomFloat);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Map.of("field", randomString));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(Map.of("field", randomString)));
Processor processor = new ConvertProcessor(randomAlphaOfLength(10), null, "field", "field", Type.AUTO, false);
processor.execute(ingestDocument);
Object convertedValue = ingestDocument.getFieldValue("field", Object.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public class DissectProcessorTests extends ESTestCase {

public void testMatch() {
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, Map.of("message", "foo,bar,baz"));
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, new HashMap<>(Map.of("message", "foo,bar,baz")));
DissectProcessor dissectProcessor = new DissectProcessor("", null, "message", "%{a},%{b},%{c}", "", true);
dissectProcessor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("a", String.class), equalTo("foo"));
Expand All @@ -45,7 +45,7 @@ public void testMatchOverwrite() {
1,
null,
null,
Map.of("message", "foo,bar,baz", "a", "willgetstompped")
new HashMap<>(Map.of("message", "foo,bar,baz", "a", "willgetstompped"))
);
assertThat(ingestDocument.getFieldValue("a", String.class), equalTo("willgetstompped"));
DissectProcessor dissectProcessor = new DissectProcessor("", null, "message", "%{a},%{b},%{c}", "", true);
Expand All @@ -62,7 +62,7 @@ public void testAdvancedMatch() {
1,
null,
null,
Map.of("message", "foo bar,,,,,,,baz nope:notagain 😊 🐇 🙃")
new HashMap<>(Map.of("message", "foo bar,,,,,,,baz nope:notagain 😊 🐇 🙃"))
);
DissectProcessor dissectProcessor = new DissectProcessor(
"",
Expand All @@ -81,7 +81,7 @@ public void testAdvancedMatch() {
}

public void testMiss() {
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, Map.of("message", "foo:bar,baz"));
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, new HashMap<>(Map.of("message", "foo:bar,baz")));
DissectProcessor dissectProcessor = new DissectProcessor("", null, "message", "%{a},%{b},%{c}", "", true);
DissectException e = expectThrows(DissectException.class, () -> dissectProcessor.execute(ingestDocument));
assertThat(e.getMessage(), containsString("Unable to find match for dissect pattern"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void testExecuteWithAsyncProcessor() throws Exception {
values.add("foo");
values.add("bar");
values.add("baz");
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, Map.of("values", values));
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, new HashMap<>(Map.of("values", values)));

ForEachProcessor processor = new ForEachProcessor("_tag", null, "values", new AsyncUpperCaseProcessor("_ingest._value"), false);
execProcessor(processor, ingestDocument, (result, e) -> {});
Expand All @@ -55,7 +55,14 @@ public void testExecuteWithAsyncProcessor() throws Exception {
}

public void testExecuteWithFailure() {
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, Map.of("values", List.of("a", "b", "c")));
IngestDocument ingestDocument = new IngestDocument(
"_index",
"_id",
1,
null,
null,
new HashMap<>(Map.of("values", List.of("a", "b", "c")))
);

TestProcessor testProcessor = new TestProcessor(id -> {
if ("c".equals(id.getFieldValue("_ingest._value", String.class))) {
Expand Down Expand Up @@ -173,7 +180,7 @@ public String getDescription() {
int numValues = randomIntBetween(1, 10000);
List<String> values = IntStream.range(0, numValues).mapToObj(i -> "").toList();

IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, Map.of("values", values));
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, new HashMap<>(Map.of("values", values)));

ForEachProcessor processor = new ForEachProcessor("_tag", null, "values", innerProcessor, false);
execProcessor(processor, ingestDocument, (result, e) -> {});
Expand All @@ -189,7 +196,7 @@ public void testModifyFieldsOutsideArray() {
values.add("string");
values.add(1);
values.add(null);
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, Map.of("values", values));
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, new HashMap<>(Map.of("values", values)));

TemplateScript.Factory template = new TestTemplateService.MockTemplateScript.Factory("errors");

Expand Down Expand Up @@ -282,7 +289,7 @@ public void testNestedForEachWithMapIteration() {
Map<String, Object> innerMap3 = Map.of("foo3", 7, "bar3", 8, "baz3", 9, "otherKey", 42);

Map<String, Object> outerMap = Map.of("foo", innerMap1, "bar", innerMap2, "baz", innerMap3);
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, Map.of("field", outerMap));
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, new HashMap<>(Map.of("field", outerMap)));

List<String> visitedKeys = new ArrayList<>();
List<Object> visitedValues = new ArrayList<>();
Expand Down Expand Up @@ -361,7 +368,7 @@ public void testRemovingFromTheSameField() {

public void testMapIteration() {
Map<String, Object> mapValue = Map.of("foo", 1, "bar", 2, "baz", 3);
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, Map.of("field", mapValue));
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, new HashMap<>(Map.of("field", mapValue)));

List<String> encounteredKeys = new ArrayList<>();
List<Object> encounteredValues = new ArrayList<>();
Expand Down Expand Up @@ -390,7 +397,7 @@ public void testMapIteration() {

public void testRemovalOfMapKey() {
Map<String, Object> mapValue = Map.of("foo", 1, "bar", 2, "baz", 3);
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, Map.of("field", mapValue));
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, new HashMap<>(Map.of("field", mapValue)));

List<String> encounteredKeys = new ArrayList<>();
List<Object> encounteredValues = new ArrayList<>();
Expand Down Expand Up @@ -419,7 +426,7 @@ public void testMapIterationWithAsyncProcessor() throws Exception {
Map<String, Object> innerMap3 = Map.of("foo3", 7, "bar3", 8, "baz3", 9, "otherKey", 42);

Map<String, Object> outerMap = Map.of("foo", innerMap1, "bar", innerMap2, "baz", innerMap3);
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, Map.of("field", outerMap));
IngestDocument ingestDocument = new IngestDocument("_index", "_id", 1, null, null, new HashMap<>(Map.of("field", outerMap)));

List<String> visitedKeys = new ArrayList<>();
List<Object> visitedValues = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void testCreateWithMissingField() throws Exception {
public void testCreateWithStrictParsingParameter() throws Exception {
String fieldName = randomAlphaOfLength(10);
String processorTag = randomAlphaOfLength(10);
IngestDocument document = new IngestDocument("_index", "_id", 1, null, null, Map.of(fieldName, "123 \"foo\""));
IngestDocument document = new IngestDocument("_index", "_id", 1, null, null, new HashMap<>(Map.of(fieldName, "123 \"foo\"")));

{
Map<String, Object> strictConfig = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,9 @@ public void testDuplicateKeys() throws Exception {
String processorTag = randomAlphaOfLength(3);
JsonProcessor lenientJsonProcessor = new JsonProcessor(processorTag, null, "a", null, true, REPLACE, true);

Map<String, Object> document = new HashMap<>();
String json = "{\"a\": 1, \"a\": 2}";
document.put("a", json);
document.put("c", "see");
Map<String, Object> document = Map.of("a", "{\"a\": 1, \"a\": 2}", "c", "see");

IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(document));
lenientJsonProcessor.execute(ingestDocument);

Map<String, Object> sourceAndMetadata = ingestDocument.getSourceAndMetadata();
Expand All @@ -185,7 +182,7 @@ public void testDuplicateKeys() throws Exception {
JsonProcessor strictJsonProcessor = new JsonProcessor(processorTag, null, "a", null, true, REPLACE, false);
Exception exception = expectThrows(
IllegalArgumentException.class,
() -> strictJsonProcessor.execute(RandomDocumentPicks.randomIngestDocument(random(), document))
() -> strictJsonProcessor.execute(RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(document)))
);
assertThat(exception.getMessage(), containsString("Duplicate field 'a'"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void test() throws Exception {
}

public void testRootTarget() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Map.of());
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
ingestDocument.setFieldValue("myField", "first=hello&second=world&second=universe");
Processor processor = createKvProcessor("myField", "&", "=", null, null, null, false);
processor.execute(ingestDocument);
Expand All @@ -49,7 +49,7 @@ public void testRootTarget() throws Exception {
}

public void testKeySameAsSourceField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Map.of());
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
ingestDocument.setFieldValue("first", "first=hello");
Processor processor = createKvProcessor("first", "&", "=", null, null, null, false);
processor.execute(ingestDocument);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -266,7 +265,7 @@ public void testSortNullValue() throws Exception {
}

public void testDescendingSortWithTargetField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Map.of());
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
int numItems = randomIntBetween(1, 10);
List<String> fieldValue = new ArrayList<>(numItems);
List<String> expectedResult = new ArrayList<>(numItems);
Expand All @@ -286,7 +285,7 @@ public void testDescendingSortWithTargetField() throws Exception {
}

public void testAscendingSortWithTargetField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Map.of());
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
int numItems = randomIntBetween(1, 10);
List<String> fieldValue = new ArrayList<>(numItems);
List<String> expectedResult = new ArrayList<>(numItems);
Expand All @@ -306,7 +305,7 @@ public void testAscendingSortWithTargetField() throws Exception {
}

public void testSortWithTargetFieldLeavesOriginalUntouched() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Map.of());
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
List<Integer> fieldValue = List.of(1, 5, 4);
List<Integer> expectedResult = new ArrayList<>(fieldValue);
Collections.sort(expectedResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.ingest.ValueSource;
import org.elasticsearch.test.ESTestCase;

import java.util.HashMap;
import java.util.Map;

import static org.elasticsearch.ingest.RandomDocumentPicks.randomIngestDocument;
Expand Down Expand Up @@ -48,7 +49,7 @@ public void testTerminateInPipeline() throws Exception {
)
)
);
IngestDocument input = randomIngestDocument(random(), Map.of("foo", "bar"));
IngestDocument input = randomIngestDocument(random(), new HashMap<>(Map.of("foo", "bar")));
PipelineOutput output = new PipelineOutput();

pipeline.execute(input, output::set);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void test() throws Exception {
1L,
"routing",
VersionType.EXTERNAL,
Map.of("_field", "89.160.20.128")
new HashMap<>(Map.of("_field", "89.160.20.128"))
);
processor1.execute(document1);
assertThat(document1.getSourceAndMetadata().get("geoip"), notNullValue());
Expand All @@ -109,7 +109,7 @@ public void test() throws Exception {
1L,
"routing",
VersionType.EXTERNAL,
Map.of("_field", "89.160.20.128")
new HashMap<>(Map.of("_field", "89.160.20.128"))
);
processor2.execute(document2);
assertThat(document2.getSourceAndMetadata().get("geoip"), notNullValue());
Expand Down
Loading