Skip to content

Commit 061fec8

Browse files
authored
Merge pull request #233 from mashhurs/esql-support
ES|QL support: - introduces query_type params, accepts dsl or esql option. - adds ES|QL executor to execute ESQL query and parse/map response to event validations - make sure LS (8.17.4+) supports ES|QL (new elasticsearch-ruby client) - make sure connected ES is greater than 8.11+ - query isn't empty or meaningful that starts with command syntax - if query_type is esql, make sure we accept meaningful inputs and do not allow response_type, index, etc.. DSL related params - informing if query isn't using METADATA which adds _id, _version to the response entries - informing ineffective params such as size, search_api, target if users configure ES|QL results field names in a dotted format. The plugin reproduces nested (example {a.b.c: 'val'} => {'a':{'b':{'c':'val'}}})
2 parents d9bf375 + e108c87 commit 061fec8

File tree

8 files changed

+812
-41
lines changed

8 files changed

+812
-41
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 5.2.0
2+
- ES|QL support [#233](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/233)
3+
14
## 5.1.0
25
- Add "cursor"-like index tracking [#205](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/205)
36

docs/index.asciidoc

Lines changed: 122 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,110 @@ The next scheduled run:
230230
* uses {ref}/point-in-time-api.html#point-in-time-api[Point in time (PIT)] + {ref}/paginate-search-results.html#search-after[Search after] to paginate through all the data, and
231231
* updates the value of the field at the end of the pagination.
232232

233+
[id="plugins-{type}s-{plugin}-esql"]
234+
==== {esql} support
235+
236+
.Technical Preview
237+
****
238+
The {esql} feature that allows using ES|QL queries with this plugin is in Technical Preview.
239+
Configuration options and implementation details are subject to change in minor releases without being preceded by deprecation warnings.
240+
****
241+
242+
{es} Query Language ({esql}) provides a SQL-like interface for querying your {es} data.
243+
244+
To use {esql}, this plugin needs to be installed in {ls} 8.17.4 or newer, and must be connected to {es} 8.11 or newer.
245+
246+
To configure {esql} query in the plugin, set the `query_type` to `esql` and provide your {esql} query in the `query` parameter.
247+
248+
IMPORTANT: {esql} is evolving and may still have limitations with regard to result size or supported field types. We recommend understanding https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-limitations.html[ES|QL current limitations] before using it in production environments.
249+
250+
The following is a basic scheduled {esql} query that runs hourly:
251+
[source, ruby]
252+
input {
253+
elasticsearch {
254+
id => hourly_cron_job
255+
hosts => [ 'https://..']
256+
api_key => '....'
257+
query_type => 'esql'
258+
query => '
259+
FROM food-index
260+
| WHERE spicy_level = "hot" AND @timestamp > NOW() - 1 hour
261+
| LIMIT 500
262+
'
263+
schedule => '0 * * * *' # every hour at min 0
264+
}
265+
}
266+
267+
Set `config.support_escapes: true` in `logstash.yml` if you need to escape special chars in the query.
268+
269+
NOTE: With {esql} query, {ls} doesn't generate `event.original`.
270+
271+
[id="plugins-{type}s-{plugin}-esql-event-mapping"]
272+
===== Mapping {esql} result to {ls} event
273+
{esql} returns query results in a structured tabular format, where data is organized into _columns_ (fields) and _values_ (entries).
274+
The plugin maps each value entry to an event, populating corresponding fields.
275+
For example, a query might produce a table like:
276+
277+
[cols="2,1,1,1,2",options="header"]
278+
|===
279+
|`timestamp` |`user_id` | `action` | `status.code` | `status.desc`
280+
281+
|2025-04-10T12:00:00 |123 |login |200 | Success
282+
|2025-04-10T12:05:00 |456 |purchase |403 | Forbidden (unauthorized user)
283+
|===
284+
285+
For this case, the plugin emits two events look like
286+
[source, json]
287+
[
288+
{
289+
"timestamp": "2025-04-10T12:00:00",
290+
"user_id": 123,
291+
"action": "login",
292+
"status": {
293+
"code": 200,
294+
"desc": "Success"
295+
}
296+
},
297+
{
298+
"timestamp": "2025-04-10T12:05:00",
299+
"user_id": 456,
300+
"action": "purchase",
301+
"status": {
302+
"code": 403,
303+
"desc": "Forbidden (unauthorized user)"
304+
}
305+
}
306+
]
307+
308+
NOTE: If your index has a mapping with sub-objects where `status.code` and `status.desc` actually dotted fields, they appear in {ls} events as a nested structure.
309+
310+
[id="plugins-{type}s-{plugin}-esql-multifields"]
311+
===== Conflict on multi-fields
312+
313+
{esql} query fetches all parent and sub-fields fields if your {es} index has https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/multi-fields[multi-fields] or https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/subobjects[subobjects].
314+
Since {ls} events cannot contain parent field's concrete value and sub-field values together, the plugin ignores sub-fields with warning and includes parent.
315+
We recommend using the `RENAME` (or `DROP` to avoid warnings) keyword in your {esql} query explicitly rename the fields to include sub-fields into the event.
316+
317+
This a common occurrence if your template or mapping follows the pattern of always indexing strings as "text" (`field`) + " keyword" (`field.keyword`) multi-field.
318+
In this case it's recommended to do `KEEP field` if the string is identical and there is only one subfield as the engine will optimize and retrieve the keyword, otherwise you can do `KEEP field.keyword | RENAME field.keyword as field`.
319+
320+
To illustrate the situation with example, assuming your mapping has a time `time` field with `time.min` and `time.max` sub-fields as following:
321+
[source, ruby]
322+
"properties": {
323+
"time": { "type": "long" },
324+
"time.min": { "type": "long" },
325+
"time.max": { "type": "long" }
326+
}
327+
328+
The {esql} result will contain all three fields but the plugin cannot map them into {ls} event.
329+
To avoid this, you can use the `RENAME` keyword to rename the `time` parent field to get all three fields with unique fields.
330+
[source, ruby]
331+
...
332+
query => 'FROM my-index | RENAME time AS time.current'
333+
...
334+
335+
For comprehensive {esql} syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[{esql} documentation].
336+
233337
[id="plugins-{type}s-{plugin}-options"]
234338
==== Elasticsearch Input configuration options
235339

@@ -257,6 +361,7 @@ Please check out <<plugins-{type}s-{plugin}-obsolete-options>> for details.
257361
| <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No
258362
| <<plugins-{type}s-{plugin}-proxy>> |<<uri,uri>>|No
259363
| <<plugins-{type}s-{plugin}-query>> |<<string,string>>|No
364+
| <<plugins-{type}s-{plugin}-query_type>> |<<string,string>>, one of `["dsl","esql"]`|No
260365
| <<plugins-{type}s-{plugin}-response_type>> |<<string,string>>, one of `["hits","aggregations"]`|No
261366
| <<plugins-{type}s-{plugin}-request_timeout_seconds>> | <<number,number>>|No
262367
| <<plugins-{type}s-{plugin}-schedule>> |<<string,string>>|No
@@ -498,22 +603,35 @@ environment variables e.g. `proxy => '${LS_PROXY:}'`.
498603
* Value type is <<string,string>>
499604
* Default value is `'{ "sort": [ "_doc" ] }'`
500605

501-
The query to be executed. Read the {ref}/query-dsl.html[Elasticsearch query DSL
502-
documentation] for more information.
606+
The query to be executed.
607+
Accepted query shape is DSL or {esql} (when `query_type => 'esql'`).
608+
Read the {ref}/query-dsl.html[{es} query DSL documentation] or {ref}/esql.html[{esql} documentation] for more information.
503609

504610
When <<plugins-{type}s-{plugin}-search_api>> resolves to `search_after` and the query does not specify `sort`,
505611
the default sort `'{ "sort": { "_shard_doc": "asc" } }'` will be added to the query. Please refer to the {ref}/paginate-search-results.html#search-after[Elasticsearch search_after] parameter to know more.
506612

613+
[id="plugins-{type}s-{plugin}-query_type"]
614+
===== `query_type`
615+
616+
* Value can be `dsl` or `esql`
617+
* Default value is `dsl`
618+
619+
Defines the <<plugins-{type}s-{plugin}-query>> shape.
620+
When `dsl`, the query shape must be valid {es} JSON-style string.
621+
When `esql`, the query shape must be a valid {esql} string and `index`, `size`, `slices`, `search_api`, `docinfo`, `docinfo_target`, `docinfo_fields`, `response_type` and `tracking_field` parameters are not allowed.
622+
507623
[id="plugins-{type}s-{plugin}-response_type"]
508624
===== `response_type`
509625

510-
* Value can be any of: `hits`, `aggregations`
626+
* Value can be any of: `hits`, `aggregations`, `esql`
511627
* Default value is `hits`
512628

513629
Which part of the result to transform into Logstash events when processing the
514630
response from the query.
631+
515632
The default `hits` will generate one event per returned document (i.e. "hit").
516-
When set to `aggregations`, a single Logstash event will be generated with the
633+
634+
When set to `aggregations`, a single {ls} event will be generated with the
517635
contents of the `aggregations` object of the query's response. In this case the
518636
`hits` object will be ignored. The parameter `size` will be always be set to
519637
0 regardless of the default or user-defined value set in this plugin.

lib/logstash/inputs/elasticsearch.rb

Lines changed: 78 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
7474
require 'logstash/inputs/elasticsearch/paginated_search'
7575
require 'logstash/inputs/elasticsearch/aggregation'
7676
require 'logstash/inputs/elasticsearch/cursor_tracker'
77+
require 'logstash/inputs/elasticsearch/esql'
7778

7879
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
7980
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
@@ -96,15 +97,21 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
9697
# The index or alias to search.
9798
config :index, :validate => :string, :default => "logstash-*"
9899

99-
# The query to be executed. Read the Elasticsearch query DSL documentation
100-
# for more info
101-
# https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
100+
# A type of Elasticsearch query, provided by @query. This will validate query shape and other params.
101+
config :query_type, :validate => %w[dsl esql], :default => 'dsl'
102+
103+
# The query to be executed. DSL or ES|QL (when `query_type => 'esql'`) query shape is accepted.
104+
# Read the following documentations for more info
105+
# Query DSL: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
106+
# ES|QL: https://www.elastic.co/guide/en/elasticsearch/reference/current/esql.html
102107
config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }'
103108

104-
# This allows you to speccify the response type: either hits or aggregations
105-
# where hits: normal search request
106-
# aggregations: aggregation request
107-
config :response_type, :validate => ['hits', 'aggregations'], :default => 'hits'
109+
# This allows you to specify the DSL response type: one of [hits, aggregations]
110+
# where
111+
# hits: normal search request
112+
# aggregations: aggregation request
113+
# Note that this param is invalid when `query_type => 'esql'`, ES|QL response shape is always a tabular format
114+
config :response_type, :validate => %w[hits aggregations], :default => 'hits'
108115

109116
# This allows you to set the maximum number of hits returned per scroll.
110117
config :size, :validate => :number, :default => 1000
@@ -286,6 +293,9 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
286293
DEFAULT_EAV_HEADER = { "Elastic-Api-Version" => "2023-10-31" }.freeze
287294
INTERNAL_ORIGIN_HEADER = { 'x-elastic-product-origin' => 'logstash-input-elasticsearch'}.freeze
288295

296+
LS_ESQL_SUPPORT_VERSION = "8.17.4" # the version started using elasticsearch-ruby v8
297+
ES_ESQL_SUPPORT_VERSION = "8.11.0"
298+
289299
def initialize(params={})
290300
super(params)
291301

@@ -302,10 +312,17 @@ def register
302312
fill_hosts_from_cloud_id
303313
setup_ssl_params!
304314

305-
@base_query = LogStash::Json.load(@query)
306-
if @slices
307-
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
308-
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
315+
if @query_type == 'esql'
316+
validate_ls_version_for_esql_support!
317+
validate_esql_query!
318+
not_allowed_options = original_params.keys & %w(index size slices search_api docinfo docinfo_target docinfo_fields response_type tracking_field)
319+
raise(LogStash::ConfigurationError, "Configured #{not_allowed_options} params are not allowed while using ES|QL query") if not_allowed_options&.size > 1
320+
else
321+
@base_query = LogStash::Json.load(@query)
322+
if @slices
323+
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
324+
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
325+
end
309326
end
310327

311328
@retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`")
@@ -341,11 +358,13 @@ def register
341358

342359
test_connection!
343360

361+
validate_es_for_esql_support!
362+
344363
setup_serverless
345364

346365
setup_search_api
347366

348-
setup_query_executor
367+
@query_executor = create_query_executor
349368

350369
setup_cursor_tracker
351370

@@ -363,16 +382,6 @@ def run(output_queue)
363382
end
364383
end
365384

366-
def get_query_object
367-
if @cursor_tracker
368-
query = @cursor_tracker.inject_cursor(@query)
369-
@logger.debug("new query is #{query}")
370-
else
371-
query = @query
372-
end
373-
LogStash::Json.load(query)
374-
end
375-
376385
##
377386
# This can be called externally from the query_executor
378387
public
@@ -383,6 +392,23 @@ def push_hit(hit, output_queue, root_field = '_source')
383392
record_last_value(event)
384393
end
385394

395+
def decorate_event(event)
396+
decorate(event)
397+
end
398+
399+
private
400+
401+
def get_query_object
402+
return @query if @query_type == 'esql'
403+
if @cursor_tracker
404+
query = @cursor_tracker.inject_cursor(@query)
405+
@logger.debug("new query is #{query}")
406+
else
407+
query = @query
408+
end
409+
LogStash::Json.load(query)
410+
end
411+
386412
def record_last_value(event)
387413
@cursor_tracker.record_last_value(event) if @tracking_field
388414
end
@@ -414,8 +440,6 @@ def set_docinfo_fields(hit, event)
414440
event.set(@docinfo_target, docinfo_target)
415441
end
416442

417-
private
418-
419443
def hosts_default?(hosts)
420444
hosts.nil? || ( hosts.is_a?(Array) && hosts.empty? )
421445
end
@@ -664,18 +688,16 @@ def setup_search_api
664688

665689
end
666690

667-
def setup_query_executor
668-
@query_executor = case @response_type
669-
when 'hits'
670-
if @resolved_search_api == "search_after"
671-
LogStash::Inputs::Elasticsearch::SearchAfter.new(@client, self)
672-
else
673-
logger.warn("scroll API is no longer recommended for pagination. Consider using search_after instead.") if es_major_version >= 8
674-
LogStash::Inputs::Elasticsearch::Scroll.new(@client, self)
675-
end
676-
when 'aggregations'
677-
LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self)
678-
end
691+
def create_query_executor
692+
return LogStash::Inputs::Elasticsearch::Esql.new(@client, self) if @query_type == 'esql'
693+
694+
# DSL query executor
695+
return LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self) if @response_type == 'aggregations'
696+
# response_type is hits, executor can be search_after or scroll type
697+
return LogStash::Inputs::Elasticsearch::SearchAfter.new(@client, self) if @resolved_search_api == "search_after"
698+
699+
logger.warn("scroll API is no longer recommended for pagination. Consider using search_after instead.") if es_major_version >= 8
700+
LogStash::Inputs::Elasticsearch::Scroll.new(@client, self)
679701
end
680702

681703
def setup_cursor_tracker
@@ -714,6 +736,26 @@ def get_transport_client_class
714736
::Elastic::Transport::Transport::HTTP::Manticore
715737
end
716738

739+
def validate_ls_version_for_esql_support!
740+
if Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create(LS_ESQL_SUPPORT_VERSION)
741+
fail("Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least #{LS_ESQL_SUPPORT_VERSION}")
742+
end
743+
end
744+
745+
def validate_esql_query!
746+
fail(LogStash::ConfigurationError, "`query` cannot be empty") if @query.strip.empty?
747+
source_commands = %w[FROM ROW SHOW]
748+
contains_source_command = source_commands.any? { |source_command| @query.strip.start_with?(source_command) }
749+
fail(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}") unless contains_source_command
750+
end
751+
752+
def validate_es_for_esql_support!
753+
return unless @query_type == 'esql'
754+
# make sure connected ES supports ES|QL (8.11+)
755+
es_supports_esql = Gem::Version.create(es_version) >= Gem::Version.create(ES_ESQL_SUPPORT_VERSION)
756+
fail("Connected Elasticsearch #{es_version} version does not supports ES|QL. ES|QL feature requires at least Elasticsearch #{ES_ESQL_SUPPORT_VERSION} version.") unless es_supports_esql
757+
end
758+
717759
module URIOrEmptyValidator
718760
##
719761
# @override to provide :uri_or_empty validator

0 commit comments

Comments
 (0)