datafusion_expr/logical_plan/
builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module provides a builder for creating LogicalPlans
19
20use std::any::Any;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23use std::iter::once;
24use std::sync::Arc;
25
26use crate::dml::CopyTo;
27use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
28use crate::expr_rewriter::{
29    coerce_plan_expr_for_schema, normalize_col,
30    normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
31    rewrite_sort_cols_by_aggs,
32};
33use crate::logical_plan::{
34    Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
35    JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
36    Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
37    Window,
38};
39use crate::select_expr::SelectExpr;
40use crate::utils::{
41    can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
42    expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
43    group_window_expr_by_sort_keys,
44};
45use crate::{
46    and, binary_expr, lit, DmlStatement, Expr, ExprSchemable, Operator, RecursiveQuery,
47    Statement, TableProviderFilterPushDown, TableSource, WriteOp,
48};
49
50use super::dml::InsertOp;
51use super::plan::{ColumnUnnestList, ExplainFormat};
52use arrow::compute::can_cast_types;
53use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
54use datafusion_common::display::ToStringifiedPlan;
55use datafusion_common::file_options::file_type::FileType;
56use datafusion_common::{
57    exec_err, get_target_functional_dependencies, internal_err, not_impl_err,
58    plan_datafusion_err, plan_err, Column, Constraints, DFSchema, DFSchemaRef,
59    DataFusionError, Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
60};
61use datafusion_expr_common::type_coercion::binary::type_union_resolution;
62
63use indexmap::IndexSet;
64
65/// Default table name for unnamed table
66pub const UNNAMED_TABLE: &str = "?table?";
67
68/// Options for [`LogicalPlanBuilder`]
69#[derive(Default, Debug, Clone)]
70pub struct LogicalPlanBuilderOptions {
71    /// Flag indicating whether the plan builder should add
72    /// functionally dependent expressions as additional aggregation groupings.
73    add_implicit_group_by_exprs: bool,
74}
75
76impl LogicalPlanBuilderOptions {
77    pub fn new() -> Self {
78        Default::default()
79    }
80
81    /// Should the builder add functionally dependent expressions as additional aggregation groupings.
82    pub fn with_add_implicit_group_by_exprs(mut self, add: bool) -> Self {
83        self.add_implicit_group_by_exprs = add;
84        self
85    }
86}
87
88/// Builder for logical plans
89///
90/// # Example building a simple plan
91/// ```
92/// # use datafusion_expr::{lit, col, LogicalPlanBuilder, logical_plan::table_scan};
93/// # use datafusion_common::Result;
94/// # use arrow::datatypes::{Schema, DataType, Field};
95/// #
96/// # fn main() -> Result<()> {
97/// #
98/// # fn employee_schema() -> Schema {
99/// #    Schema::new(vec![
100/// #           Field::new("id", DataType::Int32, false),
101/// #           Field::new("first_name", DataType::Utf8, false),
102/// #           Field::new("last_name", DataType::Utf8, false),
103/// #           Field::new("state", DataType::Utf8, false),
104/// #           Field::new("salary", DataType::Int32, false),
105/// #       ])
106/// #   }
107/// #
108/// // Create a plan similar to
109/// // SELECT last_name
110/// // FROM employees
111/// // WHERE salary < 1000
112/// let plan = table_scan(Some("employee"), &employee_schema(), None)?
113///  // Keep only rows where salary < 1000
114///  .filter(col("salary").lt(lit(1000)))?
115///  // only show "last_name" in the final results
116///  .project(vec![col("last_name")])?
117///  .build()?;
118///
119/// // Convert from plan back to builder
120/// let builder = LogicalPlanBuilder::from(plan);
121///
122/// # Ok(())
123/// # }
124/// ```
125#[derive(Debug, Clone)]
126pub struct LogicalPlanBuilder {
127    plan: Arc<LogicalPlan>,
128    options: LogicalPlanBuilderOptions,
129}
130
131impl LogicalPlanBuilder {
132    /// Create a builder from an existing plan
133    pub fn new(plan: LogicalPlan) -> Self {
134        Self {
135            plan: Arc::new(plan),
136            options: LogicalPlanBuilderOptions::default(),
137        }
138    }
139
140    /// Create a builder from an existing plan
141    pub fn new_from_arc(plan: Arc<LogicalPlan>) -> Self {
142        Self {
143            plan,
144            options: LogicalPlanBuilderOptions::default(),
145        }
146    }
147
148    pub fn with_options(mut self, options: LogicalPlanBuilderOptions) -> Self {
149        self.options = options;
150        self
151    }
152
153    /// Return the output schema of the plan build so far
154    pub fn schema(&self) -> &DFSchemaRef {
155        self.plan.schema()
156    }
157
158    /// Return the LogicalPlan of the plan build so far
159    pub fn plan(&self) -> &LogicalPlan {
160        &self.plan
161    }
162
163    /// Create an empty relation.
164    ///
165    /// `produce_one_row` set to true means this empty node needs to produce a placeholder row.
166    pub fn empty(produce_one_row: bool) -> Self {
167        Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
168            produce_one_row,
169            schema: DFSchemaRef::new(DFSchema::empty()),
170        }))
171    }
172
173    /// Convert a regular plan into a recursive query.
174    /// `is_distinct` indicates whether the recursive term should be de-duplicated (`UNION`) after each iteration or not (`UNION ALL`).
175    pub fn to_recursive_query(
176        self,
177        name: String,
178        recursive_term: LogicalPlan,
179        is_distinct: bool,
180    ) -> Result<Self> {
181        // TODO: we need to do a bunch of validation here. Maybe more.
182        if is_distinct {
183            return not_impl_err!(
184                "Recursive queries with a distinct 'UNION' (in which the previous iteration's results will be de-duplicated) is not supported"
185            );
186        }
187        // Ensure that the static term and the recursive term have the same number of fields
188        let static_fields_len = self.plan.schema().fields().len();
189        let recursive_fields_len = recursive_term.schema().fields().len();
190        if static_fields_len != recursive_fields_len {
191            return plan_err!(
192                "Non-recursive term and recursive term must have the same number of columns ({} != {})",
193                static_fields_len, recursive_fields_len
194            );
195        }
196        // Ensure that the recursive term has the same field types as the static term
197        let coerced_recursive_term =
198            coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
199        Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
200            name,
201            static_term: self.plan,
202            recursive_term: Arc::new(coerced_recursive_term),
203            is_distinct,
204        })))
205    }
206
207    /// Create a values list based relation, and the schema is inferred from data, consuming
208    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
209    /// documentation for more details.
210    ///
211    /// so it's usually better to override the default names with a table alias list.
212    ///
213    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
214    pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
215        if values.is_empty() {
216            return plan_err!("Values list cannot be empty");
217        }
218        let n_cols = values[0].len();
219        if n_cols == 0 {
220            return plan_err!("Values list cannot be zero length");
221        }
222        for (i, row) in values.iter().enumerate() {
223            if row.len() != n_cols {
224                return plan_err!(
225                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
226                    row.len(),
227                    i,
228                    n_cols
229                );
230            }
231        }
232
233        // Infer from data itself
234        Self::infer_data(values)
235    }
236
237    /// Create a values list based relation, and the schema is inferred from data itself or table schema if provided, consuming
238    /// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
239    /// documentation for more details.
240    ///
241    /// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table.
242    /// The column names are not specified by the SQL standard and different database systems do it differently,
243    /// so it's usually better to override the default names with a table alias list.
244    ///
245    /// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
246    pub fn values_with_schema(
247        values: Vec<Vec<Expr>>,
248        schema: &DFSchemaRef,
249    ) -> Result<Self> {
250        if values.is_empty() {
251            return plan_err!("Values list cannot be empty");
252        }
253        let n_cols = schema.fields().len();
254        if n_cols == 0 {
255            return plan_err!("Values list cannot be zero length");
256        }
257        for (i, row) in values.iter().enumerate() {
258            if row.len() != n_cols {
259                return plan_err!(
260                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
261                    row.len(),
262                    i,
263                    n_cols
264                );
265            }
266        }
267
268        // Check the type of value against the schema
269        Self::infer_values_from_schema(values, schema)
270    }
271
272    fn infer_values_from_schema(
273        values: Vec<Vec<Expr>>,
274        schema: &DFSchema,
275    ) -> Result<Self> {
276        let n_cols = values[0].len();
277        let mut fields = ValuesFields::new();
278        for j in 0..n_cols {
279            let field_type = schema.field(j).data_type();
280            let field_nullable = schema.field(j).is_nullable();
281            for row in values.iter() {
282                let value = &row[j];
283                let data_type = value.get_type(schema)?;
284
285                if !data_type.equals_datatype(field_type) {
286                    if can_cast_types(&data_type, field_type) {
287                    } else {
288                        return exec_err!(
289                            "type mismatch and can't cast to got {} and {}",
290                            data_type,
291                            field_type
292                        );
293                    }
294                }
295            }
296            fields.push(field_type.to_owned(), field_nullable);
297        }
298
299        Self::infer_inner(values, fields, schema)
300    }
301
302    fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
303        let n_cols = values[0].len();
304        let schema = DFSchema::empty();
305        let mut fields = ValuesFields::new();
306
307        for j in 0..n_cols {
308            let mut common_type: Option<DataType> = None;
309            for (i, row) in values.iter().enumerate() {
310                let value = &row[j];
311                let data_type = value.get_type(&schema)?;
312                if data_type == DataType::Null {
313                    continue;
314                }
315
316                if let Some(prev_type) = common_type {
317                    // get common type of each column values.
318                    let data_types = vec![prev_type.clone(), data_type.clone()];
319                    let Some(new_type) = type_union_resolution(&data_types) else {
320                        return plan_err!("Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}");
321                    };
322                    common_type = Some(new_type);
323                } else {
324                    common_type = Some(data_type);
325                }
326            }
327            // assuming common_type was not set, and no error, therefore the type should be NULL
328            // since the code loop skips NULL
329            fields.push(common_type.unwrap_or(DataType::Null), true);
330        }
331
332        Self::infer_inner(values, fields, &schema)
333    }
334
335    fn infer_inner(
336        mut values: Vec<Vec<Expr>>,
337        fields: ValuesFields,
338        schema: &DFSchema,
339    ) -> Result<Self> {
340        let fields = fields.into_fields();
341        // wrap cast if data type is not same as common type.
342        for row in &mut values {
343            for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
344                if let Expr::Literal(ScalarValue::Null) = row[j] {
345                    row[j] = Expr::Literal(ScalarValue::try_from(field_type)?);
346                } else {
347                    row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
348                }
349            }
350        }
351
352        let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
353        let schema = DFSchemaRef::new(dfschema);
354
355        Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
356    }
357
358    /// Convert a table provider into a builder with a TableScan
359    ///
360    /// Note that if you pass a string as `table_name`, it is treated
361    /// as a SQL identifier, as described on [`TableReference`] and
362    /// thus is normalized
363    ///
364    /// # Example:
365    /// ```
366    /// # use datafusion_expr::{lit, col, LogicalPlanBuilder,
367    /// #  logical_plan::builder::LogicalTableSource, logical_plan::table_scan
368    /// # };
369    /// # use std::sync::Arc;
370    /// # use arrow::datatypes::{Schema, DataType, Field};
371    /// # use datafusion_common::TableReference;
372    /// #
373    /// # let employee_schema = Arc::new(Schema::new(vec![
374    /// #           Field::new("id", DataType::Int32, false),
375    /// # ])) as _;
376    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
377    /// // Scan table_source with the name "mytable" (after normalization)
378    /// # let table = table_source.clone();
379    /// let scan = LogicalPlanBuilder::scan("MyTable", table, None);
380    ///
381    /// // Scan table_source with the name "MyTable" by enclosing in quotes
382    /// # let table = table_source.clone();
383    /// let scan = LogicalPlanBuilder::scan(r#""MyTable""#, table, None);
384    ///
385    /// // Scan table_source with the name "MyTable" by forming the table reference
386    /// # let table = table_source.clone();
387    /// let table_reference = TableReference::bare("MyTable");
388    /// let scan = LogicalPlanBuilder::scan(table_reference, table, None);
389    /// ```
390    pub fn scan(
391        table_name: impl Into<TableReference>,
392        table_source: Arc<dyn TableSource>,
393        projection: Option<Vec<usize>>,
394    ) -> Result<Self> {
395        Self::scan_with_filters(table_name, table_source, projection, vec![])
396    }
397
398    /// Create a [CopyTo] for copying the contents of this builder to the specified file(s)
399    pub fn copy_to(
400        input: LogicalPlan,
401        output_url: String,
402        file_type: Arc<dyn FileType>,
403        options: HashMap<String, String>,
404        partition_by: Vec<String>,
405    ) -> Result<Self> {
406        Ok(Self::new(LogicalPlan::Copy(CopyTo {
407            input: Arc::new(input),
408            output_url,
409            partition_by,
410            file_type,
411            options,
412        })))
413    }
414
415    /// Create a [`DmlStatement`] for inserting the contents of this builder into the named table.
416    ///
417    /// Note,  use a [`DefaultTableSource`] to insert into a [`TableProvider`]
418    ///
419    /// [`DefaultTableSource`]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/struct.DefaultTableSource.html
420    /// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
421    ///
422    /// # Example:
423    /// ```
424    /// # use datafusion_expr::{lit, LogicalPlanBuilder,
425    /// #  logical_plan::builder::LogicalTableSource,
426    /// # };
427    /// # use std::sync::Arc;
428    /// # use arrow::datatypes::{Schema, DataType, Field};
429    /// # use datafusion_expr::dml::InsertOp;
430    /// #
431    /// # fn test() -> datafusion_common::Result<()> {
432    /// # let employee_schema = Arc::new(Schema::new(vec![
433    /// #     Field::new("id", DataType::Int32, false),
434    /// # ])) as _;
435    /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
436    /// // VALUES (1), (2)
437    /// let input = LogicalPlanBuilder::values(vec![vec![lit(1)], vec![lit(2)]])?
438    ///   .build()?;
439    /// // INSERT INTO MyTable VALUES (1), (2)
440    /// let insert_plan = LogicalPlanBuilder::insert_into(
441    ///   input,
442    ///   "MyTable",
443    ///   table_source,
444    ///   InsertOp::Append,
445    /// )?;
446    /// # Ok(())
447    /// # }
448    /// ```
449    pub fn insert_into(
450        input: LogicalPlan,
451        table_name: impl Into<TableReference>,
452        target: Arc<dyn TableSource>,
453        insert_op: InsertOp,
454    ) -> Result<Self> {
455        Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
456            table_name.into(),
457            target,
458            WriteOp::Insert(insert_op),
459            Arc::new(input),
460        ))))
461    }
462
463    /// Convert a table provider into a builder with a TableScan
464    pub fn scan_with_filters(
465        table_name: impl Into<TableReference>,
466        table_source: Arc<dyn TableSource>,
467        projection: Option<Vec<usize>>,
468        filters: Vec<Expr>,
469    ) -> Result<Self> {
470        Self::scan_with_filters_inner(table_name, table_source, projection, filters, None)
471    }
472
473    /// Convert a table provider into a builder with a TableScan with filter and fetch
474    pub fn scan_with_filters_fetch(
475        table_name: impl Into<TableReference>,
476        table_source: Arc<dyn TableSource>,
477        projection: Option<Vec<usize>>,
478        filters: Vec<Expr>,
479        fetch: Option<usize>,
480    ) -> Result<Self> {
481        Self::scan_with_filters_inner(
482            table_name,
483            table_source,
484            projection,
485            filters,
486            fetch,
487        )
488    }
489
490    fn scan_with_filters_inner(
491        table_name: impl Into<TableReference>,
492        table_source: Arc<dyn TableSource>,
493        projection: Option<Vec<usize>>,
494        filters: Vec<Expr>,
495        fetch: Option<usize>,
496    ) -> Result<Self> {
497        let table_scan =
498            TableScan::try_new(table_name, table_source, projection, filters, fetch)?;
499
500        // Inline TableScan
501        if table_scan.filters.is_empty() {
502            if let Some(p) = table_scan.source.get_logical_plan() {
503                let sub_plan = p.into_owned();
504                // Ensures that the reference to the inlined table remains the
505                // same, meaning we don't have to change any of the parent nodes
506                // that reference this table.
507                return Self::new(sub_plan).alias(table_scan.table_name);
508            }
509        }
510
511        Ok(Self::new(LogicalPlan::TableScan(table_scan)))
512    }
513
514    /// Wrap a plan in a window
515    pub fn window_plan(
516        input: LogicalPlan,
517        window_exprs: impl IntoIterator<Item = Expr>,
518    ) -> Result<LogicalPlan> {
519        let mut plan = input;
520        let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
521        // To align with the behavior of PostgreSQL, we want the sort_keys sorted as same rule as PostgreSQL that first
522        // we compare the sort key themselves and if one window's sort keys are a prefix of another
523        // put the window with more sort keys first. so more deeply sorted plans gets nested further down as children.
524        // The sort_by() implementation here is a stable sort.
525        // Note that by this rule if there's an empty over, it'll be at the top level
526        groups.sort_by(|(key_a, _), (key_b, _)| {
527            for ((first, _), (second, _)) in key_a.iter().zip(key_b.iter()) {
528                let key_ordering = compare_sort_expr(first, second, plan.schema());
529                match key_ordering {
530                    Ordering::Less => {
531                        return Ordering::Less;
532                    }
533                    Ordering::Greater => {
534                        return Ordering::Greater;
535                    }
536                    Ordering::Equal => {}
537                }
538            }
539            key_b.len().cmp(&key_a.len())
540        });
541        for (_, exprs) in groups {
542            let window_exprs = exprs.into_iter().collect::<Vec<_>>();
543            // Partition and sorting is done at physical level, see the EnforceDistribution
544            // and EnforceSorting rules.
545            plan = LogicalPlanBuilder::from(plan)
546                .window(window_exprs)?
547                .build()?;
548        }
549        Ok(plan)
550    }
551
552    /// Apply a projection without alias.
553    pub fn project(
554        self,
555        expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
556    ) -> Result<Self> {
557        project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
558    }
559
560    /// Apply a projection without alias with optional validation
561    /// (true to validate, false to not validate)
562    pub fn project_with_validation(
563        self,
564        expr: Vec<(impl Into<SelectExpr>, bool)>,
565    ) -> Result<Self> {
566        project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
567    }
568
569    /// Select the given column indices
570    pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
571        let exprs: Vec<_> = indices
572            .into_iter()
573            .map(|x| Expr::Column(Column::from(self.plan.schema().qualified_field(x))))
574            .collect();
575        self.project(exprs)
576    }
577
578    /// Apply a filter
579    pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
580        let expr = normalize_col(expr.into(), &self.plan)?;
581        Filter::try_new(expr, self.plan)
582            .map(LogicalPlan::Filter)
583            .map(Self::new)
584    }
585
586    /// Apply a filter which is used for a having clause
587    pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
588        let expr = normalize_col(expr.into(), &self.plan)?;
589        Filter::try_new_with_having(expr, self.plan)
590            .map(LogicalPlan::Filter)
591            .map(Self::from)
592    }
593
594    /// Make a builder for a prepare logical plan from the builder's plan
595    pub fn prepare(self, name: String, data_types: Vec<DataType>) -> Result<Self> {
596        Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
597            Prepare {
598                name,
599                data_types,
600                input: self.plan,
601            },
602        ))))
603    }
604
605    /// Limit the number of rows returned
606    ///
607    /// `skip` - Number of rows to skip before fetch any row.
608    ///
609    /// `fetch` - Maximum number of rows to fetch, after skipping `skip` rows,
610    ///          if specified.
611    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
612        let skip_expr = if skip == 0 {
613            None
614        } else {
615            Some(lit(skip as i64))
616        };
617        let fetch_expr = fetch.map(|f| lit(f as i64));
618        self.limit_by_expr(skip_expr, fetch_expr)
619    }
620
621    /// Limit the number of rows returned
622    ///
623    /// Similar to `limit` but uses expressions for `skip` and `fetch`
624    pub fn limit_by_expr(self, skip: Option<Expr>, fetch: Option<Expr>) -> Result<Self> {
625        Ok(Self::new(LogicalPlan::Limit(Limit {
626            skip: skip.map(Box::new),
627            fetch: fetch.map(Box::new),
628            input: self.plan,
629        })))
630    }
631
632    /// Apply an alias
633    pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
634        subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
635    }
636
637    /// Add missing sort columns to all downstream projection
638    ///
639    /// Thus, if you have a LogicalPlan that selects A and B and have
640    /// not requested a sort by C, this code will add C recursively to
641    /// all input projections.
642    ///
643    /// Adding a new column is not correct if there is a `Distinct`
644    /// node, which produces only distinct values of its
645    /// inputs. Adding a new column to its input will result in
646    /// potentially different results than with the original column.
647    ///
648    /// For example, if the input is like:
649    ///
650    /// Distinct(A, B)
651    ///
652    /// If the input looks like
653    ///
654    /// a | b | c
655    /// --+---+---
656    /// 1 | 2 | 3
657    /// 1 | 2 | 4
658    ///
659    /// Distinct (A, B) --> (1,2)
660    ///
661    /// But Distinct (A, B, C) --> (1, 2, 3), (1, 2, 4)
662    ///  (which will appear as a (1, 2), (1, 2) if a and b are projected
663    ///
664    /// See <https://github.com/apache/datafusion/issues/5065> for more details
665    fn add_missing_columns(
666        curr_plan: LogicalPlan,
667        missing_cols: &IndexSet<Column>,
668        is_distinct: bool,
669    ) -> Result<LogicalPlan> {
670        match curr_plan {
671            LogicalPlan::Projection(Projection {
672                input,
673                mut expr,
674                schema: _,
675            }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
676                let mut missing_exprs = missing_cols
677                    .iter()
678                    .map(|c| normalize_col(Expr::Column(c.clone()), &input))
679                    .collect::<Result<Vec<_>>>()?;
680
681                // Do not let duplicate columns to be added, some of the
682                // missing_cols may be already present but without the new
683                // projected alias.
684                missing_exprs.retain(|e| !expr.contains(e));
685                if is_distinct {
686                    Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
687                }
688                expr.extend(missing_exprs);
689                project(Arc::unwrap_or_clone(input), expr)
690            }
691            _ => {
692                let is_distinct =
693                    is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
694                let new_inputs = curr_plan
695                    .inputs()
696                    .into_iter()
697                    .map(|input_plan| {
698                        Self::add_missing_columns(
699                            (*input_plan).clone(),
700                            missing_cols,
701                            is_distinct,
702                        )
703                    })
704                    .collect::<Result<Vec<_>>>()?;
705                curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
706            }
707        }
708    }
709
710    fn ambiguous_distinct_check(
711        missing_exprs: &[Expr],
712        missing_cols: &IndexSet<Column>,
713        projection_exprs: &[Expr],
714    ) -> Result<()> {
715        if missing_exprs.is_empty() {
716            return Ok(());
717        }
718
719        // if the missing columns are all only aliases for things in
720        // the existing select list, it is ok
721        //
722        // This handles the special case for
723        // SELECT col as <alias> ORDER BY <alias>
724        //
725        // As described in https://github.com/apache/datafusion/issues/5293
726        let all_aliases = missing_exprs.iter().all(|e| {
727            projection_exprs.iter().any(|proj_expr| {
728                if let Expr::Alias(Alias { expr, .. }) = proj_expr {
729                    e == expr.as_ref()
730                } else {
731                    false
732                }
733            })
734        });
735        if all_aliases {
736            return Ok(());
737        }
738
739        let missing_col_names = missing_cols
740            .iter()
741            .map(|col| col.flat_name())
742            .collect::<String>();
743
744        plan_err!("For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list")
745    }
746
747    /// Apply a sort by provided expressions with default direction
748    pub fn sort_by(
749        self,
750        expr: impl IntoIterator<Item = impl Into<Expr>> + Clone,
751    ) -> Result<Self> {
752        self.sort(
753            expr.into_iter()
754                .map(|e| e.into().sort(true, false))
755                .collect::<Vec<SortExpr>>(),
756        )
757    }
758
759    pub fn sort(
760        self,
761        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
762    ) -> Result<Self> {
763        self.sort_with_limit(sorts, None)
764    }
765
766    /// Apply a sort
767    pub fn sort_with_limit(
768        self,
769        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
770        fetch: Option<usize>,
771    ) -> Result<Self> {
772        let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;
773
774        let schema = self.plan.schema();
775
776        // Collect sort columns that are missing in the input plan's schema
777        let mut missing_cols: IndexSet<Column> = IndexSet::new();
778        sorts.iter().try_for_each::<_, Result<()>>(|sort| {
779            let columns = sort.expr.column_refs();
780
781            missing_cols.extend(
782                columns
783                    .into_iter()
784                    .filter(|c| !schema.has_column(c))
785                    .cloned(),
786            );
787
788            Ok(())
789        })?;
790
791        if missing_cols.is_empty() {
792            return Ok(Self::new(LogicalPlan::Sort(Sort {
793                expr: normalize_sorts(sorts, &self.plan)?,
794                input: self.plan,
795                fetch,
796            })));
797        }
798
799        // remove pushed down sort columns
800        let new_expr = schema.columns().into_iter().map(Expr::Column).collect();
801
802        let is_distinct = false;
803        let plan = Self::add_missing_columns(
804            Arc::unwrap_or_clone(self.plan),
805            &missing_cols,
806            is_distinct,
807        )?;
808
809        let sort_plan = LogicalPlan::Sort(Sort {
810            expr: normalize_sorts(sorts, &plan)?,
811            input: Arc::new(plan),
812            fetch,
813        });
814
815        Projection::try_new(new_expr, Arc::new(sort_plan))
816            .map(LogicalPlan::Projection)
817            .map(Self::new)
818    }
819
820    /// Apply a union, preserving duplicate rows
821    pub fn union(self, plan: LogicalPlan) -> Result<Self> {
822        union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
823    }
824
825    /// Apply a union by name, preserving duplicate rows
826    pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
827        union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
828    }
829
830    /// Apply a union by name, removing duplicate rows
831    pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
832        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
833        let right_plan: LogicalPlan = plan;
834
835        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
836            union_by_name(left_plan, right_plan)?,
837        )))))
838    }
839
840    /// Apply a union, removing duplicate rows
841    pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
842        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
843        let right_plan: LogicalPlan = plan;
844
845        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
846            union(left_plan, right_plan)?,
847        )))))
848    }
849
850    /// Apply deduplication: Only distinct (different) values are returned)
851    pub fn distinct(self) -> Result<Self> {
852        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
853    }
854
855    /// Project first values of the specified expression list according to the provided
856    /// sorting expressions grouped by the `DISTINCT ON` clause expressions.
857    pub fn distinct_on(
858        self,
859        on_expr: Vec<Expr>,
860        select_expr: Vec<Expr>,
861        sort_expr: Option<Vec<SortExpr>>,
862    ) -> Result<Self> {
863        Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
864            DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
865        ))))
866    }
867
868    /// Apply a join to `right` using explicitly specified columns and an
869    /// optional filter expression.
870    ///
871    /// See [`join_on`](Self::join_on) for a more concise way to specify the
872    /// join condition. Since DataFusion will automatically identify and
873    /// optimize equality predicates there is no performance difference between
874    /// this function and `join_on`
875    ///
876    /// `left_cols` and `right_cols` are used to form "equijoin" predicates (see
877    /// example below), which are then combined with the optional `filter`
878    /// expression.
879    ///
880    /// Note that in case of outer join, the `filter` is applied to only matched rows.
881    pub fn join(
882        self,
883        right: LogicalPlan,
884        join_type: JoinType,
885        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
886        filter: Option<Expr>,
887    ) -> Result<Self> {
888        self.join_detailed(right, join_type, join_keys, filter, false)
889    }
890
891    /// Apply a join using the specified expressions.
892    ///
893    /// Note that DataFusion automatically optimizes joins, including
894    /// identifying and optimizing equality predicates.
895    ///
896    /// # Example
897    ///
898    /// ```
899    /// # use datafusion_expr::{Expr, col, LogicalPlanBuilder,
900    /// #  logical_plan::builder::LogicalTableSource, logical_plan::JoinType,};
901    /// # use std::sync::Arc;
902    /// # use arrow::datatypes::{Schema, DataType, Field};
903    /// # use datafusion_common::Result;
904    /// # fn main() -> Result<()> {
905    /// let example_schema = Arc::new(Schema::new(vec![
906    ///     Field::new("a", DataType::Int32, false),
907    ///     Field::new("b", DataType::Int32, false),
908    ///     Field::new("c", DataType::Int32, false),
909    /// ]));
910    /// let table_source = Arc::new(LogicalTableSource::new(example_schema));
911    /// let left_table = table_source.clone();
912    /// let right_table = table_source.clone();
913    ///
914    /// let right_plan = LogicalPlanBuilder::scan("right", right_table, None)?.build()?;
915    ///
916    /// // Form the expression `(left.a != right.a)` AND `(left.b != right.b)`
917    /// let exprs = vec![
918    ///     col("left.a").eq(col("right.a")),
919    ///     col("left.b").not_eq(col("right.b"))
920    ///  ];
921    ///
922    /// // Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
923    /// // finding all pairs of rows from `left` and `right` where
924    /// // where `a = a2` and `b != b2`.
925    /// let plan = LogicalPlanBuilder::scan("left", left_table, None)?
926    ///     .join_on(right_plan, JoinType::Inner, exprs)?
927    ///     .build()?;
928    /// # Ok(())
929    /// # }
930    /// ```
931    pub fn join_on(
932        self,
933        right: LogicalPlan,
934        join_type: JoinType,
935        on_exprs: impl IntoIterator<Item = Expr>,
936    ) -> Result<Self> {
937        let filter = on_exprs.into_iter().reduce(Expr::and);
938
939        self.join_detailed(
940            right,
941            join_type,
942            (Vec::<Column>::new(), Vec::<Column>::new()),
943            filter,
944            false,
945        )
946    }
947
948    pub(crate) fn normalize(
949        plan: &LogicalPlan,
950        column: impl Into<Column>,
951    ) -> Result<Column> {
952        let column = column.into();
953        if column.relation.is_some() {
954            // column is already normalized
955            return Ok(column);
956        }
957
958        let schema = plan.schema();
959        let fallback_schemas = plan.fallback_normalize_schemas();
960        let using_columns = plan.using_columns()?;
961        column.normalize_with_schemas_and_ambiguity_check(
962            &[&[schema], &fallback_schemas],
963            &using_columns,
964        )
965    }
966
967    /// Apply a join with on constraint and specified null equality.
968    ///
969    /// The behavior is the same as [`join`](Self::join) except that it allows
970    /// specifying the null equality behavior.
971    ///
972    /// If `null_equals_null=true`, rows where both join keys are `null` will be
973    /// emitted. Otherwise rows where either or both join keys are `null` will be
974    /// omitted.
975    pub fn join_detailed(
976        self,
977        right: LogicalPlan,
978        join_type: JoinType,
979        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
980        filter: Option<Expr>,
981        null_equals_null: bool,
982    ) -> Result<Self> {
983        if join_keys.0.len() != join_keys.1.len() {
984            return plan_err!("left_keys and right_keys were not the same length");
985        }
986
987        let filter = if let Some(expr) = filter {
988            let filter = normalize_col_with_schemas_and_ambiguity_check(
989                expr,
990                &[&[self.schema(), right.schema()]],
991                &[],
992            )?;
993            Some(filter)
994        } else {
995            None
996        };
997
998        let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
999            join_keys
1000                .0
1001                .into_iter()
1002                .zip(join_keys.1)
1003                .map(|(l, r)| {
1004                    let l = l.into();
1005                    let r = r.into();
1006
1007                    match (&l.relation, &r.relation) {
1008                        (Some(lr), Some(rr)) => {
1009                            let l_is_left =
1010                                self.plan.schema().field_with_qualified_name(lr, &l.name);
1011                            let l_is_right =
1012                                right.schema().field_with_qualified_name(lr, &l.name);
1013                            let r_is_left =
1014                                self.plan.schema().field_with_qualified_name(rr, &r.name);
1015                            let r_is_right =
1016                                right.schema().field_with_qualified_name(rr, &r.name);
1017
1018                            match (l_is_left, l_is_right, r_is_left, r_is_right) {
1019                                (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
1020                                (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
1021                                _ => (
1022                                    Self::normalize(&self.plan, l),
1023                                    Self::normalize(&right, r),
1024                                ),
1025                            }
1026                        }
1027                        (Some(lr), None) => {
1028                            let l_is_left =
1029                                self.plan.schema().field_with_qualified_name(lr, &l.name);
1030                            let l_is_right =
1031                                right.schema().field_with_qualified_name(lr, &l.name);
1032
1033                            match (l_is_left, l_is_right) {
1034                                (Ok(_), _) => (Ok(l), Self::normalize(&right, r)),
1035                                (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
1036                                _ => (
1037                                    Self::normalize(&self.plan, l),
1038                                    Self::normalize(&right, r),
1039                                ),
1040                            }
1041                        }
1042                        (None, Some(rr)) => {
1043                            let r_is_left =
1044                                self.plan.schema().field_with_qualified_name(rr, &r.name);
1045                            let r_is_right =
1046                                right.schema().field_with_qualified_name(rr, &r.name);
1047
1048                            match (r_is_left, r_is_right) {
1049                                (Ok(_), _) => (Ok(r), Self::normalize(&right, l)),
1050                                (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
1051                                _ => (
1052                                    Self::normalize(&self.plan, l),
1053                                    Self::normalize(&right, r),
1054                                ),
1055                            }
1056                        }
1057                        (None, None) => {
1058                            let mut swap = false;
1059                            let left_key = Self::normalize(&self.plan, l.clone())
1060                                .or_else(|_| {
1061                                    swap = true;
1062                                    Self::normalize(&right, l)
1063                                });
1064                            if swap {
1065                                (Self::normalize(&self.plan, r), left_key)
1066                            } else {
1067                                (left_key, Self::normalize(&right, r))
1068                            }
1069                        }
1070                    }
1071                })
1072                .unzip();
1073
1074        let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1075        let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1076
1077        let on: Vec<_> = left_keys
1078            .into_iter()
1079            .zip(right_keys)
1080            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
1081            .collect();
1082        let join_schema =
1083            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
1084
1085        // Inner type without join condition is cross join
1086        if join_type != JoinType::Inner && on.is_empty() && filter.is_none() {
1087            return plan_err!("join condition should not be empty");
1088        }
1089
1090        Ok(Self::new(LogicalPlan::Join(Join {
1091            left: self.plan,
1092            right: Arc::new(right),
1093            on,
1094            filter,
1095            join_type,
1096            join_constraint: JoinConstraint::On,
1097            schema: DFSchemaRef::new(join_schema),
1098            null_equals_null,
1099        })))
1100    }
1101
1102    /// Apply a join with using constraint, which duplicates all join columns in output schema.
1103    pub fn join_using(
1104        self,
1105        right: LogicalPlan,
1106        join_type: JoinType,
1107        using_keys: Vec<impl Into<Column> + Clone>,
1108    ) -> Result<Self> {
1109        let left_keys: Vec<Column> = using_keys
1110            .clone()
1111            .into_iter()
1112            .map(|c| Self::normalize(&self.plan, c))
1113            .collect::<Result<_>>()?;
1114        let right_keys: Vec<Column> = using_keys
1115            .into_iter()
1116            .map(|c| Self::normalize(&right, c))
1117            .collect::<Result<_>>()?;
1118
1119        let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys).collect();
1120        let join_schema =
1121            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
1122        let mut join_on: Vec<(Expr, Expr)> = vec![];
1123        let mut filters: Option<Expr> = None;
1124        for (l, r) in &on {
1125            if self.plan.schema().has_column(l)
1126                && right.schema().has_column(r)
1127                && can_hash(self.plan.schema().field_from_column(l)?.data_type())
1128            {
1129                join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
1130            } else if self.plan.schema().has_column(l)
1131                && right.schema().has_column(r)
1132                && can_hash(self.plan.schema().field_from_column(r)?.data_type())
1133            {
1134                join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
1135            } else {
1136                let expr = binary_expr(
1137                    Expr::Column(l.clone()),
1138                    Operator::Eq,
1139                    Expr::Column(r.clone()),
1140                );
1141                match filters {
1142                    None => filters = Some(expr),
1143                    Some(filter_expr) => filters = Some(and(expr, filter_expr)),
1144                }
1145            }
1146        }
1147
1148        if join_on.is_empty() {
1149            let join = Self::from(self.plan).cross_join(right)?;
1150            join.filter(filters.ok_or_else(|| {
1151                DataFusionError::Internal("filters should not be None here".to_string())
1152            })?)
1153        } else {
1154            Ok(Self::new(LogicalPlan::Join(Join {
1155                left: self.plan,
1156                right: Arc::new(right),
1157                on: join_on,
1158                filter: filters,
1159                join_type,
1160                join_constraint: JoinConstraint::Using,
1161                schema: DFSchemaRef::new(join_schema),
1162                null_equals_null: false,
1163            })))
1164        }
1165    }
1166
1167    /// Apply a cross join
1168    pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
1169        let join_schema =
1170            build_join_schema(self.plan.schema(), right.schema(), &JoinType::Inner)?;
1171        Ok(Self::new(LogicalPlan::Join(Join {
1172            left: self.plan,
1173            right: Arc::new(right),
1174            on: vec![],
1175            filter: None,
1176            join_type: JoinType::Inner,
1177            join_constraint: JoinConstraint::On,
1178            null_equals_null: false,
1179            schema: DFSchemaRef::new(join_schema),
1180        })))
1181    }
1182
1183    /// Repartition
1184    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
1185        Ok(Self::new(LogicalPlan::Repartition(Repartition {
1186            input: self.plan,
1187            partitioning_scheme,
1188        })))
1189    }
1190
1191    /// Apply a window functions to extend the schema
1192    pub fn window(
1193        self,
1194        window_expr: impl IntoIterator<Item = impl Into<Expr>>,
1195    ) -> Result<Self> {
1196        let window_expr = normalize_cols(window_expr, &self.plan)?;
1197        validate_unique_names("Windows", &window_expr)?;
1198        Ok(Self::new(LogicalPlan::Window(Window::try_new(
1199            window_expr,
1200            self.plan,
1201        )?)))
1202    }
1203
1204    /// Apply an aggregate: grouping on the `group_expr` expressions
1205    /// and calculating `aggr_expr` aggregates for each distinct
1206    /// value of the `group_expr`;
1207    pub fn aggregate(
1208        self,
1209        group_expr: impl IntoIterator<Item = impl Into<Expr>>,
1210        aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
1211    ) -> Result<Self> {
1212        let group_expr = normalize_cols(group_expr, &self.plan)?;
1213        let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
1214
1215        let group_expr = if self.options.add_implicit_group_by_exprs {
1216            add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?
1217        } else {
1218            group_expr
1219        };
1220
1221        Aggregate::try_new(self.plan, group_expr, aggr_expr)
1222            .map(LogicalPlan::Aggregate)
1223            .map(Self::new)
1224    }
1225
1226    /// Create an expression to represent the explanation of the plan
1227    ///
1228    /// if `analyze` is true, runs the actual plan and produces
1229    /// information about metrics during run.
1230    ///
1231    /// if `verbose` is true, prints out additional details.
1232    pub fn explain(self, verbose: bool, analyze: bool) -> Result<Self> {
1233        let schema = LogicalPlan::explain_schema();
1234        let schema = schema.to_dfschema_ref()?;
1235
1236        if analyze {
1237            Ok(Self::new(LogicalPlan::Analyze(Analyze {
1238                verbose,
1239                input: self.plan,
1240                schema,
1241            })))
1242        } else {
1243            let stringified_plans =
1244                vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
1245
1246            Ok(Self::new(LogicalPlan::Explain(Explain {
1247                verbose,
1248                plan: self.plan,
1249                explain_format: ExplainFormat::Indent,
1250                stringified_plans,
1251                schema,
1252                logical_optimization_succeeded: false,
1253            })))
1254        }
1255    }
1256
1257    /// Process intersect set operator
1258    pub fn intersect(
1259        left_plan: LogicalPlan,
1260        right_plan: LogicalPlan,
1261        is_all: bool,
1262    ) -> Result<LogicalPlan> {
1263        LogicalPlanBuilder::intersect_or_except(
1264            left_plan,
1265            right_plan,
1266            JoinType::LeftSemi,
1267            is_all,
1268        )
1269    }
1270
1271    /// Process except set operator
1272    pub fn except(
1273        left_plan: LogicalPlan,
1274        right_plan: LogicalPlan,
1275        is_all: bool,
1276    ) -> Result<LogicalPlan> {
1277        LogicalPlanBuilder::intersect_or_except(
1278            left_plan,
1279            right_plan,
1280            JoinType::LeftAnti,
1281            is_all,
1282        )
1283    }
1284
1285    /// Process intersect or except
1286    fn intersect_or_except(
1287        left_plan: LogicalPlan,
1288        right_plan: LogicalPlan,
1289        join_type: JoinType,
1290        is_all: bool,
1291    ) -> Result<LogicalPlan> {
1292        let left_len = left_plan.schema().fields().len();
1293        let right_len = right_plan.schema().fields().len();
1294
1295        if left_len != right_len {
1296            return plan_err!(
1297                "INTERSECT/EXCEPT query must have the same number of columns. Left is {left_len} and right is {right_len}."
1298            );
1299        }
1300
1301        let join_keys = left_plan
1302            .schema()
1303            .fields()
1304            .iter()
1305            .zip(right_plan.schema().fields().iter())
1306            .map(|(left_field, right_field)| {
1307                (
1308                    (Column::from_name(left_field.name())),
1309                    (Column::from_name(right_field.name())),
1310                )
1311            })
1312            .unzip();
1313        if is_all {
1314            LogicalPlanBuilder::from(left_plan)
1315                .join_detailed(right_plan, join_type, join_keys, None, true)?
1316                .build()
1317        } else {
1318            LogicalPlanBuilder::from(left_plan)
1319                .distinct()?
1320                .join_detailed(right_plan, join_type, join_keys, None, true)?
1321                .build()
1322        }
1323    }
1324
1325    /// Build the plan
1326    pub fn build(self) -> Result<LogicalPlan> {
1327        Ok(Arc::unwrap_or_clone(self.plan))
1328    }
1329
1330    /// Apply a join with both explicit equijoin and non equijoin predicates.
1331    ///
1332    /// Note this is a low level API that requires identifying specific
1333    /// predicate types. Most users should use  [`join_on`](Self::join_on) that
1334    /// automatically identifies predicates appropriately.
1335    ///
1336    /// `equi_exprs` defines equijoin predicates, of the form `l = r)` for each
1337    /// `(l, r)` tuple. `l`, the first element of the tuple, must only refer
1338    /// to columns from the existing input. `r`, the second element of the tuple,
1339    /// must only refer to columns from the right input.
1340    ///
1341    /// `filter` contains any other other filter expression to apply during the
1342    /// join. Note that `equi_exprs` predicates are evaluated more efficiently
1343    /// than the filter expressions, so they are preferred.
1344    pub fn join_with_expr_keys(
1345        self,
1346        right: LogicalPlan,
1347        join_type: JoinType,
1348        equi_exprs: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
1349        filter: Option<Expr>,
1350    ) -> Result<Self> {
1351        if equi_exprs.0.len() != equi_exprs.1.len() {
1352            return plan_err!("left_keys and right_keys were not the same length");
1353        }
1354
1355        let join_key_pairs = equi_exprs
1356            .0
1357            .into_iter()
1358            .zip(equi_exprs.1)
1359            .map(|(l, r)| {
1360                let left_key = l.into();
1361                let right_key = r.into();
1362                let mut left_using_columns  = HashSet::new();
1363                expr_to_columns(&left_key, &mut left_using_columns)?;
1364                let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check(
1365                    left_key,
1366                    &[&[self.plan.schema()]],
1367                    &[],
1368                )?;
1369
1370                let mut right_using_columns = HashSet::new();
1371                expr_to_columns(&right_key, &mut right_using_columns)?;
1372                let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check(
1373                    right_key,
1374                    &[&[right.schema()]],
1375                    &[],
1376                )?;
1377
1378                // find valid equijoin
1379                find_valid_equijoin_key_pair(
1380                        &normalized_left_key,
1381                        &normalized_right_key,
1382                        self.plan.schema(),
1383                        right.schema(),
1384                    )?.ok_or_else(||
1385                        plan_datafusion_err!(
1386                            "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
1387                        ))
1388            })
1389            .collect::<Result<Vec<_>>>()?;
1390
1391        let join_schema =
1392            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
1393
1394        Ok(Self::new(LogicalPlan::Join(Join {
1395            left: self.plan,
1396            right: Arc::new(right),
1397            on: join_key_pairs,
1398            filter,
1399            join_type,
1400            join_constraint: JoinConstraint::On,
1401            schema: DFSchemaRef::new(join_schema),
1402            null_equals_null: false,
1403        })))
1404    }
1405
1406    /// Unnest the given column.
1407    pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
1408        unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
1409    }
1410
1411    /// Unnest the given column given [`UnnestOptions`]
1412    pub fn unnest_column_with_options(
1413        self,
1414        column: impl Into<Column>,
1415        options: UnnestOptions,
1416    ) -> Result<Self> {
1417        unnest_with_options(
1418            Arc::unwrap_or_clone(self.plan),
1419            vec![column.into()],
1420            options,
1421        )
1422        .map(Self::new)
1423    }
1424
1425    /// Unnest the given columns with the given [`UnnestOptions`]
1426    pub fn unnest_columns_with_options(
1427        self,
1428        columns: Vec<Column>,
1429        options: UnnestOptions,
1430    ) -> Result<Self> {
1431        unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
1432            .map(Self::new)
1433    }
1434}
1435
1436impl From<LogicalPlan> for LogicalPlanBuilder {
1437    fn from(plan: LogicalPlan) -> Self {
1438        LogicalPlanBuilder::new(plan)
1439    }
1440}
1441
1442impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
1443    fn from(plan: Arc<LogicalPlan>) -> Self {
1444        LogicalPlanBuilder::new_from_arc(plan)
1445    }
1446}
1447
1448/// Container used when building fields for a `VALUES` node.
1449#[derive(Default)]
1450struct ValuesFields {
1451    inner: Vec<Field>,
1452}
1453
1454impl ValuesFields {
1455    pub fn new() -> Self {
1456        Self::default()
1457    }
1458
1459    pub fn push(&mut self, data_type: DataType, nullable: bool) {
1460        // Naming follows the convention described here:
1461        // https://www.postgresql.org/docs/current/queries-values.html
1462        let name = format!("column{}", self.inner.len() + 1);
1463        self.inner.push(Field::new(name, data_type, nullable));
1464    }
1465
1466    pub fn into_fields(self) -> Fields {
1467        self.inner.into()
1468    }
1469}
1470
1471// `name_map` tracks a mapping between a field name and the number of appearances of that field.
1472//
1473// Some field names might already come to this function with the count (number of times it appeared)
1474// as a sufix e.g. id:1, so there's still a chance of name collisions, for example,
1475// if these three fields passed to this function: "col:1", "col" and "col", the function
1476// would rename them to -> col:1, col, col:1 causing a posteriror error when building the DFSchema.
1477// that's why we need the `seen` set, so the fields are always unique.
1478//
1479pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
1480    let mut name_map = HashMap::new();
1481    let mut seen: HashSet<String> = HashSet::new();
1482
1483    fields
1484        .into_iter()
1485        .map(|field| {
1486            let base_name = field.name();
1487            let count = name_map.entry(base_name.clone()).or_insert(0);
1488            let mut new_name = base_name.clone();
1489
1490            // Loop until we find a name that hasn't been used
1491            while seen.contains(&new_name) {
1492                *count += 1;
1493                new_name = format!("{}:{}", base_name, count);
1494            }
1495
1496            seen.insert(new_name.clone());
1497
1498            let mut modified_field =
1499                Field::new(&new_name, field.data_type().clone(), field.is_nullable());
1500            modified_field.set_metadata(field.metadata().clone());
1501            modified_field
1502        })
1503        .collect()
1504}
1505
1506fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
1507    let mut table_references = schema
1508        .iter()
1509        .filter_map(|(qualifier, _)| qualifier)
1510        .collect::<Vec<_>>();
1511    table_references.dedup();
1512    let table_reference = if table_references.len() == 1 {
1513        table_references.pop().cloned()
1514    } else {
1515        None
1516    };
1517
1518    (
1519        table_reference,
1520        Arc::new(Field::new("mark", DataType::Boolean, false)),
1521    )
1522}
1523
1524/// Creates a schema for a join operation.
1525/// The fields from the left side are first
1526pub fn build_join_schema(
1527    left: &DFSchema,
1528    right: &DFSchema,
1529    join_type: &JoinType,
1530) -> Result<DFSchema> {
1531    fn nullify_fields<'a>(
1532        fields: impl Iterator<Item = (Option<&'a TableReference>, &'a Arc<Field>)>,
1533    ) -> Vec<(Option<TableReference>, Arc<Field>)> {
1534        fields
1535            .map(|(q, f)| {
1536                // TODO: find a good way to do that
1537                let field = f.as_ref().clone().with_nullable(true);
1538                (q.cloned(), Arc::new(field))
1539            })
1540            .collect()
1541    }
1542
1543    let right_fields = right.iter();
1544    let left_fields = left.iter();
1545
1546    let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = match join_type {
1547        JoinType::Inner => {
1548            // left then right
1549            let left_fields = left_fields
1550                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1551                .collect::<Vec<_>>();
1552            let right_fields = right_fields
1553                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1554                .collect::<Vec<_>>();
1555            left_fields.into_iter().chain(right_fields).collect()
1556        }
1557        JoinType::Left => {
1558            // left then right, right set to nullable in case of not matched scenario
1559            let left_fields = left_fields
1560                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1561                .collect::<Vec<_>>();
1562            left_fields
1563                .into_iter()
1564                .chain(nullify_fields(right_fields))
1565                .collect()
1566        }
1567        JoinType::Right => {
1568            // left then right, left set to nullable in case of not matched scenario
1569            let right_fields = right_fields
1570                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1571                .collect::<Vec<_>>();
1572            nullify_fields(left_fields)
1573                .into_iter()
1574                .chain(right_fields)
1575                .collect()
1576        }
1577        JoinType::Full => {
1578            // left then right, all set to nullable in case of not matched scenario
1579            nullify_fields(left_fields)
1580                .into_iter()
1581                .chain(nullify_fields(right_fields))
1582                .collect()
1583        }
1584        JoinType::LeftSemi | JoinType::LeftAnti => {
1585            // Only use the left side for the schema
1586            left_fields
1587                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1588                .collect()
1589        }
1590        JoinType::LeftMark => left_fields
1591            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1592            .chain(once(mark_field(right)))
1593            .collect(),
1594        JoinType::RightSemi | JoinType::RightAnti => {
1595            // Only use the right side for the schema
1596            right_fields
1597                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1598                .collect()
1599        }
1600    };
1601    let func_dependencies = left.functional_dependencies().join(
1602        right.functional_dependencies(),
1603        join_type,
1604        left.fields().len(),
1605    );
1606    let metadata = left
1607        .metadata()
1608        .clone()
1609        .into_iter()
1610        .chain(right.metadata().clone())
1611        .collect();
1612    let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
1613    dfschema.with_functional_dependencies(func_dependencies)
1614}
1615
1616/// Add additional "synthetic" group by expressions based on functional
1617/// dependencies.
1618///
1619/// For example, if we are grouping on `[c1]`, and we know from
1620/// functional dependencies that column `c1` determines `c2`, this function
1621/// adds `c2` to the group by list.
1622///
1623/// This allows MySQL style selects like
1624/// `SELECT col FROM t WHERE pk = 5` if col is unique
1625pub fn add_group_by_exprs_from_dependencies(
1626    mut group_expr: Vec<Expr>,
1627    schema: &DFSchemaRef,
1628) -> Result<Vec<Expr>> {
1629    // Names of the fields produced by the GROUP BY exprs for example, `GROUP BY
1630    // c1 + 1` produces an output field named `"c1 + 1"`
1631    let mut group_by_field_names = group_expr
1632        .iter()
1633        .map(|e| e.schema_name().to_string())
1634        .collect::<Vec<_>>();
1635
1636    if let Some(target_indices) =
1637        get_target_functional_dependencies(schema, &group_by_field_names)
1638    {
1639        for idx in target_indices {
1640            let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
1641            let expr_name = expr.schema_name().to_string();
1642            if !group_by_field_names.contains(&expr_name) {
1643                group_by_field_names.push(expr_name);
1644                group_expr.push(expr);
1645            }
1646        }
1647    }
1648    Ok(group_expr)
1649}
1650
1651/// Errors if one or more expressions have equal names.
1652pub fn validate_unique_names<'a>(
1653    node_name: &str,
1654    expressions: impl IntoIterator<Item = &'a Expr>,
1655) -> Result<()> {
1656    let mut unique_names = HashMap::new();
1657
1658    expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
1659        let name = expr.schema_name().to_string();
1660        match unique_names.get(&name) {
1661            None => {
1662                unique_names.insert(name, (position, expr));
1663                Ok(())
1664            },
1665            Some((existing_position, existing_expr)) => {
1666                plan_err!("{node_name} require unique expression names \
1667                             but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
1668                             at position {position} have the same name. Consider aliasing (\"AS\") one of them."
1669                            )
1670            }
1671        }
1672    })
1673}
1674
1675/// Union two [`LogicalPlan`]s.
1676///
1677/// Constructs the UNION plan, but does not perform type-coercion. Therefore the
1678/// subtree expressions will not be properly typed until the optimizer pass.
1679///
1680/// If a properly typed UNION plan is needed, refer to [`TypeCoercionRewriter::coerce_union`]
1681/// or alternatively, merge the union input schema using [`coerce_union_schema`] and
1682/// apply the expression rewrite with [`coerce_plan_expr_for_schema`].
1683///
1684/// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
1685/// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
1686pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1687    Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1688        Arc::new(left_plan),
1689        Arc::new(right_plan),
1690    ])?))
1691}
1692
1693/// Like [`union`], but combine rows from different tables by name, rather than
1694/// by position.
1695pub fn union_by_name(
1696    left_plan: LogicalPlan,
1697    right_plan: LogicalPlan,
1698) -> Result<LogicalPlan> {
1699    Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
1700        Arc::new(left_plan),
1701        Arc::new(right_plan),
1702    ])?))
1703}
1704
1705/// Create Projection
1706/// # Errors
1707/// This function errors under any of the following conditions:
1708/// * Two or more expressions have the same name
1709/// * An invalid expression is used (e.g. a `sort` expression)
1710pub fn project(
1711    plan: LogicalPlan,
1712    expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
1713) -> Result<LogicalPlan> {
1714    project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
1715}
1716
1717/// Create Projection. Similar to project except that the expressions
1718/// passed in have a flag to indicate if that expression requires
1719/// validation (normalize & columnize) (true) or not (false)
1720/// # Errors
1721/// This function errors under any of the following conditions:
1722/// * Two or more expressions have the same name
1723/// * An invalid expression is used (e.g. a `sort` expression)
1724fn project_with_validation(
1725    plan: LogicalPlan,
1726    expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
1727) -> Result<LogicalPlan> {
1728    let mut projected_expr = vec![];
1729    for (e, validate) in expr {
1730        let e = e.into();
1731        match e {
1732            SelectExpr::Wildcard(opt) => {
1733                let expanded = expand_wildcard(plan.schema(), &plan, Some(&opt))?;
1734
1735                // If there is a REPLACE statement, replace that column with the given
1736                // replace expression. Column name remains the same.
1737                let expanded = if let Some(replace) = opt.replace {
1738                    replace_columns(expanded, &replace)?
1739                } else {
1740                    expanded
1741                };
1742
1743                for e in expanded {
1744                    if validate {
1745                        projected_expr
1746                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1747                    } else {
1748                        projected_expr.push(e)
1749                    }
1750                }
1751            }
1752            SelectExpr::QualifiedWildcard(table_ref, opt) => {
1753                let expanded =
1754                    expand_qualified_wildcard(&table_ref, plan.schema(), Some(&opt))?;
1755
1756                // If there is a REPLACE statement, replace that column with the given
1757                // replace expression. Column name remains the same.
1758                let expanded = if let Some(replace) = opt.replace {
1759                    replace_columns(expanded, &replace)?
1760                } else {
1761                    expanded
1762                };
1763
1764                for e in expanded {
1765                    if validate {
1766                        projected_expr
1767                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1768                    } else {
1769                        projected_expr.push(e)
1770                    }
1771                }
1772            }
1773            SelectExpr::Expression(e) => {
1774                if validate {
1775                    projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1776                } else {
1777                    projected_expr.push(e)
1778                }
1779            }
1780        }
1781    }
1782    validate_unique_names("Projections", projected_expr.iter())?;
1783
1784    Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
1785}
1786
1787/// If there is a REPLACE statement in the projected expression in the form of
1788/// "REPLACE (some_column_within_an_expr AS some_column)", this function replaces
1789/// that column with the given replace expression. Column name remains the same.
1790/// Multiple REPLACEs are also possible with comma separations.
1791fn replace_columns(
1792    mut exprs: Vec<Expr>,
1793    replace: &PlannedReplaceSelectItem,
1794) -> Result<Vec<Expr>> {
1795    for expr in exprs.iter_mut() {
1796        if let Expr::Column(Column { name, .. }) = expr {
1797            if let Some((_, new_expr)) = replace
1798                .items()
1799                .iter()
1800                .zip(replace.expressions().iter())
1801                .find(|(item, _)| item.column_name.value == *name)
1802            {
1803                *expr = new_expr.clone().alias(name.clone())
1804            }
1805        }
1806    }
1807    Ok(exprs)
1808}
1809
1810/// Create a SubqueryAlias to wrap a LogicalPlan.
1811pub fn subquery_alias(
1812    plan: LogicalPlan,
1813    alias: impl Into<TableReference>,
1814) -> Result<LogicalPlan> {
1815    SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias)
1816}
1817
1818/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
1819/// This is mostly used for testing and documentation.
1820pub fn table_scan(
1821    name: Option<impl Into<TableReference>>,
1822    table_schema: &Schema,
1823    projection: Option<Vec<usize>>,
1824) -> Result<LogicalPlanBuilder> {
1825    table_scan_with_filters(name, table_schema, projection, vec![])
1826}
1827
1828/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
1829/// and inlined filters.
1830/// This is mostly used for testing and documentation.
1831pub fn table_scan_with_filters(
1832    name: Option<impl Into<TableReference>>,
1833    table_schema: &Schema,
1834    projection: Option<Vec<usize>>,
1835    filters: Vec<Expr>,
1836) -> Result<LogicalPlanBuilder> {
1837    let table_source = table_source(table_schema);
1838    let name = name
1839        .map(|n| n.into())
1840        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1841    LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
1842}
1843
1844/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema,
1845/// filters, and inlined fetch.
1846/// This is mostly used for testing and documentation.
1847pub fn table_scan_with_filter_and_fetch(
1848    name: Option<impl Into<TableReference>>,
1849    table_schema: &Schema,
1850    projection: Option<Vec<usize>>,
1851    filters: Vec<Expr>,
1852    fetch: Option<usize>,
1853) -> Result<LogicalPlanBuilder> {
1854    let table_source = table_source(table_schema);
1855    let name = name
1856        .map(|n| n.into())
1857        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1858    LogicalPlanBuilder::scan_with_filters_fetch(
1859        name,
1860        table_source,
1861        projection,
1862        filters,
1863        fetch,
1864    )
1865}
1866
1867pub fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
1868    let table_schema = Arc::new(table_schema.clone());
1869    Arc::new(LogicalTableSource {
1870        table_schema,
1871        constraints: Default::default(),
1872    })
1873}
1874
1875pub fn table_source_with_constraints(
1876    table_schema: &Schema,
1877    constraints: Constraints,
1878) -> Arc<dyn TableSource> {
1879    let table_schema = Arc::new(table_schema.clone());
1880    Arc::new(LogicalTableSource {
1881        table_schema,
1882        constraints,
1883    })
1884}
1885
1886/// Wrap projection for a plan, if the join keys contains normal expression.
1887pub fn wrap_projection_for_join_if_necessary(
1888    join_keys: &[Expr],
1889    input: LogicalPlan,
1890) -> Result<(LogicalPlan, Vec<Column>, bool)> {
1891    let input_schema = input.schema();
1892    let alias_join_keys: Vec<Expr> = join_keys
1893        .iter()
1894        .map(|key| {
1895            // The display_name() of cast expression will ignore the cast info, and show the inner expression name.
1896            // If we do not add alias, it will throw same field name error in the schema when adding projection.
1897            // For example:
1898            //    input scan : [a, b, c],
1899            //    join keys: [cast(a as int)]
1900            //
1901            //  then a and cast(a as int) will use the same field name - `a` in projection schema.
1902            //  https://github.com/apache/datafusion/issues/4478
1903            if matches!(key, Expr::Cast(_)) || matches!(key, Expr::TryCast(_)) {
1904                let alias = format!("{key}");
1905                key.clone().alias(alias)
1906            } else {
1907                key.clone()
1908            }
1909        })
1910        .collect::<Vec<_>>();
1911
1912    let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
1913    let plan = if need_project {
1914        // Include all columns from the input and extend them with the join keys
1915        let mut projection = input_schema
1916            .columns()
1917            .into_iter()
1918            .map(Expr::Column)
1919            .collect::<Vec<_>>();
1920        let join_key_items = alias_join_keys
1921            .iter()
1922            .flat_map(|expr| expr.try_as_col().is_none().then_some(expr))
1923            .cloned()
1924            .collect::<HashSet<Expr>>();
1925        projection.extend(join_key_items);
1926
1927        LogicalPlanBuilder::from(input)
1928            .project(projection.into_iter().map(SelectExpr::from))?
1929            .build()?
1930    } else {
1931        input
1932    };
1933
1934    let join_on = alias_join_keys
1935        .into_iter()
1936        .map(|key| {
1937            if let Some(col) = key.try_as_col() {
1938                Ok(col.clone())
1939            } else {
1940                let name = key.schema_name().to_string();
1941                Ok(Column::from_name(name))
1942            }
1943        })
1944        .collect::<Result<Vec<_>>>()?;
1945
1946    Ok((plan, join_on, need_project))
1947}
1948
1949/// Basic TableSource implementation intended for use in tests and documentation. It is expected
1950/// that users will provide their own TableSource implementations or use DataFusion's
1951/// DefaultTableSource.
1952pub struct LogicalTableSource {
1953    table_schema: SchemaRef,
1954    constraints: Constraints,
1955}
1956
1957impl LogicalTableSource {
1958    /// Create a new LogicalTableSource
1959    pub fn new(table_schema: SchemaRef) -> Self {
1960        Self {
1961            table_schema,
1962            constraints: Constraints::default(),
1963        }
1964    }
1965
1966    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
1967        self.constraints = constraints;
1968        self
1969    }
1970}
1971
1972impl TableSource for LogicalTableSource {
1973    fn as_any(&self) -> &dyn Any {
1974        self
1975    }
1976
1977    fn schema(&self) -> SchemaRef {
1978        Arc::clone(&self.table_schema)
1979    }
1980
1981    fn constraints(&self) -> Option<&Constraints> {
1982        Some(&self.constraints)
1983    }
1984
1985    fn supports_filters_pushdown(
1986        &self,
1987        filters: &[&Expr],
1988    ) -> Result<Vec<TableProviderFilterPushDown>> {
1989        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
1990    }
1991}
1992
1993/// Create a [`LogicalPlan::Unnest`] plan
1994pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
1995    unnest_with_options(input, columns, UnnestOptions::default())
1996}
1997
1998// Get the data type of a multi-dimensional type after unnesting it
1999// with a given depth
2000fn get_unnested_list_datatype_recursive(
2001    data_type: &DataType,
2002    depth: usize,
2003) -> Result<DataType> {
2004    match data_type {
2005        DataType::List(field)
2006        | DataType::FixedSizeList(field, _)
2007        | DataType::LargeList(field) => {
2008            if depth == 1 {
2009                return Ok(field.data_type().clone());
2010            }
2011            return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
2012        }
2013        _ => {}
2014    };
2015
2016    internal_err!("trying to unnest on invalid data type {:?}", data_type)
2017}
2018
2019pub fn get_struct_unnested_columns(
2020    col_name: &String,
2021    inner_fields: &Fields,
2022) -> Vec<Column> {
2023    inner_fields
2024        .iter()
2025        .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
2026        .collect()
2027}
2028
2029// Based on data type, either struct or a variant of list
2030// return a set of columns as the result of unnesting
2031// the input columns.
2032// For example, given a column with name "a",
2033// - List(Element) returns ["a"] with data type Element
2034// - Struct(field1, field2) returns ["a.field1","a.field2"]
2035// For list data type, an argument depth is used to specify
2036// the recursion level
2037pub fn get_unnested_columns(
2038    col_name: &String,
2039    data_type: &DataType,
2040    depth: usize,
2041) -> Result<Vec<(Column, Arc<Field>)>> {
2042    let mut qualified_columns = Vec::with_capacity(1);
2043
2044    match data_type {
2045        DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
2046            let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
2047            let new_field = Arc::new(Field::new(
2048                col_name, data_type,
2049                // Unnesting may produce NULLs even if the list is not null.
2050                // For example: unnest([1], []) -> 1, null
2051                true,
2052            ));
2053            let column = Column::from_name(col_name);
2054            // let column = Column::from((None, &new_field));
2055            qualified_columns.push((column, new_field));
2056        }
2057        DataType::Struct(fields) => {
2058            qualified_columns.extend(fields.iter().map(|f| {
2059                let new_name = format!("{}.{}", col_name, f.name());
2060                let column = Column::from_name(&new_name);
2061                let new_field = f.as_ref().clone().with_name(new_name);
2062                // let column = Column::from((None, &f));
2063                (column, Arc::new(new_field))
2064            }))
2065        }
2066        _ => {
2067            return internal_err!(
2068                "trying to unnest on invalid data type {:?}",
2069                data_type
2070            );
2071        }
2072    };
2073    Ok(qualified_columns)
2074}
2075
2076/// Create a [`LogicalPlan::Unnest`] plan with options
2077/// This function receive a list of columns to be unnested
2078/// because multiple unnest can be performed on the same column (e.g unnest with different depth)
2079/// The new schema will contains post-unnest fields replacing the original field
2080///
2081/// For example:
2082/// Input schema as
2083/// ```text
2084/// +---------------------+-------------------+
2085/// | col1                | col2              |
2086/// +---------------------+-------------------+
2087/// | Struct(INT64,INT32) | List(List(Int64)) |
2088/// +---------------------+-------------------+
2089/// ```
2090///
2091///
2092///
2093/// Then unnesting columns with:
2094/// - (col1,Struct)
2095/// - (col2,List(\[depth=1,depth=2\]))
2096///
2097/// will generate a new schema as
2098/// ```text
2099/// +---------+---------+---------------------+---------------------+
2100/// | col1.c0 | col1.c1 | unnest_col2_depth_1 | unnest_col2_depth_2 |
2101/// +---------+---------+---------------------+---------------------+
2102/// | Int64   | Int32   | List(Int64)         |  Int64              |
2103/// +---------+---------+---------------------+---------------------+
2104/// ```
2105pub fn unnest_with_options(
2106    input: LogicalPlan,
2107    columns_to_unnest: Vec<Column>,
2108    options: UnnestOptions,
2109) -> Result<LogicalPlan> {
2110    let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
2111    let mut struct_columns = vec![];
2112    let indices_to_unnest = columns_to_unnest
2113        .iter()
2114        .map(|c| Ok((input.schema().index_of_column(c)?, c)))
2115        .collect::<Result<HashMap<usize, &Column>>>()?;
2116
2117    let input_schema = input.schema();
2118
2119    let mut dependency_indices = vec![];
2120    // Transform input schema into new schema
2121    // Given this comprehensive example
2122    //
2123    // input schema:
2124    // 1.col1_unnest_placeholder: list[list[int]],
2125    // 2.col1: list[list[int]]
2126    // 3.col2: list[int]
2127    // with unnest on unnest(col1,depth=2), unnest(col1,depth=1) and unnest(col2,depth=1)
2128    // output schema:
2129    // 1.unnest_col1_depth_2: int
2130    // 2.unnest_col1_depth_1: list[int]
2131    // 3.col1: list[list[int]]
2132    // 4.unnest_col2_depth_1: int
2133    // Meaning the placeholder column will be replaced by its unnested variation(s), note
2134    // the plural.
2135    let fields = input_schema
2136        .iter()
2137        .enumerate()
2138        .map(|(index, (original_qualifier, original_field))| {
2139            match indices_to_unnest.get(&index) {
2140                Some(column_to_unnest) => {
2141                    let recursions_on_column = options
2142                        .recursions
2143                        .iter()
2144                        .filter(|p| -> bool { &p.input_column == *column_to_unnest })
2145                        .collect::<Vec<_>>();
2146                    let mut transformed_columns = recursions_on_column
2147                        .iter()
2148                        .map(|r| {
2149                            list_columns.push((
2150                                index,
2151                                ColumnUnnestList {
2152                                    output_column: r.output_column.clone(),
2153                                    depth: r.depth,
2154                                },
2155                            ));
2156                            Ok(get_unnested_columns(
2157                                &r.output_column.name,
2158                                original_field.data_type(),
2159                                r.depth,
2160                            )?
2161                            .into_iter()
2162                            .next()
2163                            .unwrap()) // because unnesting a list column always result into one result
2164                        })
2165                        .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
2166                    if transformed_columns.is_empty() {
2167                        transformed_columns = get_unnested_columns(
2168                            &column_to_unnest.name,
2169                            original_field.data_type(),
2170                            1,
2171                        )?;
2172                        match original_field.data_type() {
2173                            DataType::Struct(_) => {
2174                                struct_columns.push(index);
2175                            }
2176                            DataType::List(_)
2177                            | DataType::FixedSizeList(_, _)
2178                            | DataType::LargeList(_) => {
2179                                list_columns.push((
2180                                    index,
2181                                    ColumnUnnestList {
2182                                        output_column: Column::from_name(
2183                                            &column_to_unnest.name,
2184                                        ),
2185                                        depth: 1,
2186                                    },
2187                                ));
2188                            }
2189                            _ => {}
2190                        };
2191                    }
2192
2193                    // new columns dependent on the same original index
2194                    dependency_indices
2195                        .extend(std::iter::repeat_n(index, transformed_columns.len()));
2196                    Ok(transformed_columns
2197                        .iter()
2198                        .map(|(col, field)| (col.relation.to_owned(), field.to_owned()))
2199                        .collect())
2200                }
2201                None => {
2202                    dependency_indices.push(index);
2203                    Ok(vec![(
2204                        original_qualifier.cloned(),
2205                        Arc::clone(original_field),
2206                    )])
2207                }
2208            }
2209        })
2210        .collect::<Result<Vec<_>>>()?
2211        .into_iter()
2212        .flatten()
2213        .collect::<Vec<_>>();
2214
2215    let metadata = input_schema.metadata().clone();
2216    let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
2217    // We can use the existing functional dependencies:
2218    let deps = input_schema.functional_dependencies().clone();
2219    let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
2220
2221    Ok(LogicalPlan::Unnest(Unnest {
2222        input: Arc::new(input),
2223        exec_columns: columns_to_unnest,
2224        list_type_columns: list_columns,
2225        struct_type_columns: struct_columns,
2226        dependency_indices,
2227        schema,
2228        options,
2229    }))
2230}
2231
2232#[cfg(test)]
2233mod tests {
2234    use super::*;
2235    use crate::logical_plan::StringifiedPlan;
2236    use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};
2237
2238    use crate::test::function_stub::sum;
2239    use datafusion_common::{Constraint, RecursionUnnestOption, SchemaError};
2240
2241    #[test]
2242    fn plan_builder_simple() -> Result<()> {
2243        let plan =
2244            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2245                .filter(col("state").eq(lit("CO")))?
2246                .project(vec![col("id")])?
2247                .build()?;
2248
2249        let expected = "Projection: employee_csv.id\
2250        \n  Filter: employee_csv.state = Utf8(\"CO\")\
2251        \n    TableScan: employee_csv projection=[id, state]";
2252
2253        assert_eq!(expected, format!("{plan}"));
2254
2255        Ok(())
2256    }
2257
2258    #[test]
2259    fn plan_builder_schema() {
2260        let schema = employee_schema();
2261        let projection = None;
2262        let plan =
2263            LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection)
2264                .unwrap();
2265        let expected = DFSchema::try_from_qualified_schema(
2266            TableReference::bare("employee_csv"),
2267            &schema,
2268        )
2269        .unwrap();
2270        assert_eq!(&expected, plan.schema().as_ref());
2271
2272        // Note scan of "EMPLOYEE_CSV" is treated as a SQL identifier
2273        // (and thus normalized to "employee"csv") as well
2274        let projection = None;
2275        let plan =
2276            LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection)
2277                .unwrap();
2278        assert_eq!(&expected, plan.schema().as_ref());
2279    }
2280
2281    #[test]
2282    fn plan_builder_empty_name() {
2283        let schema = employee_schema();
2284        let projection = None;
2285        let err =
2286            LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err();
2287        assert_eq!(
2288            err.strip_backtrace(),
2289            "Error during planning: table_name cannot be empty"
2290        );
2291    }
2292
2293    #[test]
2294    fn plan_builder_sort() -> Result<()> {
2295        let plan =
2296            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2297                .sort(vec![
2298                    expr::Sort::new(col("state"), true, true),
2299                    expr::Sort::new(col("salary"), false, false),
2300                ])?
2301                .build()?;
2302
2303        let expected = "Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST\
2304        \n  TableScan: employee_csv projection=[state, salary]";
2305
2306        assert_eq!(expected, format!("{plan}"));
2307
2308        Ok(())
2309    }
2310
2311    #[test]
2312    fn plan_builder_union() -> Result<()> {
2313        let plan =
2314            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2315
2316        let plan = plan
2317            .clone()
2318            .union(plan.clone().build()?)?
2319            .union(plan.clone().build()?)?
2320            .union(plan.build()?)?
2321            .build()?;
2322
2323        let expected = "Union\
2324        \n  Union\
2325        \n    Union\
2326        \n      TableScan: employee_csv projection=[state, salary]\
2327        \n      TableScan: employee_csv projection=[state, salary]\
2328        \n    TableScan: employee_csv projection=[state, salary]\
2329        \n  TableScan: employee_csv projection=[state, salary]";
2330
2331        assert_eq!(expected, format!("{plan}"));
2332
2333        Ok(())
2334    }
2335
2336    #[test]
2337    fn plan_builder_union_distinct() -> Result<()> {
2338        let plan =
2339            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2340
2341        let plan = plan
2342            .clone()
2343            .union_distinct(plan.clone().build()?)?
2344            .union_distinct(plan.clone().build()?)?
2345            .union_distinct(plan.build()?)?
2346            .build()?;
2347
2348        let expected = "\
2349        Distinct:\
2350        \n  Union\
2351        \n    Distinct:\
2352        \n      Union\
2353        \n        Distinct:\
2354        \n          Union\
2355        \n            TableScan: employee_csv projection=[state, salary]\
2356        \n            TableScan: employee_csv projection=[state, salary]\
2357        \n        TableScan: employee_csv projection=[state, salary]\
2358        \n    TableScan: employee_csv projection=[state, salary]";
2359
2360        assert_eq!(expected, format!("{plan}"));
2361
2362        Ok(())
2363    }
2364
2365    #[test]
2366    fn plan_builder_simple_distinct() -> Result<()> {
2367        let plan =
2368            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2369                .filter(col("state").eq(lit("CO")))?
2370                .project(vec![col("id")])?
2371                .distinct()?
2372                .build()?;
2373
2374        let expected = "\
2375        Distinct:\
2376        \n  Projection: employee_csv.id\
2377        \n    Filter: employee_csv.state = Utf8(\"CO\")\
2378        \n      TableScan: employee_csv projection=[id, state]";
2379
2380        assert_eq!(expected, format!("{plan}"));
2381
2382        Ok(())
2383    }
2384
2385    #[test]
2386    fn exists_subquery() -> Result<()> {
2387        let foo = test_table_scan_with_name("foo")?;
2388        let bar = test_table_scan_with_name("bar")?;
2389
2390        let subquery = LogicalPlanBuilder::from(foo)
2391            .project(vec![col("a")])?
2392            .filter(col("a").eq(col("bar.a")))?
2393            .build()?;
2394
2395        let outer_query = LogicalPlanBuilder::from(bar)
2396            .project(vec![col("a")])?
2397            .filter(exists(Arc::new(subquery)))?
2398            .build()?;
2399
2400        let expected = "Filter: EXISTS (<subquery>)\
2401        \n  Subquery:\
2402        \n    Filter: foo.a = bar.a\
2403        \n      Projection: foo.a\
2404        \n        TableScan: foo\
2405        \n  Projection: bar.a\
2406        \n    TableScan: bar";
2407        assert_eq!(expected, format!("{outer_query}"));
2408
2409        Ok(())
2410    }
2411
2412    #[test]
2413    fn filter_in_subquery() -> Result<()> {
2414        let foo = test_table_scan_with_name("foo")?;
2415        let bar = test_table_scan_with_name("bar")?;
2416
2417        let subquery = LogicalPlanBuilder::from(foo)
2418            .project(vec![col("a")])?
2419            .filter(col("a").eq(col("bar.a")))?
2420            .build()?;
2421
2422        // SELECT a FROM bar WHERE a IN (SELECT a FROM foo WHERE a = bar.a)
2423        let outer_query = LogicalPlanBuilder::from(bar)
2424            .project(vec![col("a")])?
2425            .filter(in_subquery(col("a"), Arc::new(subquery)))?
2426            .build()?;
2427
2428        let expected = "Filter: bar.a IN (<subquery>)\
2429        \n  Subquery:\
2430        \n    Filter: foo.a = bar.a\
2431        \n      Projection: foo.a\
2432        \n        TableScan: foo\
2433        \n  Projection: bar.a\
2434        \n    TableScan: bar";
2435        assert_eq!(expected, format!("{outer_query}"));
2436
2437        Ok(())
2438    }
2439
2440    #[test]
2441    fn select_scalar_subquery() -> Result<()> {
2442        let foo = test_table_scan_with_name("foo")?;
2443        let bar = test_table_scan_with_name("bar")?;
2444
2445        let subquery = LogicalPlanBuilder::from(foo)
2446            .project(vec![col("b")])?
2447            .filter(col("a").eq(col("bar.a")))?
2448            .build()?;
2449
2450        // SELECT (SELECT a FROM foo WHERE a = bar.a) FROM bar
2451        let outer_query = LogicalPlanBuilder::from(bar)
2452            .project(vec![scalar_subquery(Arc::new(subquery))])?
2453            .build()?;
2454
2455        let expected = "Projection: (<subquery>)\
2456        \n  Subquery:\
2457        \n    Filter: foo.a = bar.a\
2458        \n      Projection: foo.b\
2459        \n        TableScan: foo\
2460        \n  TableScan: bar";
2461        assert_eq!(expected, format!("{outer_query}"));
2462
2463        Ok(())
2464    }
2465
2466    #[test]
2467    fn projection_non_unique_names() -> Result<()> {
2468        let plan = table_scan(
2469            Some("employee_csv"),
2470            &employee_schema(),
2471            // project id and first_name by column index
2472            Some(vec![0, 1]),
2473        )?
2474        // two columns with the same name => error
2475        .project(vec![col("id"), col("first_name").alias("id")]);
2476
2477        match plan {
2478            Err(DataFusionError::SchemaError(
2479                SchemaError::AmbiguousReference {
2480                    field:
2481                        Column {
2482                            relation: Some(TableReference::Bare { table }),
2483                            name,
2484                            spans: _,
2485                        },
2486                },
2487                _,
2488            )) => {
2489                assert_eq!(*"employee_csv", *table);
2490                assert_eq!("id", &name);
2491                Ok(())
2492            }
2493            _ => plan_err!("Plan should have returned an DataFusionError::SchemaError"),
2494        }
2495    }
2496
2497    fn employee_schema() -> Schema {
2498        Schema::new(vec![
2499            Field::new("id", DataType::Int32, false),
2500            Field::new("first_name", DataType::Utf8, false),
2501            Field::new("last_name", DataType::Utf8, false),
2502            Field::new("state", DataType::Utf8, false),
2503            Field::new("salary", DataType::Int32, false),
2504        ])
2505    }
2506
2507    #[test]
2508    fn stringified_plan() {
2509        let stringified_plan =
2510            StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
2511        assert!(stringified_plan.should_display(true));
2512        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
2513
2514        let stringified_plan =
2515            StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
2516        assert!(stringified_plan.should_display(true));
2517        assert!(stringified_plan.should_display(false)); // display in non verbose mode too
2518
2519        let stringified_plan =
2520            StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
2521        assert!(stringified_plan.should_display(true));
2522        assert!(!stringified_plan.should_display(false)); // not in non verbose mode
2523
2524        let stringified_plan =
2525            StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
2526        assert!(stringified_plan.should_display(true));
2527        assert!(stringified_plan.should_display(false)); // display in non verbose mode
2528
2529        let stringified_plan = StringifiedPlan::new(
2530            PlanType::OptimizedLogicalPlan {
2531                optimizer_name: "random opt pass".into(),
2532            },
2533            "...the plan...",
2534        );
2535        assert!(stringified_plan.should_display(true));
2536        assert!(!stringified_plan.should_display(false));
2537    }
2538
2539    fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
2540        let schema = Schema::new(vec![
2541            Field::new("a", DataType::UInt32, false),
2542            Field::new("b", DataType::UInt32, false),
2543            Field::new("c", DataType::UInt32, false),
2544        ]);
2545        table_scan(Some(name), &schema, None)?.build()
2546    }
2547
2548    #[test]
2549    fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
2550        let plan1 =
2551            table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?;
2552        let plan2 =
2553            table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?;
2554
2555        let expected = "Error during planning: INTERSECT/EXCEPT query must have the same number of columns. \
2556         Left is 1 and right is 2.";
2557        let err_msg1 =
2558            LogicalPlanBuilder::intersect(plan1.build()?, plan2.build()?, true)
2559                .unwrap_err();
2560
2561        assert_eq!(err_msg1.strip_backtrace(), expected);
2562
2563        Ok(())
2564    }
2565
2566    #[test]
2567    fn plan_builder_unnest() -> Result<()> {
2568        // Cannot unnest on a scalar column
2569        let err = nested_table_scan("test_table")?
2570            .unnest_column("scalar")
2571            .unwrap_err();
2572        assert!(err
2573            .to_string()
2574            .starts_with("Internal error: trying to unnest on invalid data type UInt32"));
2575
2576        // Unnesting the strings list.
2577        let plan = nested_table_scan("test_table")?
2578            .unnest_column("strings")?
2579            .build()?;
2580
2581        let expected = "\
2582        Unnest: lists[test_table.strings|depth=1] structs[]\
2583        \n  TableScan: test_table";
2584        assert_eq!(expected, format!("{plan}"));
2585
2586        // Check unnested field is a scalar
2587        let field = plan.schema().field_with_name(None, "strings").unwrap();
2588        assert_eq!(&DataType::Utf8, field.data_type());
2589
2590        // Unnesting the singular struct column result into 2 new columns for each subfield
2591        let plan = nested_table_scan("test_table")?
2592            .unnest_column("struct_singular")?
2593            .build()?;
2594
2595        let expected = "\
2596        Unnest: lists[] structs[test_table.struct_singular]\
2597        \n  TableScan: test_table";
2598        assert_eq!(expected, format!("{plan}"));
2599
2600        for field_name in &["a", "b"] {
2601            // Check unnested struct field is a scalar
2602            let field = plan
2603                .schema()
2604                .field_with_name(None, &format!("struct_singular.{}", field_name))
2605                .unwrap();
2606            assert_eq!(&DataType::UInt32, field.data_type());
2607        }
2608
2609        // Unnesting multiple fields in separate plans
2610        let plan = nested_table_scan("test_table")?
2611            .unnest_column("strings")?
2612            .unnest_column("structs")?
2613            .unnest_column("struct_singular")?
2614            .build()?;
2615
2616        let expected = "\
2617        Unnest: lists[] structs[test_table.struct_singular]\
2618        \n  Unnest: lists[test_table.structs|depth=1] structs[]\
2619        \n    Unnest: lists[test_table.strings|depth=1] structs[]\
2620        \n      TableScan: test_table";
2621        assert_eq!(expected, format!("{plan}"));
2622
2623        // Check unnested struct list field should be a struct.
2624        let field = plan.schema().field_with_name(None, "structs").unwrap();
2625        assert!(matches!(field.data_type(), DataType::Struct(_)));
2626
2627        // Unnesting multiple fields at the same time, using infer syntax
2628        let cols = vec!["strings", "structs", "struct_singular"]
2629            .into_iter()
2630            .map(|c| c.into())
2631            .collect();
2632
2633        let plan = nested_table_scan("test_table")?
2634            .unnest_columns_with_options(cols, UnnestOptions::default())?
2635            .build()?;
2636
2637        let expected = "\
2638        Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]\
2639        \n  TableScan: test_table";
2640        assert_eq!(expected, format!("{plan}"));
2641
2642        // Unnesting missing column should fail.
2643        let plan = nested_table_scan("test_table")?.unnest_column("missing");
2644        assert!(plan.is_err());
2645
2646        // Simultaneously unnesting a list (with different depth) and a struct column
2647        let plan = nested_table_scan("test_table")?
2648            .unnest_columns_with_options(
2649                vec!["stringss".into(), "struct_singular".into()],
2650                UnnestOptions::default()
2651                    .with_recursions(RecursionUnnestOption {
2652                        input_column: "stringss".into(),
2653                        output_column: "stringss_depth_1".into(),
2654                        depth: 1,
2655                    })
2656                    .with_recursions(RecursionUnnestOption {
2657                        input_column: "stringss".into(),
2658                        output_column: "stringss_depth_2".into(),
2659                        depth: 2,
2660                    }),
2661            )?
2662            .build()?;
2663
2664        let expected = "\
2665        Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]\
2666        \n  TableScan: test_table";
2667        assert_eq!(expected, format!("{plan}"));
2668
2669        // Check output columns has correct type
2670        let field = plan
2671            .schema()
2672            .field_with_name(None, "stringss_depth_1")
2673            .unwrap();
2674        assert_eq!(
2675            &DataType::new_list(DataType::Utf8, false),
2676            field.data_type()
2677        );
2678        let field = plan
2679            .schema()
2680            .field_with_name(None, "stringss_depth_2")
2681            .unwrap();
2682        assert_eq!(&DataType::Utf8, field.data_type());
2683        // unnesting struct is still correct
2684        for field_name in &["a", "b"] {
2685            let field = plan
2686                .schema()
2687                .field_with_name(None, &format!("struct_singular.{}", field_name))
2688                .unwrap();
2689            assert_eq!(&DataType::UInt32, field.data_type());
2690        }
2691
2692        Ok(())
2693    }
2694
2695    fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
2696        // Create a schema with a scalar field, a list of strings, a list of structs
2697        // and a singular struct
2698        let struct_field_in_list = Field::new_struct(
2699            "item",
2700            vec![
2701                Field::new("a", DataType::UInt32, false),
2702                Field::new("b", DataType::UInt32, false),
2703            ],
2704            false,
2705        );
2706        let string_field = Field::new_list_field(DataType::Utf8, false);
2707        let strings_field = Field::new_list("item", string_field.clone(), false);
2708        let schema = Schema::new(vec![
2709            Field::new("scalar", DataType::UInt32, false),
2710            Field::new_list("strings", string_field, false),
2711            Field::new_list("structs", struct_field_in_list, false),
2712            Field::new(
2713                "struct_singular",
2714                DataType::Struct(Fields::from(vec![
2715                    Field::new("a", DataType::UInt32, false),
2716                    Field::new("b", DataType::UInt32, false),
2717                ])),
2718                false,
2719            ),
2720            Field::new_list("stringss", strings_field, false),
2721        ]);
2722
2723        table_scan(Some(table_name), &schema, None)
2724    }
2725
2726    #[test]
2727    fn test_union_after_join() -> Result<()> {
2728        let values = vec![vec![lit(1)]];
2729
2730        let left = LogicalPlanBuilder::values(values.clone())?
2731            .alias("left")?
2732            .build()?;
2733        let right = LogicalPlanBuilder::values(values)?
2734            .alias("right")?
2735            .build()?;
2736
2737        let join = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
2738
2739        let _ = LogicalPlanBuilder::from(join.clone())
2740            .union(join)?
2741            .build()?;
2742
2743        Ok(())
2744    }
2745
2746    #[test]
2747    fn test_change_redundant_column() -> Result<()> {
2748        let t1_field_1 = Field::new("a", DataType::Int32, false);
2749        let t2_field_1 = Field::new("a", DataType::Int32, false);
2750        let t2_field_3 = Field::new("a", DataType::Int32, false);
2751        let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2752        let t1_field_2 = Field::new("b", DataType::Int32, false);
2753        let t2_field_2 = Field::new("b", DataType::Int32, false);
2754
2755        let field_vec = vec![
2756            t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2757        ];
2758        let remove_redundant = change_redundant_column(&Fields::from(field_vec));
2759
2760        assert_eq!(
2761            remove_redundant,
2762            vec![
2763                Field::new("a", DataType::Int32, false),
2764                Field::new("a:1", DataType::Int32, false),
2765                Field::new("b", DataType::Int32, false),
2766                Field::new("b:1", DataType::Int32, false),
2767                Field::new("a:2", DataType::Int32, false),
2768                Field::new("a:1:1", DataType::Int32, false),
2769            ]
2770        );
2771        Ok(())
2772    }
2773
2774    #[test]
2775    fn plan_builder_from_logical_plan() -> Result<()> {
2776        let plan =
2777            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2778                .sort(vec![
2779                    expr::Sort::new(col("state"), true, true),
2780                    expr::Sort::new(col("salary"), false, false),
2781                ])?
2782                .build()?;
2783
2784        let plan_expected = format!("{plan}");
2785        let plan_builder: LogicalPlanBuilder = Arc::new(plan).into();
2786        assert_eq!(plan_expected, format!("{}", plan_builder.plan));
2787
2788        Ok(())
2789    }
2790
2791    #[test]
2792    fn plan_builder_aggregate_without_implicit_group_by_exprs() -> Result<()> {
2793        let constraints =
2794            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2795        let table_source = table_source_with_constraints(&employee_schema(), constraints);
2796
2797        let plan =
2798            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2799                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2800                .build()?;
2801
2802        let expected =
2803            "Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]]\
2804        \n  TableScan: employee_csv projection=[id, state, salary]";
2805        assert_eq!(expected, format!("{plan}"));
2806
2807        Ok(())
2808    }
2809
2810    #[test]
2811    fn plan_builder_aggregate_with_implicit_group_by_exprs() -> Result<()> {
2812        let constraints =
2813            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2814        let table_source = table_source_with_constraints(&employee_schema(), constraints);
2815
2816        let options =
2817            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
2818        let plan =
2819            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2820                .with_options(options)
2821                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2822                .build()?;
2823
2824        let expected =
2825            "Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]]\
2826        \n  TableScan: employee_csv projection=[id, state, salary]";
2827        assert_eq!(expected, format!("{plan}"));
2828
2829        Ok(())
2830    }
2831}