datafusion_sql/planner.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST)
use std::collections::HashMap;
use std::sync::Arc;
use std::vec;

use arrow::datatypes::*;
use datafusion_common::config::SqlParserOptions;
use datafusion_common::error::add_possible_columns_to_diag;
use datafusion_common::TableReference;
use datafusion_common::{
    field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, Diagnostic,
    SchemaError,
};
use datafusion_common::{not_impl_err, plan_err, DFSchema, DataFusionError, Result};
use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use datafusion_expr::utils::find_column_exprs;
use datafusion_expr::{col, Expr};
use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};

use crate::utils::make_decimal_type;
pub use datafusion_expr::planner::ContextProvider;
/// SQL parser options
#[derive(Debug, Clone, Copy)]
pub struct ParserOptions {
    /// Whether to parse float as decimal.
    pub parse_float_as_decimal: bool,
    /// Whether to normalize identifiers.
    pub enable_ident_normalization: bool,
    /// Whether to support `VARCHAR` with a length specifier.
    pub support_varchar_with_length: bool,
    /// Whether to normalize option values.
    pub enable_options_value_normalization: bool,
    /// Whether to collect source spans.
    pub collect_spans: bool,
    /// Whether `VARCHAR` is mapped to `Utf8View` during SQL planning.
    pub map_varchar_to_utf8view: bool,
}

impl ParserOptions {
    /// Creates a new `ParserOptions` instance with default values.
    ///
    /// # Examples
    ///
    /// ```
    /// use datafusion_sql::planner::ParserOptions;
    /// let opts = ParserOptions::new();
    /// assert_eq!(opts.parse_float_as_decimal, false);
    /// assert_eq!(opts.enable_ident_normalization, true);
    /// ```
    pub fn new() -> Self {
        Self {
            parse_float_as_decimal: false,
            enable_ident_normalization: true,
            support_varchar_with_length: true,
            map_varchar_to_utf8view: false,
            enable_options_value_normalization: false,
            collect_spans: false,
        }
    }

    /// Sets the `parse_float_as_decimal` option.
    ///
    /// # Examples
    ///
    /// ```
    /// use datafusion_sql::planner::ParserOptions;
    /// let opts = ParserOptions::new().with_parse_float_as_decimal(true);
    /// assert_eq!(opts.parse_float_as_decimal, true);
    /// ```
    pub fn with_parse_float_as_decimal(mut self, value: bool) -> Self {
        self.parse_float_as_decimal = value;
        self
    }

    /// Sets the `enable_ident_normalization` option.
    ///
    /// # Examples
    ///
    /// ```
    /// use datafusion_sql::planner::ParserOptions;
    /// let opts = ParserOptions::new().with_enable_ident_normalization(false);
    /// assert_eq!(opts.enable_ident_normalization, false);
    /// ```
    pub fn with_enable_ident_normalization(mut self, value: bool) -> Self {
        self.enable_ident_normalization = value;
        self
    }

    /// Sets the `support_varchar_with_length` option.
    pub fn with_support_varchar_with_length(mut self, value: bool) -> Self {
        self.support_varchar_with_length = value;
        self
    }

    /// Sets the `map_varchar_to_utf8view` option.
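    ///
    /// # Examples
    ///
    /// A short doctest in the style of the examples above; chaining the
    /// builder methods simply records the flags on the returned options:
    ///
    /// ```
    /// use datafusion_sql::planner::ParserOptions;
    /// let opts = ParserOptions::new()
    ///     .with_map_varchar_to_utf8view(true)
    ///     .with_support_varchar_with_length(false);
    /// assert_eq!(opts.map_varchar_to_utf8view, true);
    /// assert_eq!(opts.support_varchar_with_length, false);
    /// ```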
    pub fn with_map_varchar_to_utf8view(mut self, value: bool) -> Self {
        self.map_varchar_to_utf8view = value;
        self
    }

    /// Sets the `enable_options_value_normalization` option.
    pub fn with_enable_options_value_normalization(mut self, value: bool) -> Self {
        self.enable_options_value_normalization = value;
        self
    }

    /// Sets the `collect_spans` option.
    pub fn with_collect_spans(mut self, value: bool) -> Self {
        self.collect_spans = value;
        self
    }
}

impl Default for ParserOptions {
    fn default() -> Self {
        Self::new()
    }
}

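/// Conversion from DataFusion's [`SqlParserOptions`] configuration struct.
///
/// A small example, assuming the config struct's `Default` implementation
/// and public fields:
///
/// ```
/// use datafusion_common::config::SqlParserOptions;
/// use datafusion_sql::planner::ParserOptions;
///
/// let config = SqlParserOptions {
///     parse_float_as_decimal: true,
///     ..Default::default()
/// };
/// let opts = ParserOptions::from(&config);
/// assert!(opts.parse_float_as_decimal);
/// ```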
impl From<&SqlParserOptions> for ParserOptions {
    fn from(options: &SqlParserOptions) -> Self {
        Self {
            parse_float_as_decimal: options.parse_float_as_decimal,
            enable_ident_normalization: options.enable_ident_normalization,
            support_varchar_with_length: options.support_varchar_with_length,
            map_varchar_to_utf8view: options.map_varchar_to_utf8view,
            enable_options_value_normalization: options
                .enable_options_value_normalization,
            collect_spans: options.collect_spans,
        }
    }
}

/// Normalizes SQL identifiers, lowercasing unquoted identifiers when
/// normalization is enabled.
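///
/// # Example
///
/// A small example of both modes; unquoted identifiers are lowercased when
/// normalization is enabled:
///
/// ```
/// use datafusion_sql::planner::IdentNormalizer;
/// use sqlparser::ast::Ident;
///
/// // Normalization enabled: unquoted identifiers are lowercased.
/// let normalizer = IdentNormalizer::new(true);
/// assert_eq!(normalizer.normalize(Ident::new("MyTable")), "mytable");
///
/// // Normalization disabled: the identifier is returned as written.
/// let passthrough = IdentNormalizer::new(false);
/// assert_eq!(passthrough.normalize(Ident::new("MyTable")), "MyTable");
/// ```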
#[derive(Debug)]
pub struct IdentNormalizer {
    normalize: bool,
}

impl Default for IdentNormalizer {
    fn default() -> Self {
        Self { normalize: true }
    }
}

impl IdentNormalizer {
    pub fn new(normalize: bool) -> Self {
        Self { normalize }
    }

    pub fn normalize(&self, ident: Ident) -> String {
        if self.normalize {
            crate::utils::normalize_ident(ident)
        } else {
            ident.value
        }
    }
}

/// Stores the state used by the planner. The planner uses this state to
/// resolve CTEs, views, subqueries, and `PREPARE` statements. The state
/// includes the Common Table Expressions (CTEs) introduced by `WITH`
/// clauses, the parameter data types declared by a `PREPARE` statement,
/// and the schema of the outer query plan.
///
/// # Cloning
///
/// Only the `ctes` are truly cloned when the `PlannerContext` is cloned.
/// This helps resolve the scoping of CTEs: by cloning, a subquery can
/// inherit CTEs from the outer query and can also define its own private
/// CTEs without affecting the outer query.
///
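/// # Example
///
/// A brief example of the public API; an empty relation stands in for a
/// real CTE plan here:
///
/// ```
/// use arrow::datatypes::DataType;
/// use datafusion_expr::logical_plan::LogicalPlanBuilder;
/// use datafusion_sql::planner::PlannerContext;
///
/// let mut context = PlannerContext::new()
///     .with_prepare_param_data_types(vec![DataType::Int32]);
/// assert_eq!(context.prepare_param_data_types(), &[DataType::Int32]);
///
/// // Register a (placeholder) plan for a CTE named "my_cte".
/// let plan = LogicalPlanBuilder::empty(false).build().unwrap();
/// context.insert_cte("my_cte", plan);
/// assert!(context.contains_cte("my_cte"));
/// assert!(context.get_cte("other_cte").is_none());
/// ```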
#[derive(Debug, Clone)]
pub struct PlannerContext {
    /// Data types for numbered parameters ($1, $2, etc), if supplied
    /// in `PREPARE` statement
    prepare_param_data_types: Arc<Vec<DataType>>,
    /// Map of CTE name to logical plan of the WITH clause.
    /// Use `Arc<LogicalPlan>` to allow cheap cloning
    ctes: HashMap<String, Arc<LogicalPlan>>,
    /// The query schema of the outer query plan, used to resolve the columns in subqueries
    outer_query_schema: Option<DFSchemaRef>,
    /// The joined schemas of all FROM clauses planned so far. When planning LATERAL
    /// FROM clauses, this should become a suffix of the `outer_query_schema`.
    outer_from_schema: Option<DFSchemaRef>,
    /// The query schema defined by the table
    create_table_schema: Option<DFSchemaRef>,
}

impl Default for PlannerContext {
    fn default() -> Self {
        Self::new()
    }
}

impl PlannerContext {
    /// Create an empty PlannerContext
    pub fn new() -> Self {
        Self {
            prepare_param_data_types: Arc::new(vec![]),
            ctes: HashMap::new(),
            outer_query_schema: None,
            outer_from_schema: None,
            create_table_schema: None,
        }
    }

    /// Update the PlannerContext with the provided prepare_param_data_types
    pub fn with_prepare_param_data_types(
        mut self,
        prepare_param_data_types: Vec<DataType>,
    ) -> Self {
        self.prepare_param_data_types = prepare_param_data_types.into();
        self
    }

    /// Return a reference to the outer query's schema
    pub fn outer_query_schema(&self) -> Option<&DFSchema> {
        self.outer_query_schema.as_ref().map(|s| s.as_ref())
    }

    /// Sets the outer query schema, returning the existing one, if
    /// any
    pub fn set_outer_query_schema(
        &mut self,
        mut schema: Option<DFSchemaRef>,
    ) -> Option<DFSchemaRef> {
        std::mem::swap(&mut self.outer_query_schema, &mut schema);
        schema
    }

    /// Sets the `create_table_schema`, returning the existing one, if any
    pub fn set_table_schema(
        &mut self,
        mut schema: Option<DFSchemaRef>,
    ) -> Option<DFSchemaRef> {
        std::mem::swap(&mut self.create_table_schema, &mut schema);
        schema
    }

    /// Returns the `create_table_schema`, if any
    pub fn table_schema(&self) -> Option<DFSchemaRef> {
        self.create_table_schema.clone()
    }

    /// Return a clone of the outer FROM schema
    pub fn outer_from_schema(&self) -> Option<Arc<DFSchema>> {
        self.outer_from_schema.clone()
    }

    /// Sets the outer FROM schema, returning the existing one, if any
    pub fn set_outer_from_schema(
        &mut self,
        mut schema: Option<DFSchemaRef>,
    ) -> Option<DFSchemaRef> {
        std::mem::swap(&mut self.outer_from_schema, &mut schema);
        schema
    }

    /// Extends the outer FROM schema with the given schema, creating it if
    /// none exists yet
    pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
        match self.outer_from_schema.as_mut() {
            Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
            None => self.outer_from_schema = Some(Arc::clone(schema)),
        };
        Ok(())
    }

    /// Return the types of parameters (`$1`, `$2`, etc) if known
    pub fn prepare_param_data_types(&self) -> &[DataType] {
        &self.prepare_param_data_types
    }

    /// Returns true if there is a Common Table Expression (CTE) /
    /// Subquery for the specified name
    pub fn contains_cte(&self, cte_name: &str) -> bool {
        self.ctes.contains_key(cte_name)
    }

    /// Inserts a LogicalPlan for the Common Table Expression (CTE) /
    /// Subquery for the specified name
    pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
        let cte_name = cte_name.into();
        self.ctes.insert(cte_name, Arc::new(plan));
    }

    /// Return a plan for the Common Table Expression (CTE) / Subquery for the
    /// specified name
    pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
        self.ctes.get(cte_name).map(|cte| cte.as_ref())
    }

    /// Remove the plan of CTE / Subquery for the specified name
    pub(super) fn remove_cte(&mut self, cte_name: &str) {
        self.ctes.remove(cte_name);
    }
}

/// SQL query planner and binder
///
/// This struct is used to convert a SQL AST into a [`LogicalPlan`].
///
/// You can control the behavior of the planner by providing [`ParserOptions`].
///
/// It performs the following tasks:
///
/// 1. Name and type resolution (called "binding" in other systems). This
///    phase looks up table and column names using the [`ContextProvider`].
/// 2. Mechanical translation of the AST into a [`LogicalPlan`].
///
/// It does not perform type coercion or optimization; those are done by
/// subsequent passes.
///
/// Key interfaces are:
/// * [`Self::sql_statement_to_plan`]: Convert a statement
///   (e.g. `SELECT ...`) into a [`LogicalPlan`]
/// * [`Self::sql_to_expr`]: Convert an expression (e.g. `1 + 2`) into an [`Expr`]
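///
/// # Example
///
/// A minimal sketch of the intended workflow (not compiled as a doctest).
/// `MyContextProvider` is a hypothetical type that implements
/// [`ContextProvider`] and can resolve the table `t`:
///
/// ```ignore
/// use datafusion_sql::planner::SqlToRel;
/// use sqlparser::dialect::GenericDialect;
/// use sqlparser::parser::Parser;
///
/// // A placeholder for your own `ContextProvider` implementation.
/// let provider = MyContextProvider::new();
///
/// // Parse the SQL text into a sqlparser AST statement.
/// let sql = "SELECT a FROM t WHERE a > 1";
/// let statement = Parser::parse_sql(&GenericDialect {}, sql)?.pop().unwrap();
///
/// // Bind names and types against the provider and build a LogicalPlan.
/// let planner = SqlToRel::new(&provider);
/// let logical_plan = planner.sql_statement_to_plan(statement)?;
/// ```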
pub struct SqlToRel<'a, S: ContextProvider> {
    pub(crate) context_provider: &'a S,
    pub(crate) options: ParserOptions,
    pub(crate) ident_normalizer: IdentNormalizer,
}

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
    /// Create a new query planner.
    ///
    /// The query planner derives the parser options from the context provider.
    pub fn new(context_provider: &'a S) -> Self {
        let parser_options = ParserOptions::from(&context_provider.options().sql_parser);
        Self::new_with_options(context_provider, parser_options)
    }

    /// Create a new query planner with the given parser options.
    ///
    /// The query planner ignores the parser options from the context provider
    /// and uses the given parser options instead.
    pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
        let ident_normalize = options.enable_ident_normalization;

        SqlToRel {
            context_provider,
            options,
            ident_normalizer: IdentNormalizer::new(ident_normalize),
        }
    }

    /// Build an Arrow [`Schema`] from a list of SQL column definitions,
    /// normalizing column names and honoring `NOT NULL` constraints
    pub fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
        let mut fields = Vec::with_capacity(columns.len());

        for column in columns {
            let data_type = self.convert_data_type(&column.data_type)?;
            let not_nullable = column
                .options
                .iter()
                .any(|x| x.option == ColumnOption::NotNull);
            fields.push(Field::new(
                self.ident_normalizer.normalize(column.name),
                data_type,
                !not_nullable,
            ));
        }

        Ok(Schema::new(fields))
    }

    /// Returns a vector of (column_name, default_expr) pairs
    pub(super) fn build_column_defaults(
        &self,
        columns: &Vec<SQLColumnDef>,
        planner_context: &mut PlannerContext,
    ) -> Result<Vec<(String, Expr)>> {
        let mut column_defaults = vec![];
        // Default expressions are restricted; column references are not allowed
        let empty_schema = DFSchema::empty();
        let error_desc = |e: DataFusionError| match e {
            DataFusionError::SchemaError(SchemaError::FieldNotFound { .. }, _) => {
                plan_datafusion_err!(
                    "Column reference is not allowed in the DEFAULT expression : {}",
                    e
                )
            }
            _ => e,
        };

        for column in columns {
            if let Some(default_sql_expr) =
                column.options.iter().find_map(|o| match &o.option {
                    ColumnOption::Default(expr) => Some(expr),
                    _ => None,
                })
            {
                let default_expr = self
                    .sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
                    .map_err(error_desc)?;
                column_defaults.push((
                    self.ident_normalizer.normalize(column.name.clone()),
                    default_expr,
                ));
            }
        }
        Ok(column_defaults)
    }

    /// Apply the given TableAlias to the input plan
    pub(crate) fn apply_table_alias(
        &self,
        plan: LogicalPlan,
        alias: TableAlias,
    ) -> Result<LogicalPlan> {
        let idents = alias.columns.into_iter().map(|c| c.name).collect();
        let plan = self.apply_expr_alias(plan, idents)?;

        LogicalPlanBuilder::from(plan)
            .alias(TableReference::bare(
                self.ident_normalizer.normalize(alias.name),
            ))?
            .build()
    }

    /// Apply the given column aliases to the columns of the input plan
    pub(crate) fn apply_expr_alias(
        &self,
        plan: LogicalPlan,
        idents: Vec<Ident>,
    ) -> Result<LogicalPlan> {
        if idents.is_empty() {
            Ok(plan)
        } else if idents.len() != plan.schema().fields().len() {
            plan_err!(
                "Source table contains {} columns but only {} \
                names given as column alias",
                plan.schema().fields().len(),
                idents.len()
            )
        } else {
            let fields = plan.schema().fields().clone();
            LogicalPlanBuilder::from(plan)
                .project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
                    col(field.name()).alias(self.ident_normalizer.normalize(ident))
                }))?
                .build()
        }
    }

    /// Validate that the schema provides all of the columns referenced in the expressions.
    pub(crate) fn validate_schema_satisfies_exprs(
        &self,
        schema: &DFSchema,
        exprs: &[Expr],
    ) -> Result<()> {
        find_column_exprs(exprs)
            .iter()
            .try_for_each(|col| match col {
                Expr::Column(col) => match &col.relation {
                    Some(r) => schema.field_with_qualified_name(r, &col.name).map(|_| ()),
                    None => {
                        if !schema.fields_with_unqualified_name(&col.name).is_empty() {
                            Ok(())
                        } else {
                            Err(field_not_found(
                                col.relation.clone(),
                                col.name.as_str(),
                                schema,
                            ))
                        }
                    }
                }
                .map_err(|err: DataFusionError| match &err {
                    DataFusionError::SchemaError(
                        SchemaError::FieldNotFound {
                            field,
                            valid_fields,
                        },
                        _,
                    ) => {
                        let mut diagnostic = if let Some(relation) = &col.relation {
                            Diagnostic::new_error(
                                format!(
                                    "column '{}' not found in '{}'",
                                    &col.name, relation
                                ),
                                col.spans().first(),
                            )
                        } else {
                            Diagnostic::new_error(
                                format!("column '{}' not found", &col.name),
                                col.spans().first(),
                            )
                        };
                        add_possible_columns_to_diag(
                            &mut diagnostic,
                            field,
                            valid_fields,
                        );
                        err.with_diagnostic(diagnostic)
                    }
                    _ => err,
                }),
                _ => internal_err!("Not a column"),
            })
    }

    pub(crate) fn convert_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
        // First check if the registered type_planner, if any, can handle this type
        if let Some(type_planner) = self.context_provider.get_type_planner() {
            if let Some(data_type) = type_planner.plan_type(sql_type)? {
                return Ok(data_type);
            }
        }

        // If no type_planner can handle this type, use the default conversion
        match sql_type {
            SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
                // Arrays may be multi-dimensional.
                let inner_data_type = self.convert_data_type(inner_sql_type)?;
                Ok(DataType::new_list(inner_data_type, true))
            }
            SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
                inner_sql_type,
                maybe_array_size,
            )) => {
                let inner_data_type = self.convert_data_type(inner_sql_type)?;
                if let Some(array_size) = maybe_array_size {
                    Ok(DataType::new_fixed_size_list(
                        inner_data_type,
                        *array_size as i32,
                        true,
                    ))
                } else {
                    Ok(DataType::new_list(inner_data_type, true))
                }
            }
            SQLDataType::Array(ArrayElemTypeDef::None) => {
                not_impl_err!("Arrays with unspecified type are not supported")
            }
            other => self.convert_simple_data_type(other),
        }
    }

    fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
        match sql_type {
            SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
            SQLDataType::TinyInt(_) => Ok(DataType::Int8),
            SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
            SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
                Ok(DataType::Int32)
            }
            SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
            SQLDataType::TinyIntUnsigned(_) => Ok(DataType::UInt8),
            SQLDataType::SmallIntUnsigned(_) | SQLDataType::Int2Unsigned(_) => {
                Ok(DataType::UInt16)
            }
            SQLDataType::IntUnsigned(_)
            | SQLDataType::IntegerUnsigned(_)
            | SQLDataType::Int4Unsigned(_) => Ok(DataType::UInt32),
            SQLDataType::Varchar(length) => {
                match (length, self.options.support_varchar_with_length) {
                    (Some(_), false) => plan_err!(
                        "does not support Varchar with length, \
                    please set `support_varchar_with_length` to be true"
                    ),
                    _ => {
                        if self.options.map_varchar_to_utf8view {
                            Ok(DataType::Utf8View)
                        } else {
                            Ok(DataType::Utf8)
                        }
                    }
                }
            }
            SQLDataType::BigIntUnsigned(_) | SQLDataType::Int8Unsigned(_) => {
                Ok(DataType::UInt64)
            }
            SQLDataType::Float(_) => Ok(DataType::Float32),
            SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
            SQLDataType::Double(ExactNumberInfo::None)
            | SQLDataType::DoublePrecision
            | SQLDataType::Float8 => Ok(DataType::Float64),
            SQLDataType::Double(
                ExactNumberInfo::Precision(_) | ExactNumberInfo::PrecisionAndScale(_, _),
            ) => {
                not_impl_err!(
                    "Unsupported SQL type (precision/scale not supported) {sql_type}"
                )
            }
            SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => {
                Ok(DataType::Utf8)
            }
            SQLDataType::Timestamp(precision, tz_info)
                if precision.is_none() || [0, 3, 6, 9].contains(&precision.unwrap()) =>
            {
                let tz = if matches!(tz_info, TimezoneInfo::Tz)
                    || matches!(tz_info, TimezoneInfo::WithTimeZone)
                {
                    // Timestamp With Time Zone
                    // INPUT : [SQLDataType]   TimestampTz + [Config] Time Zone
                    // OUTPUT: [ArrowDataType] Timestamp<TimeUnit, Some(Time Zone)>
                    self.context_provider.options().execution.time_zone.clone()
                } else {
                    // Timestamp Without Time zone
                    None
                };
                let precision = match precision {
                    Some(0) => TimeUnit::Second,
                    Some(3) => TimeUnit::Millisecond,
                    Some(6) => TimeUnit::Microsecond,
                    None | Some(9) => TimeUnit::Nanosecond,
                    _ => unreachable!(),
                };
                Ok(DataType::Timestamp(precision, tz.map(Into::into)))
            }
            SQLDataType::Date => Ok(DataType::Date32),
            SQLDataType::Time(None, tz_info) => {
                if matches!(tz_info, TimezoneInfo::None)
                    || matches!(tz_info, TimezoneInfo::WithoutTimeZone)
                {
                    Ok(DataType::Time64(TimeUnit::Nanosecond))
                } else {
                    // We don't support TIMETZ and TIME WITH TIME ZONE for now
                    not_impl_err!("Unsupported SQL type {sql_type:?}")
                }
            }
            SQLDataType::Numeric(exact_number_info)
            | SQLDataType::Decimal(exact_number_info) => {
                let (precision, scale) = match *exact_number_info {
                    ExactNumberInfo::None => (None, None),
                    ExactNumberInfo::Precision(precision) => (Some(precision), None),
                    ExactNumberInfo::PrecisionAndScale(precision, scale) => {
                        (Some(precision), Some(scale))
                    }
                };
                make_decimal_type(precision, scale)
            }
            SQLDataType::Bytea => Ok(DataType::Binary),
            SQLDataType::Interval => Ok(DataType::Interval(IntervalUnit::MonthDayNano)),
            SQLDataType::Struct(fields, _) => {
                let fields = fields
                    .iter()
                    .enumerate()
                    .map(|(idx, field)| {
                        let data_type = self.convert_data_type(&field.field_type)?;
                        let field_name = match &field.field_name {
                            Some(ident) => ident.clone(),
                            None => Ident::new(format!("c{idx}")),
                        };
                        Ok(Arc::new(Field::new(
                            self.ident_normalizer.normalize(field_name),
                            data_type,
                            true,
                        )))
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(DataType::Struct(Fields::from(fields)))
            }
            SQLDataType::Nvarchar(_)
            | SQLDataType::JSON
            | SQLDataType::Uuid
            | SQLDataType::Binary(_)
            | SQLDataType::Varbinary(_)
            | SQLDataType::Blob(_)
            | SQLDataType::Datetime(_)
            | SQLDataType::Regclass
            | SQLDataType::Custom(_, _)
            | SQLDataType::Array(_)
            | SQLDataType::Enum(_, _)
            | SQLDataType::Set(_)
            | SQLDataType::MediumInt(_)
            | SQLDataType::MediumIntUnsigned(_)
            | SQLDataType::Character(_)
            | SQLDataType::CharacterVarying(_)
            | SQLDataType::CharVarying(_)
            | SQLDataType::CharacterLargeObject(_)
            | SQLDataType::CharLargeObject(_)
            | SQLDataType::Timestamp(_, _)
            | SQLDataType::Time(Some(_), _)
            | SQLDataType::Dec(_)
            | SQLDataType::BigNumeric(_)
            | SQLDataType::BigDecimal(_)
            | SQLDataType::Clob(_)
            | SQLDataType::Bytes(_)
            | SQLDataType::Int64
            | SQLDataType::Float64
            | SQLDataType::JSONB
            | SQLDataType::Unspecified
            | SQLDataType::Int16
            | SQLDataType::Int32
            | SQLDataType::Int128
            | SQLDataType::Int256
            | SQLDataType::UInt8
            | SQLDataType::UInt16
            | SQLDataType::UInt32
            | SQLDataType::UInt64
            | SQLDataType::UInt128
            | SQLDataType::UInt256
            | SQLDataType::Float32
            | SQLDataType::Date32
            | SQLDataType::Datetime64(_, _)
            | SQLDataType::FixedString(_)
            | SQLDataType::Map(_, _)
            | SQLDataType::Tuple(_)
            | SQLDataType::Nested(_)
            | SQLDataType::Union(_)
            | SQLDataType::Nullable(_)
            | SQLDataType::LowCardinality(_)
            | SQLDataType::Trigger
            | SQLDataType::TinyBlob
            | SQLDataType::MediumBlob
            | SQLDataType::LongBlob
            | SQLDataType::TinyText
            | SQLDataType::MediumText
            | SQLDataType::LongText
            | SQLDataType::Bit(_)
            | SQLDataType::BitVarying(_)
            | SQLDataType::Signed
            | SQLDataType::SignedInteger
            | SQLDataType::Unsigned
            | SQLDataType::UnsignedInteger
            | SQLDataType::AnyType
            | SQLDataType::Table(_)
            | SQLDataType::VarBit(_)
            | SQLDataType::GeometricType(_) => {
                not_impl_err!("Unsupported SQL type {sql_type:?}")
            }
        }
    }

    pub(crate) fn object_name_to_table_reference(
        &self,
        object_name: ObjectName,
    ) -> Result<TableReference> {
        object_name_to_table_reference(
            object_name,
            self.options.enable_ident_normalization,
        )
    }
}

/// Create a [`TableReference`] after normalizing the specified ObjectName
///
/// Examples
/// ```text
/// ['foo']          -> Bare { table: "foo" }
/// ['"foo.bar"']    -> Bare { table: "foo.bar" }
/// ['foo', 'Bar']   -> Partial { schema: "foo", table: "bar" } <-- note lower case "bar"
/// ['foo', 'bar']   -> Partial { schema: "foo", table: "bar" }
/// ['foo', '"Bar"'] -> Partial { schema: "foo", table: "Bar" }
/// ```
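///
/// A runnable sketch of the same mapping, constructing the [`ObjectName`]
/// directly from `sqlparser` AST parts (normally the `ObjectName` comes from
/// a parsed statement):
///
/// ```
/// use datafusion_common::TableReference;
/// use datafusion_sql::planner::object_name_to_table_reference;
/// use sqlparser::ast::{Ident, ObjectName, ObjectNamePart};
///
/// let object_name = ObjectName(vec![
///     ObjectNamePart::Identifier(Ident::new("foo")),
///     ObjectNamePart::Identifier(Ident::new("Bar")),
/// ]);
/// let table_ref = object_name_to_table_reference(object_name, true).unwrap();
/// // With normalization enabled, the unquoted "Bar" is lowercased to "bar".
/// assert_eq!(table_ref, TableReference::partial("foo", "bar"));
/// ```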
pub fn object_name_to_table_reference(
    object_name: ObjectName,
    enable_normalization: bool,
) -> Result<TableReference> {
    // Destructure to make it clear that no fields of ObjectName are ignored
    let ObjectName(object_name_parts) = object_name;
    let idents = object_name_parts
        .into_iter()
        .map(|object_name_part| {
            object_name_part.as_ident().cloned().ok_or_else(|| {
                plan_datafusion_err!(
                    "Expected identifier, but found: {:?}",
                    object_name_part
                )
            })
        })
        .collect::<Result<Vec<_>>>()?;
    idents_to_table_reference(idents, enable_normalization)
}

/// Helper that pops identifiers off the back of a list of [`Ident`]s,
/// normalizing each one as it is taken
struct IdentTaker {
    normalizer: IdentNormalizer,
    idents: Vec<Ident>,
}

impl IdentTaker {
    fn new(idents: Vec<Ident>, enable_normalization: bool) -> Self {
        Self {
            normalizer: IdentNormalizer::new(enable_normalization),
            idents,
        }
    }

    /// Take the next identifier from the back of `idents`, panicking if
    /// there are none left
    fn take(&mut self) -> String {
        let ident = self.idents.pop().expect("no more identifiers");
        self.normalizer.normalize(ident)
    }

    /// Returns the number of remaining identifiers
    fn len(&self) -> usize {
        self.idents.len()
    }
}

// impl Display for a nicer error message
impl std::fmt::Display for IdentTaker {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut first = true;
        for ident in self.idents.iter() {
            if !first {
                write!(f, ".")?;
            }
            write!(f, "{}", ident)?;
            first = false;
        }

        Ok(())
    }
}

/// Create a [`TableReference`] after normalizing the specified identifiers
pub(crate) fn idents_to_table_reference(
    idents: Vec<Ident>,
    enable_normalization: bool,
) -> Result<TableReference> {
    let mut taker = IdentTaker::new(idents, enable_normalization);

    match taker.len() {
        1 => {
            let table = taker.take();
            Ok(TableReference::bare(table))
        }
        2 => {
            let table = taker.take();
            let schema = taker.take();
            Ok(TableReference::partial(schema, table))
        }
        3 => {
            let table = taker.take();
            let schema = taker.take();
            let catalog = taker.take();
            Ok(TableReference::full(catalog, schema, table))
        }
        _ => plan_err!(
            "Unsupported compound identifier '{}'. Expected 1, 2 or 3 parts, got {}",
            taker,
            taker.len()
        ),
    }
}

/// Construct a WHERE qualifier suitable for e.g. information_schema filtering
/// from the provided object identifiers (catalog, schema and table names).
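///
/// For example, the object name `my_catalog.my_schema.my_table` produces
/// (conceptually) the following qualifier:
///
/// ```text
/// table_name = 'my_table' AND table_schema = 'my_schema' AND table_catalog = 'my_catalog'
/// ```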
pub fn object_name_to_qualifier(
    sql_table_name: &ObjectName,
    enable_normalization: bool,
) -> Result<String> {
    let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter();
    let normalizer = IdentNormalizer::new(enable_normalization);
    sql_table_name
        .0
        .iter()
        .rev()
        .zip(columns)
        .map(|(object_name_part, column_name)| {
            object_name_part
                .as_ident()
                .map(|ident| {
                    format!(
                        r#"{} = '{}'"#,
                        column_name,
                        normalizer.normalize(ident.clone())
                    )
                })
                .ok_or_else(|| {
                    plan_datafusion_err!(
                        "Expected identifier, but found: {:?}",
                        object_name_part
                    )
                })
        })
        .collect::<Result<Vec<_>>>()
        .map(|parts| parts.join(" AND "))
}