Skip to content

Handle search timeout in SuggestPhase #122357

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/122357.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 122357
summary: Handle search timeout in `SuggestPhase`
area: Search
type: bug
issues:
- 122186
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.lucene.search.ConstantScoreScorer;
import org.apache.lucene.search.ConstantScoreWeight;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Query;
Expand All @@ -22,8 +23,10 @@
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.ScorerSupplier;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.CharsRefBuilder;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.search.SearchRequestBuilder;
Expand All @@ -33,12 +36,23 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.AbstractQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.rescore.RescoreContext;
import org.elasticsearch.search.rescore.Rescorer;
import org.elasticsearch.search.rescore.RescorerBuilder;
import org.elasticsearch.search.suggest.SortBy;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.Suggester;
import org.elasticsearch.search.suggest.SuggestionSearchContext;
import org.elasticsearch.search.suggest.term.TermSuggestion;
import org.elasticsearch.search.suggest.term.TermSuggestionBuilder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.xcontent.XContentBuilder;
Expand All @@ -58,7 +72,7 @@ public class SearchTimeoutIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singleton(BulkScorerTimeoutQueryPlugin.class);
return Collections.singleton(SearchTimeoutPlugin.class);
}

@Override
Expand All @@ -72,6 +86,9 @@ protected void setupSuiteScopeCluster() throws Exception {
indexRandom(true, "test", randomIntBetween(20, 50));
}

/**
* Test the scenario where the query times out before starting to collect documents, verify that partial hits are not returned
*/
public void testTopHitsTimeoutBeforeCollecting() {
// setting the timeout is necessary only because we check that if a TimeExceededException is thrown, a timeout was set
SearchRequestBuilder searchRequestBuilder = prepareSearch("test").setTimeout(new TimeValue(10, TimeUnit.SECONDS))
Expand All @@ -88,6 +105,9 @@ public void testTopHitsTimeoutBeforeCollecting() {
});
}

/**
* Test the scenario where the query times out while collecting documents, verify that partial hits are returned
*/
public void testTopHitsTimeoutWhileCollecting() {
// setting the timeout is necessary only because we check that if a TimeExceededException is thrown, a timeout was set
SearchRequestBuilder searchRequestBuilder = prepareSearch("test").setTimeout(new TimeValue(10, TimeUnit.SECONDS))
Expand All @@ -103,6 +123,9 @@ public void testTopHitsTimeoutWhileCollecting() {
});
}

/**
* Test the scenario where the query times out before starting to collect documents, verify that partial aggs results are not returned
*/
public void testAggsTimeoutBeforeCollecting() {
SearchRequestBuilder searchRequestBuilder = prepareSearch("test").setSize(0)
// setting the timeout is necessary only because we check that if a TimeExceededException is thrown, a timeout was set
Expand All @@ -123,6 +146,9 @@ public void testAggsTimeoutBeforeCollecting() {
});
}

/**
* Test the scenario where the query times out while collecting documents, verify that partial aggs results are returned
*/
public void testAggsTimeoutWhileCollecting() {
SearchRequestBuilder searchRequestBuilder = prepareSearch("test").setSize(0)
// setting the timeout is necessary only because we check that if a TimeExceededException is thrown, a timeout was set
Expand All @@ -145,6 +171,56 @@ public void testAggsTimeoutWhileCollecting() {
});
}

/**
 * Runs a search that combines a terms aggregation with a suggestion that times out during the suggest phase
 * (part of the query phase). The response is expected to be flagged as timed out without any shard failure,
 * while still carrying the hits and aggregation buckets produced by the query on each shard.
 */
