// datafusion_optimizer/analyzer/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`Analyzer`] and [`AnalyzerRule`]

20use std::fmt::Debug;
21use std::sync::Arc;
22
23use log::debug;
24
25use datafusion_common::config::ConfigOptions;
26use datafusion_common::instant::Instant;
27use datafusion_common::Result;
28use datafusion_expr::expr_rewriter::FunctionRewrite;
29use datafusion_expr::{InvariantLevel, LogicalPlan};
30
31use crate::analyzer::resolve_grouping_function::ResolveGroupingFunction;
32use crate::analyzer::type_coercion::TypeCoercion;
33use crate::utils::log_plan;
34
35use self::function_rewrite::ApplyFunctionRewrites;
36
37pub mod function_rewrite;
38pub mod resolve_grouping_function;
39pub mod type_coercion;
40
41pub mod subquery {
42    #[deprecated(
43        since = "44.0.0",
44        note = "please use `datafusion_expr::check_subquery_expr` instead"
45    )]
46    pub use datafusion_expr::check_subquery_expr;
47}
48
49/// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make
50/// the plan valid prior to the rest of the DataFusion optimization process.
51///
52/// `AnalyzerRule`s are different than an [`OptimizerRule`](crate::OptimizerRule)s
53/// which must preserve the semantics of the `LogicalPlan`, while computing
54/// results in a more optimal way.
55///
56/// For example, an `AnalyzerRule` may resolve [`Expr`](datafusion_expr::Expr)s into more specific
57/// forms such as a subquery reference, or do type coercion to ensure the types
58/// of operands are correct.
59///
60/// Use [`SessionState::add_analyzer_rule`] to register additional
61/// `AnalyzerRule`s.
62///
63/// [`SessionState::add_analyzer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_analyzer_rule
64pub trait AnalyzerRule: Debug {
65    /// Rewrite `plan`
66    fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan>;
67
68    /// A human readable name for this analyzer rule
69    fn name(&self) -> &str;
70}
71
72/// Rule-based Analyzer.
73///
74/// Applies [`FunctionRewrite`]s and [`AnalyzerRule`]s to transform a
75/// [`LogicalPlan`] in preparation for execution.
76///
77/// For example, the `Analyzer` applies type coercion to ensure the types of
78/// operands match the types required by functions.
79#[derive(Clone, Debug)]
80pub struct Analyzer {
81    /// Expr --> Function writes to apply prior to analysis passes
82    pub function_rewrites: Vec<Arc<dyn FunctionRewrite + Send + Sync>>,
83    /// All rules to apply
84    pub rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
85}
86
87impl Default for Analyzer {
88    fn default() -> Self {
89        Self::new()
90    }
91}
92
93impl Analyzer {
94    /// Create a new analyzer using the recommended list of rules
95    pub fn new() -> Self {
96        let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
97            Arc::new(ResolveGroupingFunction::new()),
98            Arc::new(TypeCoercion::new()),
99        ];
100        Self::with_rules(rules)
101    }
102
103    /// Create a new analyzer with the given rules
104    pub fn with_rules(rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>) -> Self {
105        Self {
106            function_rewrites: vec![],
107            rules,
108        }
109    }
110
111    /// Add a function rewrite rule
112    pub fn add_function_rewrite(
113        &mut self,
114        rewrite: Arc<dyn FunctionRewrite + Send + Sync>,
115    ) {
116        self.function_rewrites.push(rewrite);
117    }
118
119    /// return the list of function rewrites in this analyzer
120    pub fn function_rewrites(&self) -> &[Arc<dyn FunctionRewrite + Send + Sync>] {
121        &self.function_rewrites
122    }
123
124    /// Analyze the logical plan by applying analyzer rules, and
125    /// do necessary check and fail the invalid plans
126    pub fn execute_and_check<F>(
127        &self,
128        plan: LogicalPlan,
129        config: &ConfigOptions,
130        mut observer: F,
131    ) -> Result<LogicalPlan>
132    where
133        F: FnMut(&LogicalPlan, &dyn AnalyzerRule),
134    {
135        // verify the logical plan required invariants at the start, before analyzer
136        plan.check_invariants(InvariantLevel::Always)
137            .map_err(|e| e.context("Invalid input plan passed to Analyzer"))?;
138
139        let start_time = Instant::now();
140        let mut new_plan = plan;
141
142        // Create an analyzer pass that rewrites `Expr`s to function_calls, as
143        // appropriate.
144        //
145        // Note this is run before all other rules since it rewrites based on
146        // the argument types (List or Scalar), and TypeCoercion may cast the
147        // argument types from Scalar to List.
148        let expr_to_function: Option<Arc<dyn AnalyzerRule + Send + Sync>> =
149            if self.function_rewrites.is_empty() {
150                None
151            } else {
152                Some(Arc::new(ApplyFunctionRewrites::new(
153                    self.function_rewrites.clone(),
154                )))
155            };
156        let rules = expr_to_function.iter().chain(self.rules.iter());
157
158        // TODO add common rule executor for Analyzer and Optimizer
159        for rule in rules {
160            new_plan = rule
161                .analyze(new_plan, config)
162                .map_err(|e| e.context(rule.name()))?;
163            log_plan(rule.name(), &new_plan);
164            observer(&new_plan, rule.as_ref());
165        }
166
167        // verify at the end, after the last LP analyzer pass, that the plan is executable.
168        new_plan
169            .check_invariants(InvariantLevel::Executable)
170            .map_err(|e| e.context("Invalid (non-executable) plan after Analyzer"))?;
171
172        log_plan("Final analyzed plan", &new_plan);
173        debug!("Analyzer took {} ms", start_time.elapsed().as_millis());
174        Ok(new_plan)
175    }
176}