public void testSuggestTimeoutWithPartialResults() {
    SuggestBuilder suggest = new SuggestBuilder();
    suggest.setGlobalText("text");
    suggest.addSuggestion("suggest", new TimeoutSuggestionBuilder());
    SearchRequestBuilder request = prepareSearch("test").suggest(suggest)
        .addAggregation(new TermsAggregationBuilder("terms").field("field.keyword"));
    ElasticsearchAssertions.assertResponse(request, response -> {
        // the timeout is reported, but no shard is counted as failed
        assertThat(response.isTimedOut(), equalTo(true));
        assertEquals(0, response.getShardFailures().length);
        assertEquals(0, response.getFailedShards());
        assertThat(response.getSuccessfulShards(), greaterThan(0));
        assertEquals(response.getSuccessfulShards(), response.getTotalShards());
        // hits computed before the suggest phase are still returned
        assertThat(response.getHits().getTotalHits().value(), greaterThan(0L));
        assertThat(response.getHits().getHits().length, greaterThan(0));
        // and so is the single "value" bucket of the terms aggregation
        StringTerms fieldTerms = response.getAggregations().get("terms");
        assertEquals(1, fieldTerms.getBuckets().size());
        StringTerms.Bucket onlyBucket = fieldTerms.getBuckets().get(0);
        assertEquals("value", onlyBucket.getKeyAsString());
        assertThat(onlyBucket.getDocCount(), greaterThan(0L));
    });
}

/**
 * Runs a search that combines a terms aggregation with a rescorer that times out during the rescore phase
 * (part of the query phase). The response is expected to be flagged as timed out without any shard failure,
 * while still carrying the hits and aggregation buckets produced by the query on each shard.
 */
public void testRescoreTimeoutWithPartialResults() {
    SearchRequestBuilder request = prepareSearch("test").setRescorer(new TimeoutRescorerBuilder())
        .addAggregation(new TermsAggregationBuilder("terms").field("field.keyword"));
    ElasticsearchAssertions.assertResponse(request, response -> {
        // the timeout is reported, but no shard is counted as failed
        assertThat(response.isTimedOut(), equalTo(true));
        assertEquals(0, response.getShardFailures().length);
        assertEquals(0, response.getFailedShards());
        assertThat(response.getSuccessfulShards(), greaterThan(0));
        assertEquals(response.getSuccessfulShards(), response.getTotalShards());
        // hits collected before rescoring are still returned
        assertThat(response.getHits().getTotalHits().value(), greaterThan(0L));
        assertThat(response.getHits().getHits().length, greaterThan(0));
        // and so is the single "value" bucket of the terms aggregation
        StringTerms fieldTerms = response.getAggregations().get("terms");
        assertEquals(1, fieldTerms.getBuckets().size());
        StringTerms.Bucket onlyBucket = fieldTerms.getBuckets().get(0);
        assertEquals("value", onlyBucket.getKeyAsString());
        assertThat(onlyBucket.getDocCount(), greaterThan(0L));
    });
}

public void testPartialResultsIntolerantTimeoutBeforeCollecting() {
ElasticsearchException ex = expectThrows(
ElasticsearchException.class,
Expand All @@ -171,13 +247,67 @@ public void testPartialResultsIntolerantTimeoutWhileCollecting() {
assertEquals(429, ex.status().getStatus());
}

public static final class BulkScorerTimeoutQueryPlugin extends Plugin implements SearchPlugin {
/**
 * Sends a suggest-only request (no explicit query) whose suggestion times out, with partial results
 * disallowed. The request is expected to fail with a "Time exceeded" error and HTTP status 429.
 */
public void testPartialResultsIntolerantTimeoutWhileSuggestingOnly() {
    SuggestBuilder suggest = new SuggestBuilder();
    suggest.setGlobalText("text");
    suggest.addSuggestion("suggest", new TimeoutSuggestionBuilder());
    ElasticsearchException failure = expectThrows(
        ElasticsearchException.class,
        // disallowing partial results is what makes the timeout surface as a failure
        prepareSearch("test").suggest(suggest).setAllowPartialSearchResults(false)
    );
    assertTrue(failure.toString().contains("Time exceeded"));
    assertEquals(429, failure.status().getStatus());
}

/**
 * Sends a term query together with a suggestion that times out, with partial results disallowed.
 * The request is expected to fail with a "Time exceeded" error and HTTP status 429.
 */
public void testPartialResultsIntolerantTimeoutWhileSuggesting() {
    SuggestBuilder suggest = new SuggestBuilder();
    suggest.setGlobalText("text");
    suggest.addSuggestion("suggest", new TimeoutSuggestionBuilder());
    ElasticsearchException failure = expectThrows(
        ElasticsearchException.class,
        prepareSearch("test").setQuery(new TermQueryBuilder("field", "value"))
            .suggest(suggest)
            // disallowing partial results is what makes the timeout surface as a failure
            .setAllowPartialSearchResults(false)
    );
    assertTrue(failure.toString().contains("Time exceeded"));
    assertEquals(429, failure.status().getStatus());
}

/**
 * Sends a term query together with a rescorer that times out, with partial results disallowed.
 * The request is expected to fail with a "Time exceeded" error and HTTP status 429.
 */
public void testPartialResultsIntolerantTimeoutWhileRescoring() {
    ElasticsearchException failure = expectThrows(
        ElasticsearchException.class,
        prepareSearch("test").setQuery(new TermQueryBuilder("field", "value"))
            .setRescorer(new TimeoutRescorerBuilder())
            // disallowing partial results is what makes the timeout surface as a failure
            .setAllowPartialSearchResults(false)
    );
    assertTrue(failure.toString().contains("Time exceeded"));
    assertEquals(429, failure.status().getStatus());
}

/**
 * Test plugin that registers the timeout-triggering query, suggester and rescorer used by this suite,
 * each under the name "timeout". None of them can be parsed from XContent: every parser callback
 * throws {@link UnsupportedOperationException}, so they can only be built programmatically.
 */
public static final class SearchTimeoutPlugin extends Plugin implements SearchPlugin {
    /** Registers the "timeout" query, read from the wire via {@code BulkScorerTimeoutQuery::new}. */
    @Override
    public List<QuerySpec<?>> getQueries() {
        return List.of(new QuerySpec<QueryBuilder>("timeout", BulkScorerTimeoutQuery::new, parser -> {
            throw new UnsupportedOperationException();
        }));
    }

    /** Registers the "timeout" suggester, whose results are read as {@code TermSuggestion}. */
    @Override
    public List<SuggesterSpec<?>> getSuggesters() {
        return List.of(new SuggesterSpec<>("timeout", TimeoutSuggestionBuilder::new, parser -> {
            throw new UnsupportedOperationException();
        }, TermSuggestion::new));
    }

    /** Registers the "timeout" rescorer, read from the wire via {@code TimeoutRescorerBuilder::new}. */
    @Override
    public List<RescorerSpec<?>> getRescorers() {
        return List.of(new RescorerSpec<>("timeout", TimeoutRescorerBuilder::new, parser -> {
            throw new UnsupportedOperationException();
        }));
    }
}

/**
Expand Down Expand Up @@ -315,4 +445,111 @@ public TransportVersion getMinimalSupportedVersion() {
return null;
}
}

/**
 * Term suggestion builder, registered under the name "timeout", whose suggester triggers a search
 * timeout when it is executed.
 */
private static final class TimeoutSuggestionBuilder extends TermSuggestionBuilder {
    /** Creates a builder that targets the field named "field". */
    TimeoutSuggestionBuilder() {
        super("field");
    }

    /** Reads the builder from the given stream by delegating to the parent class. */
    TimeoutSuggestionBuilder(StreamInput in) throws IOException {
        super(in);
    }

    @Override
    public String getWriteableName() {
        return "timeout";
    }

    /**
     * Builds a suggestion context bound to a {@link TimeoutSuggester} that wraps the searcher of the
     * given execution context; that searcher is cast to {@link ContextIndexSearcher}.
     */
    @Override
    public SuggestionSearchContext.SuggestionContext build(SearchExecutionContext context) {
        ContextIndexSearcher shardSearcher = (ContextIndexSearcher) context.searcher();
        TimeoutSuggester timeoutSuggester = new TimeoutSuggester(shardSearcher);
        return new TimeoutSuggestionContext(timeoutSuggester, context);
    }
}

/**
 * Suggester that asks its {@link ContextIndexSearcher} to raise a time-exceeded exception instead of
 * computing any suggestion.
 */
private static final class TimeoutSuggester extends Suggester<TimeoutSuggestionContext> {
    // searcher used to signal the timeout
    private final ContextIndexSearcher timeoutSearcher;

    TimeoutSuggester(ContextIndexSearcher searcher) {
        this.timeoutSearcher = searcher;
    }

    /**
     * Triggers the timeout through the wrapped searcher. Returning normally is not expected: the
     * assertion trips if it does, and the empty suggestion below only satisfies the method signature.
     */
    @Override
    protected TermSuggestion innerExecute(
        String name,
        TimeoutSuggestionContext suggestion,
        IndexSearcher searcher,
        CharsRefBuilder spare
    ) {
        timeoutSearcher.throwTimeExceededException();
        assert false;
        return new TermSuggestion(name, suggestion.getSize(), SortBy.SCORE);
    }

    /** Returns a term suggestion with no entries, sized from the context and sorted by score. */
    @Override
    protected TermSuggestion emptySuggestion(String name, TimeoutSuggestionContext suggestion, CharsRefBuilder spare) {
        return new TermSuggestion(name, suggestion.getSize(), SortBy.SCORE);
    }
}

/**
 * Suggestion context handed to {@link TimeoutSuggester}; it adds nothing to the base context.
 */
private static final class TimeoutSuggestionContext extends SuggestionSearchContext.SuggestionContext {
    TimeoutSuggestionContext(Suggester<?> owner, SearchExecutionContext executionContext) {
        super(owner, executionContext);
    }
}

/**
 * Rescorer builder, registered under the name "timeout", whose rescorer triggers a search timeout
 * when it is asked to rescore. It writes no state of its own to the wire or to XContent.
 */
private static final class TimeoutRescorerBuilder extends RescorerBuilder<TimeoutRescorerBuilder> {
    TimeoutRescorerBuilder() {}

    /** Reads the builder from the given stream by delegating to the parent class. */
    TimeoutRescorerBuilder(StreamInput in) throws IOException {
        super(in);
    }

    @Override
    public String getWriteableName() {
        return "timeout";
    }

    @Override
    public TransportVersion getMinimalSupportedVersion() {
        return null;
    }

    /** Nothing to rewrite: the builder is returned as is. */
    @Override
    public RescorerBuilder<TimeoutRescorerBuilder> rewrite(QueryRewriteContext ctx) {
        return this;
    }

    @Override
    protected void doWriteTo(StreamOutput out) {
        // no builder-specific state to serialize
    }

    @Override
    protected void doXContent(XContentBuilder builder, Params params) {
        // no builder-specific fields to render
    }

    /**
     * Builds a rescore context whose rescorer triggers the timeout through the searcher of the given
     * execution context. The searcher is looked up and cast only when rescoring actually runs.
     */
    @Override
    protected RescoreContext innerBuildContext(int windowSize, SearchExecutionContext context) throws IOException {
        Rescorer timeoutRescorer = new Rescorer() {
            @Override
            public TopDocs rescore(TopDocs topDocs, IndexSearcher searcher, RescoreContext rescoreContext) {
                ((ContextIndexSearcher) context.searcher()).throwTimeExceededException();
                // returning normally from the call above is not expected
                assert false;
                return null;
            }

            @Override
            public Explanation explain(
                int topLevelDocId,
                IndexSearcher searcher,
                RescoreContext rescoreContext,
                Explanation sourceExplanation
            ) {
                throw new UnsupportedOperationException();
            }
        };
        // NOTE(review): the window size is fixed at 10 and the windowSize argument is ignored - confirm this is intended
        return new RescoreContext(10, timeoutRescorer);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.common.lucene.search.function.ScoreFunction;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -994,22 +993,6 @@ public void testRescoreAfterCollapseRandom() throws Exception {
});
}

/**
 * Runs an OR match query with a 10ms timeout and a query rescorer built on the timed test score
 * function, and expects the response to be flagged as timed out.
 */
public void testRescoreWithTimeout() throws Exception {
    // no dummy docs since merges can change scores while we run queries.
    int numDocs = indexRandomNumbers("whitespace", -1, false);

    // search for the first word of the English spelling of a random number below numDocs
    String query = English.intToEnglish(between(0, numDocs - 1)).split(" ")[0];
    assertResponse(
        prepareSearch().setSearchType(SearchType.QUERY_THEN_FETCH)
            .setTimeout(TimeValue.timeValueMillis(10))
            .setSize(10)
            .setQuery(QueryBuilders.matchQuery("field1", query).operator(Operator.OR))
            .addRescorer(new QueryRescorerBuilder(functionScoreQuery(new TestTimedScoreFunctionBuilder())).windowSize(100)),
        response -> assertTrue(response.isTimedOut())
    );
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(TestTimedQueryPlugin.class);
Expand Down
Loading