datafusion_common/
dfschema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! DFSchema is an extended schema struct that DataFusion uses to provide support for
19//! fields with optional relation names.
20
21use std::collections::{BTreeSet, HashMap, HashSet};
22use std::fmt::{Display, Formatter};
23use std::hash::Hash;
24use std::sync::Arc;
25
26use crate::error::{DataFusionError, Result, _plan_err, _schema_err};
27use crate::{
28    field_not_found, unqualified_field_not_found, Column, FunctionalDependencies,
29    SchemaError, TableReference,
30};
31
32use arrow::compute::can_cast_types;
33use arrow::datatypes::{
34    DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
35};
36
/// A reference-counted reference to a [`DFSchema`], i.e. an alias for
/// `Arc<DFSchema>`.
pub type DFSchemaRef = Arc<DFSchema>;
39
40/// DFSchema wraps an Arrow schema and adds relation names.
41///
42/// The schema may hold the fields across multiple tables. Some fields may be
43/// qualified and some unqualified. A qualified field is a field that has a
44/// relation name associated with it.
45///
46/// Unqualified fields must be unique not only amongst themselves, but also must
47/// have a distinct name from any qualified field names. This allows finding a
48/// qualified field by name to be possible, so long as there aren't multiple
49/// qualified fields with the same name.
50///
51/// There is an alias to `Arc<DFSchema>` named [DFSchemaRef].
52///
53/// # Creating qualified schemas
54///
55/// Use [DFSchema::try_from_qualified_schema] to create a qualified schema from
56/// an Arrow schema.
57///
58/// ```rust
59/// use datafusion_common::{DFSchema, Column};
60/// use arrow::datatypes::{DataType, Field, Schema};
61///
62/// let arrow_schema = Schema::new(vec![
63///    Field::new("c1", DataType::Int32, false),
64/// ]);
65///
66/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
67/// let column = Column::from_qualified_name("t1.c1");
68/// assert!(df_schema.has_column(&column));
69///
70/// // Can also access qualified fields with unqualified name, if it's unambiguous
71/// let column = Column::from_qualified_name("c1");
72/// assert!(df_schema.has_column(&column));
73/// ```
74///
75/// # Creating unqualified schemas
76///
77/// Create an unqualified schema using TryFrom:
78///
79/// ```rust
80/// use datafusion_common::{DFSchema, Column};
81/// use arrow::datatypes::{DataType, Field, Schema};
82///
83/// let arrow_schema = Schema::new(vec![
84///    Field::new("c1", DataType::Int32, false),
85/// ]);
86///
87/// let df_schema = DFSchema::try_from(arrow_schema).unwrap();
88/// let column = Column::new_unqualified("c1");
89/// assert!(df_schema.has_column(&column));
90/// ```
91///
92/// # Converting back to Arrow schema
93///
94/// Use the `Into` trait to convert `DFSchema` into an Arrow schema:
95///
96/// ```rust
97/// use datafusion_common::DFSchema;
98/// use arrow::datatypes::{Schema, Field};
99/// use std::collections::HashMap;
100///
101/// let df_schema = DFSchema::from_unqualified_fields(vec![
102///    Field::new("c1", arrow::datatypes::DataType::Int32, false),
103/// ].into(),HashMap::new()).unwrap();
104/// let schema = Schema::from(df_schema);
105/// assert_eq!(schema.fields().len(), 1);
106/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFSchema {
    /// Inner Arrow schema reference; holds the fields and the schema-level
    /// metadata.
    inner: SchemaRef,
    /// Optional qualifier (relation name) of each field, in the same order as
    /// `self.inner.fields()`. `None` marks an unqualified field.
    field_qualifiers: Vec<Option<TableReference>>,
    /// Stores functional dependencies in the schema.
    functional_dependencies: FunctionalDependencies,
}
117
118impl DFSchema {
119    /// Creates an empty `DFSchema`
120    pub fn empty() -> Self {
121        Self {
122            inner: Arc::new(Schema::new([])),
123            field_qualifiers: vec![],
124            functional_dependencies: FunctionalDependencies::empty(),
125        }
126    }
127
128    /// Return a reference to the inner Arrow [`Schema`]
129    ///
130    /// Note this does not have the qualifier information
131    pub fn as_arrow(&self) -> &Schema {
132        self.inner.as_ref()
133    }
134
135    /// Return a reference to the inner Arrow [`SchemaRef`]
136    ///
137    /// Note this does not have the qualifier information
138    pub fn inner(&self) -> &SchemaRef {
139        &self.inner
140    }
141
142    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
143    pub fn new_with_metadata(
144        qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
145        metadata: HashMap<String, String>,
146    ) -> Result<Self> {
147        let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
148            qualified_fields.into_iter().unzip();
149
150        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
151
152        let dfschema = Self {
153            inner: schema,
154            field_qualifiers: qualifiers,
155            functional_dependencies: FunctionalDependencies::empty(),
156        };
157        dfschema.check_names()?;
158        Ok(dfschema)
159    }
160
161    /// Create a new `DFSchema` from a list of Arrow [Field]s
162    pub fn from_unqualified_fields(
163        fields: Fields,
164        metadata: HashMap<String, String>,
165    ) -> Result<Self> {
166        let field_count = fields.len();
167        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
168        let dfschema = Self {
169            inner: schema,
170            field_qualifiers: vec![None; field_count],
171            functional_dependencies: FunctionalDependencies::empty(),
172        };
173        dfschema.check_names()?;
174        Ok(dfschema)
175    }
176
177    /// Create a `DFSchema` from an Arrow schema and a given qualifier
178    ///
179    /// To create a schema from an Arrow schema without a qualifier, use
180    /// `DFSchema::try_from`.
181    pub fn try_from_qualified_schema(
182        qualifier: impl Into<TableReference>,
183        schema: &Schema,
184    ) -> Result<Self> {
185        let qualifier = qualifier.into();
186        let schema = DFSchema {
187            inner: schema.clone().into(),
188            field_qualifiers: vec![Some(qualifier); schema.fields.len()],
189            functional_dependencies: FunctionalDependencies::empty(),
190        };
191        schema.check_names()?;
192        Ok(schema)
193    }
194
195    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
196    pub fn from_field_specific_qualified_schema(
197        qualifiers: Vec<Option<TableReference>>,
198        schema: &SchemaRef,
199    ) -> Result<Self> {
200        let dfschema = Self {
201            inner: Arc::clone(schema),
202            field_qualifiers: qualifiers,
203            functional_dependencies: FunctionalDependencies::empty(),
204        };
205        dfschema.check_names()?;
206        Ok(dfschema)
207    }
208
209    /// Check if the schema have some fields with the same name
210    pub fn check_names(&self) -> Result<()> {
211        let mut qualified_names = BTreeSet::new();
212        let mut unqualified_names = BTreeSet::new();
213
214        for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
215            if let Some(qualifier) = qualifier {
216                if !qualified_names.insert((qualifier, field.name())) {
217                    return _schema_err!(SchemaError::DuplicateQualifiedField {
218                        qualifier: Box::new(qualifier.clone()),
219                        name: field.name().to_string(),
220                    });
221                }
222            } else if !unqualified_names.insert(field.name()) {
223                return _schema_err!(SchemaError::DuplicateUnqualifiedField {
224                    name: field.name().to_string()
225                });
226            }
227        }
228
229        for (qualifier, name) in qualified_names {
230            if unqualified_names.contains(name) {
231                return _schema_err!(SchemaError::AmbiguousReference {
232                    field: Column::new(Some(qualifier.clone()), name)
233                });
234            }
235        }
236        Ok(())
237    }
238
239    /// Assigns functional dependencies.
240    pub fn with_functional_dependencies(
241        mut self,
242        functional_dependencies: FunctionalDependencies,
243    ) -> Result<Self> {
244        if functional_dependencies.is_valid(self.inner.fields.len()) {
245            self.functional_dependencies = functional_dependencies;
246            Ok(self)
247        } else {
248            _plan_err!(
249                "Invalid functional dependency: {:?}",
250                functional_dependencies
251            )
252        }
253    }
254
255    /// Create a new schema that contains the fields from this schema followed by the fields
256    /// from the supplied schema. An error will be returned if there are duplicate field names.
257    pub fn join(&self, schema: &DFSchema) -> Result<Self> {
258        let mut schema_builder = SchemaBuilder::new();
259        schema_builder.extend(self.inner.fields().iter().cloned());
260        schema_builder.extend(schema.fields().iter().cloned());
261        let new_schema = schema_builder.finish();
262
263        let mut new_metadata = self.inner.metadata.clone();
264        new_metadata.extend(schema.inner.metadata.clone());
265        let new_schema_with_metadata = new_schema.with_metadata(new_metadata);
266
267        let mut new_qualifiers = self.field_qualifiers.clone();
268        new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());
269
270        let new_self = Self {
271            inner: Arc::new(new_schema_with_metadata),
272            field_qualifiers: new_qualifiers,
273            functional_dependencies: FunctionalDependencies::empty(),
274        };
275        new_self.check_names()?;
276        Ok(new_self)
277    }
278
279    /// Modify this schema by appending the fields from the supplied schema, ignoring any
280    /// duplicate fields.
281    pub fn merge(&mut self, other_schema: &DFSchema) {
282        if other_schema.inner.fields.is_empty() {
283            return;
284        }
285
286        let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
287            self.iter().collect();
288        let self_unqualified_names: HashSet<&str> = self
289            .inner
290            .fields
291            .iter()
292            .map(|field| field.name().as_str())
293            .collect();
294
295        let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
296        let mut qualifiers = Vec::new();
297        for (qualifier, field) in other_schema.iter() {
298            // skip duplicate columns
299            let duplicated_field = match qualifier {
300                Some(q) => self_fields.contains(&(Some(q), field)),
301                // for unqualified columns, check as unqualified name
302                None => self_unqualified_names.contains(field.name().as_str()),
303            };
304            if !duplicated_field {
305                schema_builder.push(Arc::clone(field));
306                qualifiers.push(qualifier.cloned());
307            }
308        }
309        let mut metadata = self.inner.metadata.clone();
310        metadata.extend(other_schema.inner.metadata.clone());
311
312        let finished = schema_builder.finish();
313        let finished_with_metadata = finished.with_metadata(metadata);
314        self.inner = finished_with_metadata.into();
315        self.field_qualifiers.extend(qualifiers);
316    }
317
318    /// Get a list of fields
319    pub fn fields(&self) -> &Fields {
320        &self.inner.fields
321    }
322
323    /// Returns an immutable reference of a specific `Field` instance selected using an
324    /// offset within the internal `fields` vector
325    pub fn field(&self, i: usize) -> &Field {
326        &self.inner.fields[i]
327    }
328
329    /// Returns an immutable reference of a specific `Field` instance selected using an
330    /// offset within the internal `fields` vector and its qualifier
331    pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
332        (self.field_qualifiers[i].as_ref(), self.field(i))
333    }
334
335    pub fn index_of_column_by_name(
336        &self,
337        qualifier: Option<&TableReference>,
338        name: &str,
339    ) -> Option<usize> {
340        let mut matches = self
341            .iter()
342            .enumerate()
343            .filter(|(_, (q, f))| match (qualifier, q) {
344                // field to lookup is qualified.
345                // current field is qualified and not shared between relations, compare both
346                // qualifier and name.
347                (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
348                // field to lookup is qualified but current field is unqualified.
349                (Some(_), None) => false,
350                // field to lookup is unqualified, no need to compare qualifier
351                (None, Some(_)) | (None, None) => f.name() == name,
352            })
353            .map(|(idx, _)| idx);
354        matches.next()
355    }
356
357    /// Find the index of the column with the given qualifier and name,
358    /// returning `None` if not found
359    ///
360    /// See [Self::index_of_column] for a version that returns an error if the
361    /// column is not found
362    pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
363        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
364    }
365
366    /// Find the index of the column with the given qualifier and name,
367    /// returning `Err` if not found
368    ///
369    /// See [Self::maybe_index_of_column] for a version that returns `None` if
370    /// the column is not found
371    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
372        self.maybe_index_of_column(col)
373            .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
374    }
375
376    /// Check if the column is in the current schema
377    pub fn is_column_from_schema(&self, col: &Column) -> bool {
378        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
379            .is_some()
380    }
381
382    /// Find the field with the given name
383    pub fn field_with_name(
384        &self,
385        qualifier: Option<&TableReference>,
386        name: &str,
387    ) -> Result<&Field> {
388        if let Some(qualifier) = qualifier {
389            self.field_with_qualified_name(qualifier, name)
390        } else {
391            self.field_with_unqualified_name(name)
392        }
393    }
394
395    /// Find the qualified field with the given name
396    pub fn qualified_field_with_name(
397        &self,
398        qualifier: Option<&TableReference>,
399        name: &str,
400    ) -> Result<(Option<&TableReference>, &Field)> {
401        if let Some(qualifier) = qualifier {
402            let idx = self
403                .index_of_column_by_name(Some(qualifier), name)
404                .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
405            Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
406        } else {
407            self.qualified_field_with_unqualified_name(name)
408        }
409    }
410
411    /// Find all fields having the given qualifier
412    pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> {
413        self.iter()
414            .filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
415            .map(|(_, f)| f.as_ref())
416            .collect()
417    }
418
419    /// Find all fields indices having the given qualifier
420    pub fn fields_indices_with_qualified(
421        &self,
422        qualifier: &TableReference,
423    ) -> Vec<usize> {
424        self.iter()
425            .enumerate()
426            .filter_map(|(idx, (q, _))| q.and_then(|q| q.eq(qualifier).then_some(idx)))
427            .collect()
428    }
429
430    /// Find all fields that match the given name
431    pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
432        self.fields()
433            .iter()
434            .filter(|field| field.name() == name)
435            .map(|f| f.as_ref())
436            .collect()
437    }
438
439    /// Find all fields that match the given name and return them with their qualifier
440    pub fn qualified_fields_with_unqualified_name(
441        &self,
442        name: &str,
443    ) -> Vec<(Option<&TableReference>, &Field)> {
444        self.iter()
445            .filter(|(_, field)| field.name() == name)
446            .map(|(qualifier, field)| (qualifier, field.as_ref()))
447            .collect()
448    }
449
450    /// Find all fields that match the given name and convert to column
451    pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
452        self.iter()
453            .filter(|(_, field)| field.name() == name)
454            .map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
455            .collect()
456    }
457
458    /// Return all `Column`s for the schema
459    pub fn columns(&self) -> Vec<Column> {
460        self.iter()
461            .map(|(qualifier, field)| {
462                Column::new(qualifier.cloned(), field.name().clone())
463            })
464            .collect()
465    }
466
467    /// Find the qualified field with the given unqualified name
468    pub fn qualified_field_with_unqualified_name(
469        &self,
470        name: &str,
471    ) -> Result<(Option<&TableReference>, &Field)> {
472        let matches = self.qualified_fields_with_unqualified_name(name);
473        match matches.len() {
474            0 => Err(unqualified_field_not_found(name, self)),
475            1 => Ok((matches[0].0, matches[0].1)),
476            _ => {
477                // When `matches` size > 1, it doesn't necessarily mean an `ambiguous name` problem.
478                // Because name may generate from Alias/... . It means that it don't own qualifier.
479                // For example:
480                //             Join on id = b.id
481                // Project a.id as id   TableScan b id
482                // In this case, there isn't `ambiguous name` problem. When `matches` just contains
483                // one field without qualifier, we should return it.
484                let fields_without_qualifier = matches
485                    .iter()
486                    .filter(|(q, _)| q.is_none())
487                    .collect::<Vec<_>>();
488                if fields_without_qualifier.len() == 1 {
489                    Ok((fields_without_qualifier[0].0, fields_without_qualifier[0].1))
490                } else {
491                    _schema_err!(SchemaError::AmbiguousReference {
492                        field: Column::new_unqualified(name.to_string(),),
493                    })
494                }
495            }
496        }
497    }
498
499    /// Find the field with the given name
500    pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
501        self.qualified_field_with_unqualified_name(name)
502            .map(|(_, field)| field)
503    }
504
505    /// Find the field with the given qualified name
506    pub fn field_with_qualified_name(
507        &self,
508        qualifier: &TableReference,
509        name: &str,
510    ) -> Result<&Field> {
511        let idx = self
512            .index_of_column_by_name(Some(qualifier), name)
513            .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
514
515        Ok(self.field(idx))
516    }
517
518    /// Find the field with the given qualified column
519    pub fn qualified_field_from_column(
520        &self,
521        column: &Column,
522    ) -> Result<(Option<&TableReference>, &Field)> {
523        self.qualified_field_with_name(column.relation.as_ref(), &column.name)
524    }
525
526    /// Find if the field exists with the given name
527    pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
528        self.fields().iter().any(|field| field.name() == name)
529    }
530
531    /// Find if the field exists with the given qualified name
532    pub fn has_column_with_qualified_name(
533        &self,
534        qualifier: &TableReference,
535        name: &str,
536    ) -> bool {
537        self.iter()
538            .any(|(q, f)| q.map(|q| q.eq(qualifier)).unwrap_or(false) && f.name() == name)
539    }
540
541    /// Find if the field exists with the given qualified column
542    pub fn has_column(&self, column: &Column) -> bool {
543        match &column.relation {
544            Some(r) => self.has_column_with_qualified_name(r, &column.name),
545            None => self.has_column_with_unqualified_name(&column.name),
546        }
547    }
548
549    /// Check to see if unqualified field names matches field names in Arrow schema
550    pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
551        self.inner
552            .fields
553            .iter()
554            .zip(arrow_schema.fields().iter())
555            .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
556    }
557
558    /// Check to see if fields in 2 Arrow schemas are compatible
559    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
560    pub fn check_arrow_schema_type_compatible(
561        &self,
562        arrow_schema: &Schema,
563    ) -> Result<()> {
564        let self_arrow_schema: Schema = self.into();
565        self_arrow_schema
566            .fields()
567            .iter()
568            .zip(arrow_schema.fields().iter())
569            .try_for_each(|(l_field, r_field)| {
570                if !can_cast_types(r_field.data_type(), l_field.data_type()) {
571                    _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
572                                r_field.name(),
573                                r_field.data_type(),
574                                l_field.name(),
575                                l_field.data_type())
576                } else {
577                    Ok(())
578                }
579            })
580    }
581
582    /// Returns true if the two schemas have the same qualified named
583    /// fields with logically equivalent data types. Returns false otherwise.
584    ///
585    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
586    /// equivalence checking.
587    pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
588        if self.fields().len() != other.fields().len() {
589            return false;
590        }
591        let self_fields = self.iter();
592        let other_fields = other.iter();
593        self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
594            q1 == q2
595                && f1.name() == f2.name()
596                && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
597        })
598    }
599
600    #[deprecated(since = "47.0.0", note = "Use has_equivalent_names_and_types` instead")]
601    pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
602        self.has_equivalent_names_and_types(other).is_ok()
603    }
604
605    /// Returns Ok if the two schemas have the same qualified named
606    /// fields with the compatible data types.
607    ///
608    /// Returns an `Err` with a message otherwise.
609    ///
610    /// This is a specialized version of Eq that ignores differences in
611    /// nullability and metadata.
612    ///
613    /// Use [DFSchema]::logically_equivalent_names_and_types for a weaker
614    /// logical type checking, which for example would consider a dictionary
615    /// encoded UTF8 array to be equivalent to a plain UTF8 array.
616    pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
617        // case 1 : schema length mismatch
618        if self.fields().len() != other.fields().len() {
619            _plan_err!(
620                "Schema mismatch: the schema length are not same \
621            Expected schema length: {}, got: {}",
622                self.fields().len(),
623                other.fields().len()
624            )
625        } else {
626            // case 2 : schema length match, but fields mismatch
627            // check if the fields name are the same and have the same data types
628            self.fields()
629                .iter()
630                .zip(other.fields().iter())
631                .try_for_each(|(f1, f2)| {
632                    if f1.name() != f2.name()
633                        || (!DFSchema::datatype_is_semantically_equal(
634                            f1.data_type(),
635                            f2.data_type(),
636                        ))
637                    {
638                        _plan_err!(
639                            "Schema mismatch: Expected field '{}' with type {:?}, \
640                            but got '{}' with type {:?}.",
641                            f1.name(),
642                            f1.data_type(),
643                            f2.name(),
644                            f2.data_type()
645                        )
646                    } else {
647                        Ok(())
648                    }
649                })
650        }
651    }
652
653    /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
654    /// than datatype_is_semantically_equal in that different representations of same data can be
655    /// logically but not semantically equivalent. Semantically equivalent types are always also
656    /// logically equivalent. For example:
657    /// - a Dictionary<K,V> type is logically equal to a plain V type
658    /// - a Dictionary<K1, V1> is also logically equal to Dictionary<K2, V1>
659    /// - Utf8 and Utf8View are logically equal
660    pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
661        // check nested fields
662        match (dt1, dt2) {
663            (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
664                v1.as_ref() == v2.as_ref()
665            }
666            (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
667            (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
668            (DataType::List(f1), DataType::List(f2))
669            | (DataType::LargeList(f1), DataType::LargeList(f2))
670            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
671                // Don't compare the names of the technical inner field
672                // Usually "item" but that's not mandated
673                Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
674            }
675            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
676                // Don't compare the names of the technical inner fields
677                // Usually "entries", "key", "value" but that's not mandated
678                match (f1.data_type(), f2.data_type()) {
679                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
680                        f1_inner.len() == f2_inner.len()
681                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
682                                Self::datatype_is_logically_equal(
683                                    f1.data_type(),
684                                    f2.data_type(),
685                                )
686                            })
687                    }
688                    _ => panic!("Map type should have an inner struct field"),
689                }
690            }
691            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
692                let iter1 = fields1.iter();
693                let iter2 = fields2.iter();
694                fields1.len() == fields2.len() &&
695                        // all fields have to be the same
696                    iter1
697                    .zip(iter2)
698                        .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
699            }
700            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
701                let iter1 = fields1.iter();
702                let iter2 = fields2.iter();
703                fields1.len() == fields2.len() &&
704                    // all fields have to be the same
705                    iter1
706                        .zip(iter2)
707                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
708            }
709            // Utf8 and Utf8View are logically equivalent
710            (DataType::Utf8, DataType::Utf8View) => true,
711            (DataType::Utf8View, DataType::Utf8) => true,
712            _ => Self::datatype_is_semantically_equal(dt1, dt2),
713        }
714    }
715
716    /// Returns true of two [`DataType`]s are semantically equal (same
717    /// name and type), ignoring both metadata and nullability, and decimal precision/scale.
718    ///
719    /// request to upstream: <https://github.com/apache/arrow-rs/issues/3199>
720    pub fn datatype_is_semantically_equal(dt1: &DataType, dt2: &DataType) -> bool {
721        // check nested fields
722        match (dt1, dt2) {
723            (DataType::Dictionary(k1, v1), DataType::Dictionary(k2, v2)) => {
724                Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
725                    && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
726            }
727            (DataType::List(f1), DataType::List(f2))
728            | (DataType::LargeList(f1), DataType::LargeList(f2))
729            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
730                // Don't compare the names of the technical inner field
731                // Usually "item" but that's not mandated
732                Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
733            }
734            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
735                // Don't compare the names of the technical inner fields
736                // Usually "entries", "key", "value" but that's not mandated
737                match (f1.data_type(), f2.data_type()) {
738                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
739                        f1_inner.len() == f2_inner.len()
740                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
741                                Self::datatype_is_semantically_equal(
742                                    f1.data_type(),
743                                    f2.data_type(),
744                                )
745                            })
746                    }
747                    _ => panic!("Map type should have an inner struct field"),
748                }
749            }
750            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
751                let iter1 = fields1.iter();
752                let iter2 = fields2.iter();
753                fields1.len() == fields2.len() &&
754                        // all fields have to be the same
755                    iter1
756                    .zip(iter2)
757                        .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
758            }
759            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
760                let iter1 = fields1.iter();
761                let iter2 = fields2.iter();
762                fields1.len() == fields2.len() &&
763                    // all fields have to be the same
764                    iter1
765                        .zip(iter2)
766                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
767            }
768            (
769                DataType::Decimal128(_l_precision, _l_scale),
770                DataType::Decimal128(_r_precision, _r_scale),
771            ) => true,
772            (
773                DataType::Decimal256(_l_precision, _l_scale),
774                DataType::Decimal256(_r_precision, _r_scale),
775            ) => true,
776            _ => dt1 == dt2,
777        }
778    }
779
780    fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
781        f1.name() == f2.name()
782            && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
783    }
784
785    fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
786        f1.name() == f2.name()
787            && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
788    }
789
790    /// Strip all field qualifier in schema
791    pub fn strip_qualifiers(self) -> Self {
792        DFSchema {
793            field_qualifiers: vec![None; self.inner.fields.len()],
794            inner: self.inner,
795            functional_dependencies: self.functional_dependencies,
796        }
797    }
798
799    /// Replace all field qualifier with new value in schema
800    pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
801        let qualifier = qualifier.into();
802        DFSchema {
803            field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
804            inner: self.inner,
805            functional_dependencies: self.functional_dependencies,
806        }
807    }
808
809    /// Get list of fully-qualified field names in this schema
810    pub fn field_names(&self) -> Vec<String> {
811        self.iter()
812            .map(|(qualifier, field)| qualified_name(qualifier, field.name()))
813            .collect::<Vec<_>>()
814    }
815
816    /// Get metadata of this schema
817    pub fn metadata(&self) -> &HashMap<String, String> {
818        &self.inner.metadata
819    }
820
821    /// Get functional dependencies
822    pub fn functional_dependencies(&self) -> &FunctionalDependencies {
823        &self.functional_dependencies
824    }
825
826    /// Iterate over the qualifiers and fields in the DFSchema
827    pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
828        self.field_qualifiers
829            .iter()
830            .zip(self.inner.fields().iter())
831            .map(|(qualifier, field)| (qualifier.as_ref(), field))
832    }
833}
834
835impl From<DFSchema> for Schema {
836    /// Convert DFSchema into a Schema
837    fn from(df_schema: DFSchema) -> Self {
838        let fields: Fields = df_schema.inner.fields.clone();
839        Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
840    }
841}
842
843impl From<&DFSchema> for Schema {
844    /// Convert DFSchema reference into a Schema
845    fn from(df_schema: &DFSchema) -> Self {
846        let fields: Fields = df_schema.inner.fields.clone();
847        Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
848    }
849}
850
851/// Allow DFSchema to be converted into an Arrow `&Schema`
852impl AsRef<Schema> for DFSchema {
853    fn as_ref(&self) -> &Schema {
854        self.as_arrow()
855    }
856}
857
858/// Allow DFSchema to be converted into an Arrow `&SchemaRef` (to clone, for
859/// example)
860impl AsRef<SchemaRef> for DFSchema {
861    fn as_ref(&self) -> &SchemaRef {
862        self.inner()
863    }
864}
865
866/// Create a `DFSchema` from an Arrow schema
867impl TryFrom<Schema> for DFSchema {
868    type Error = DataFusionError;
869    fn try_from(schema: Schema) -> Result<Self, Self::Error> {
870        Self::try_from(Arc::new(schema))
871    }
872}
873
874impl TryFrom<SchemaRef> for DFSchema {
875    type Error = DataFusionError;
876    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
877        let field_count = schema.fields.len();
878        let dfschema = Self {
879            inner: schema,
880            field_qualifiers: vec![None; field_count],
881            functional_dependencies: FunctionalDependencies::empty(),
882        };
883        Ok(dfschema)
884    }
885}
886
887impl From<DFSchema> for SchemaRef {
888    fn from(df_schema: DFSchema) -> Self {
889        SchemaRef::new(df_schema.into())
890    }
891}
892
893// Hashing refers to a subset of fields considered in PartialEq.
894impl Hash for DFSchema {
895    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
896        self.inner.fields.hash(state);
897        self.inner.metadata.len().hash(state); // HashMap is not hashable
898    }
899}
900
901/// Convenience trait to convert Schema like things to DFSchema and DFSchemaRef with fewer keystrokes
902pub trait ToDFSchema
903where
904    Self: Sized,
905{
906    /// Attempt to create a DSSchema
907    fn to_dfschema(self) -> Result<DFSchema>;
908
909    /// Attempt to create a DSSchemaRef
910    fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
911        Ok(Arc::new(self.to_dfschema()?))
912    }
913}
914
915impl ToDFSchema for Schema {
916    fn to_dfschema(self) -> Result<DFSchema> {
917        DFSchema::try_from(self)
918    }
919}
920
921impl ToDFSchema for SchemaRef {
922    fn to_dfschema(self) -> Result<DFSchema> {
923        DFSchema::try_from(self)
924    }
925}
926
927impl ToDFSchema for Vec<Field> {
928    fn to_dfschema(self) -> Result<DFSchema> {
929        let field_count = self.len();
930        let schema = Schema {
931            fields: self.into(),
932            metadata: HashMap::new(),
933        };
934        let dfschema = DFSchema {
935            inner: schema.into(),
936            field_qualifiers: vec![None; field_count],
937            functional_dependencies: FunctionalDependencies::empty(),
938        };
939        Ok(dfschema)
940    }
941}
942
943impl Display for DFSchema {
944    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
945        write!(
946            f,
947            "fields:[{}], metadata:{:?}",
948            self.iter()
949                .map(|(q, f)| qualified_name(q, f.name()))
950                .collect::<Vec<String>>()
951                .join(", "),
952            self.inner.metadata
953        )
954    }
955}
956
957/// Provides schema information needed by certain methods of `Expr`
958/// (defined in the datafusion-common crate).
959///
960/// Note that this trait is implemented for &[DFSchema] which is
961/// widely used in the DataFusion codebase.
962pub trait ExprSchema: std::fmt::Debug {
963    /// Is this column reference nullable?
964    fn nullable(&self, col: &Column) -> Result<bool> {
965        Ok(self.field_from_column(col)?.is_nullable())
966    }
967
968    /// What is the datatype of this column?
969    fn data_type(&self, col: &Column) -> Result<&DataType> {
970        Ok(self.field_from_column(col)?.data_type())
971    }
972
973    /// Returns the column's optional metadata.
974    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
975        Ok(self.field_from_column(col)?.metadata())
976    }
977
978    /// Return the column's datatype and nullability
979    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
980        let field = self.field_from_column(col)?;
981        Ok((field.data_type(), field.is_nullable()))
982    }
983
984    // Return the column's field
985    fn field_from_column(&self, col: &Column) -> Result<&Field>;
986}
987
988// Implement `ExprSchema` for `Arc<DFSchema>`
989impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
990    fn nullable(&self, col: &Column) -> Result<bool> {
991        self.as_ref().nullable(col)
992    }
993
994    fn data_type(&self, col: &Column) -> Result<&DataType> {
995        self.as_ref().data_type(col)
996    }
997
998    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
999        ExprSchema::metadata(self.as_ref(), col)
1000    }
1001
1002    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1003        self.as_ref().data_type_and_nullable(col)
1004    }
1005
1006    fn field_from_column(&self, col: &Column) -> Result<&Field> {
1007        self.as_ref().field_from_column(col)
1008    }
1009}
1010
1011impl ExprSchema for DFSchema {
1012    fn field_from_column(&self, col: &Column) -> Result<&Field> {
1013        match &col.relation {
1014            Some(r) => self.field_with_qualified_name(r, &col.name),
1015            None => self.field_with_unqualified_name(&col.name),
1016        }
1017    }
1018}
1019
/// DataFusion-specific extensions to [`Schema`].
pub trait SchemaExt {
    /// This is a specialized version of Eq that ignores differences
    /// in nullability and metadata.
    ///
    /// It works the same as [`DFSchema::equivalent_names_and_types`].
    fn equivalent_names_and_types(&self, other: &Self) -> bool;

    /// Returns `Ok(())` if the two schemas have the same number of fields and
    /// the fields at each position share a name and have logically
    /// equivalent data types. Returns an error otherwise.
    ///
    /// The [`Schema`] implementation in this file reports the mismatch as a
    /// plan error (via `_plan_err!`). It also accepts a pair of fields whose
    /// types are not logically equal when
    /// `can_cast_types(other_type, self_type)` is true.
    ///
    /// Use [`DFSchema::equivalent_names_and_types`] for stricter semantic type
    /// equivalence checking.
    ///
    /// It is only used by insert into cases.
    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
}
1037
1038impl SchemaExt for Schema {
1039    fn equivalent_names_and_types(&self, other: &Self) -> bool {
1040        if self.fields().len() != other.fields().len() {
1041            return false;
1042        }
1043
1044        self.fields()
1045            .iter()
1046            .zip(other.fields().iter())
1047            .all(|(f1, f2)| {
1048                f1.name() == f2.name()
1049                    && DFSchema::datatype_is_semantically_equal(
1050                        f1.data_type(),
1051                        f2.data_type(),
1052                    )
1053            })
1054    }
1055
1056    // It is only used by insert into cases.
1057    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
1058        // case 1 : schema length mismatch
1059        if self.fields().len() != other.fields().len() {
1060            _plan_err!(
1061                "Inserting query must have the same schema length as the table. \
1062            Expected table schema length: {}, got: {}",
1063                self.fields().len(),
1064                other.fields().len()
1065            )
1066        } else {
1067            // case 2 : schema length match, but fields mismatch
1068            // check if the fields name are the same and have the same data types
1069            self.fields()
1070                .iter()
1071                .zip(other.fields().iter())
1072                .try_for_each(|(f1, f2)| {
1073                    if f1.name() != f2.name() || (!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) && !can_cast_types(f2.data_type(), f1.data_type())) {
1074                        _plan_err!(
1075                            "Inserting query schema mismatch: Expected table field '{}' with type {:?}, \
1076                            but got '{}' with type {:?}.",
1077                            f1.name(),
1078                            f1.data_type(),
1079                            f2.name(),
1080                            f2.data_type())
1081                    } else {
1082                        Ok(())
1083                    }
1084                })
1085        }
1086    }
1087}
1088
1089pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
1090    match qualifier {
1091        Some(q) => format!("{q}.{name}"),
1092        None => name.to_string(),
1093    }
1094}
1095
1096#[cfg(test)]
1097mod tests {
1098    use crate::assert_contains;
1099
1100    use super::*;
1101
1102    #[test]
1103    fn qualifier_in_name() -> Result<()> {
1104        let col = Column::from_name("t1.c0");
1105        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1106        // lookup with unqualified name "t1.c0"
1107        let err = schema.index_of_column(&col).unwrap_err();
1108        let expected = "Schema error: No field named \"t1.c0\". \
1109            Column names are case sensitive. \
1110            You can use double quotes to refer to the \"\"t1.c0\"\" column \
1111            or set the datafusion.sql_parser.enable_ident_normalization configuration. \
1112            Did you mean 't1.c0'?.";
1113        assert_eq!(err.strip_backtrace(), expected);
1114        Ok(())
1115    }
1116
1117    #[test]
1118    fn quoted_qualifiers_in_name() -> Result<()> {
1119        let col = Column::from_name("t1.c0");
1120        let schema = DFSchema::try_from_qualified_schema(
1121            "t1",
1122            &Schema::new(vec![
1123                Field::new("CapitalColumn", DataType::Boolean, true),
1124                Field::new("field.with.period", DataType::Boolean, true),
1125            ]),
1126        )?;
1127
1128        // lookup with unqualified name "t1.c0"
1129        let err = schema.index_of_column(&col).unwrap_err();
1130        let expected = "Schema error: No field named \"t1.c0\". \
1131            Valid fields are t1.\"CapitalColumn\", t1.\"field.with.period\".";
1132        assert_eq!(err.strip_backtrace(), expected);
1133        Ok(())
1134    }
1135
1136    #[test]
1137    fn from_unqualified_schema() -> Result<()> {
1138        let schema = DFSchema::try_from(test_schema_1())?;
1139        assert_eq!("fields:[c0, c1], metadata:{}", schema.to_string());
1140        Ok(())
1141    }
1142
1143    #[test]
1144    fn from_qualified_schema() -> Result<()> {
1145        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1146        assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", schema.to_string());
1147        Ok(())
1148    }
1149
1150    #[test]
1151    fn test_from_field_specific_qualified_schema() -> Result<()> {
1152        let schema = DFSchema::from_field_specific_qualified_schema(
1153            vec![Some("t1".into()), None],
1154            &Arc::new(Schema::new(vec![
1155                Field::new("c0", DataType::Boolean, true),
1156                Field::new("c1", DataType::Boolean, true),
1157            ])),
1158        )?;
1159        assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
1160        Ok(())
1161    }
1162
1163    #[test]
1164    fn test_from_qualified_fields() -> Result<()> {
1165        let schema = DFSchema::new_with_metadata(
1166            vec![
1167                (
1168                    Some("t0".into()),
1169                    Arc::new(Field::new("c0", DataType::Boolean, true)),
1170                ),
1171                (None, Arc::new(Field::new("c1", DataType::Boolean, true))),
1172            ],
1173            HashMap::new(),
1174        )?;
1175        assert_eq!("fields:[t0.c0, c1], metadata:{}", schema.to_string());
1176        Ok(())
1177    }
1178
1179    #[test]
1180    fn from_qualified_schema_into_arrow_schema() -> Result<()> {
1181        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1182        let arrow_schema: Schema = schema.into();
1183        let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
1184        Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }";
1185        assert_eq!(expected, arrow_schema.to_string());
1186        Ok(())
1187    }
1188
1189    #[test]
1190    fn join_qualified() -> Result<()> {
1191        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1192        let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
1193        let join = left.join(&right)?;
1194        assert_eq!(
1195            "fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
1196            join.to_string()
1197        );
1198        // test valid access
1199        assert!(join
1200            .field_with_qualified_name(&TableReference::bare("t1"), "c0")
1201            .is_ok());
1202        assert!(join
1203            .field_with_qualified_name(&TableReference::bare("t2"), "c0")
1204            .is_ok());
1205        // test invalid access
1206        assert!(join.field_with_unqualified_name("c0").is_err());
1207        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1208        assert!(join.field_with_unqualified_name("t2.c0").is_err());
1209        Ok(())
1210    }
1211
1212    #[test]
1213    fn join_qualified_duplicate() -> Result<()> {
1214        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1215        let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1216        let join = left.join(&right);
1217        assert_eq!(
1218            join.unwrap_err().strip_backtrace(),
1219            "Schema error: Schema contains duplicate qualified field name t1.c0",
1220        );
1221        Ok(())
1222    }
1223
1224    #[test]
1225    fn join_unqualified_duplicate() -> Result<()> {
1226        let left = DFSchema::try_from(test_schema_1())?;
1227        let right = DFSchema::try_from(test_schema_1())?;
1228        let join = left.join(&right);
1229        assert_eq!(
1230            join.unwrap_err().strip_backtrace(),
1231            "Schema error: Schema contains duplicate unqualified field name c0"
1232        );
1233        Ok(())
1234    }
1235
1236    #[test]
1237    fn join_mixed() -> Result<()> {
1238        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1239        let right = DFSchema::try_from(test_schema_2())?;
1240        let join = left.join(&right)?;
1241        assert_eq!(
1242            "fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
1243            join.to_string()
1244        );
1245        // test valid access
1246        assert!(join
1247            .field_with_qualified_name(&TableReference::bare("t1"), "c0")
1248            .is_ok());
1249        assert!(join.field_with_unqualified_name("c0").is_ok());
1250        assert!(join.field_with_unqualified_name("c100").is_ok());
1251        assert!(join.field_with_name(None, "c100").is_ok());
1252        // test invalid access
1253        assert!(join.field_with_unqualified_name("t1.c0").is_err());
1254        assert!(join.field_with_unqualified_name("t1.c100").is_err());
1255        assert!(join
1256            .field_with_qualified_name(&TableReference::bare(""), "c100")
1257            .is_err());
1258        Ok(())
1259    }
1260
1261    #[test]
1262    fn join_mixed_duplicate() -> Result<()> {
1263        let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1264        let right = DFSchema::try_from(test_schema_1())?;
1265        let join = left.join(&right);
1266        assert_contains!(join.unwrap_err().to_string(),
1267                         "Schema error: Schema contains qualified \
1268                          field name t1.c0 and unqualified field name c0 which would be ambiguous");
1269        Ok(())
1270    }
1271
1272    #[test]
1273    fn helpful_error_messages() -> Result<()> {
1274        let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1275        let expected_help = "Valid fields are t1.c0, t1.c1.";
1276        assert_contains!(
1277            schema
1278                .field_with_qualified_name(&TableReference::bare("x"), "y")
1279                .unwrap_err()
1280                .to_string(),
1281            expected_help
1282        );
1283        assert_contains!(
1284            schema
1285                .field_with_unqualified_name("y")
1286                .unwrap_err()
1287                .to_string(),
1288            expected_help
1289        );
1290        assert!(schema.index_of_column_by_name(None, "y").is_none());
1291        assert!(schema.index_of_column_by_name(None, "t1.c0").is_none());
1292
1293        Ok(())
1294    }
1295
1296    #[test]
1297    fn select_without_valid_fields() {
1298        let schema = DFSchema::empty();
1299
1300        let col = Column::from_qualified_name("t1.c0");
1301        let err = schema.index_of_column(&col).unwrap_err();
1302        let expected = "Schema error: No field named t1.c0.";
1303        assert_eq!(err.strip_backtrace(), expected);
1304
1305        // the same check without qualifier
1306        let col = Column::from_name("c0");
1307        let err = schema.index_of_column(&col).err().unwrap();
1308        let expected = "Schema error: No field named c0.";
1309        assert_eq!(err.strip_backtrace(), expected);
1310    }
1311
1312    #[test]
1313    fn into() {
1314        // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
1315        let arrow_schema = Schema::new_with_metadata(
1316            vec![Field::new("c0", DataType::Int64, true)],
1317            test_metadata(),
1318        );
1319        let arrow_schema_ref = Arc::new(arrow_schema.clone());
1320
1321        let df_schema = DFSchema {
1322            inner: Arc::clone(&arrow_schema_ref),
1323            field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
1324            functional_dependencies: FunctionalDependencies::empty(),
1325        };
1326        let df_schema_ref = Arc::new(df_schema.clone());
1327
1328        {
1329            let arrow_schema = arrow_schema.clone();
1330            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1331
1332            assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
1333            assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
1334        }
1335
1336        {
1337            let arrow_schema = arrow_schema.clone();
1338            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1339
1340            assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1341            assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1342        }
1343
1344        // Now, consume the refs
1345        assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1346        assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1347    }
1348
1349    fn test_schema_1() -> Schema {
1350        Schema::new(vec![
1351            Field::new("c0", DataType::Boolean, true),
1352            Field::new("c1", DataType::Boolean, true),
1353        ])
1354    }
1355    #[test]
1356    fn test_dfschema_to_schema_conversion() {
1357        let mut a_metadata = HashMap::new();
1358        a_metadata.insert("key".to_string(), "value".to_string());
1359        let a_field = Field::new("a", DataType::Int64, false).with_metadata(a_metadata);
1360
1361        let mut b_metadata = HashMap::new();
1362        b_metadata.insert("key".to_string(), "value".to_string());
1363        let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata);
1364
1365        let schema = Arc::new(Schema::new(vec![a_field, b_field]));
1366
1367        let df_schema = DFSchema {
1368            inner: Arc::clone(&schema),
1369            field_qualifiers: vec![None; schema.fields.len()],
1370            functional_dependencies: FunctionalDependencies::empty(),
1371        };
1372
1373        assert_eq!(df_schema.inner.metadata(), schema.metadata())
1374    }
1375
1376    #[test]
1377    fn test_contain_column() -> Result<()> {
1378        // qualified exists
1379        {
1380            let col = Column::from_qualified_name("t1.c0");
1381            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1382            assert!(schema.is_column_from_schema(&col));
1383        }
1384
1385        // qualified not exists
1386        {
1387            let col = Column::from_qualified_name("t1.c2");
1388            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1389            assert!(!schema.is_column_from_schema(&col));
1390        }
1391
1392        // unqualified exists
1393        {
1394            let col = Column::from_name("c0");
1395            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1396            assert!(schema.is_column_from_schema(&col));
1397        }
1398
1399        // unqualified not exists
1400        {
1401            let col = Column::from_name("c2");
1402            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1403            assert!(!schema.is_column_from_schema(&col));
1404        }
1405
1406        Ok(())
1407    }
1408
1409    #[test]
1410    fn test_datatype_is_logically_equal() {
1411        assert!(DFSchema::datatype_is_logically_equal(
1412            &DataType::Int8,
1413            &DataType::Int8
1414        ));
1415
1416        assert!(!DFSchema::datatype_is_logically_equal(
1417            &DataType::Int8,
1418            &DataType::Int16
1419        ));
1420
1421        // Test lists
1422
1423        // Succeeds if both have the same element type, disregards names and nullability
1424        assert!(DFSchema::datatype_is_logically_equal(
1425            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1426            &DataType::List(Field::new("element", DataType::Int8, false).into())
1427        ));
1428
1429        // Fails if element type is different
1430        assert!(!DFSchema::datatype_is_logically_equal(
1431            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1432            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1433        ));
1434
1435        // Test maps
1436        let map_field = DataType::Map(
1437            Field::new(
1438                "entries",
1439                DataType::Struct(Fields::from(vec![
1440                    Field::new("key", DataType::Int8, false),
1441                    Field::new("value", DataType::Int8, true),
1442                ])),
1443                true,
1444            )
1445            .into(),
1446            true,
1447        );
1448
1449        // Succeeds if both maps have the same key and value types, disregards names and nullability
1450        assert!(DFSchema::datatype_is_logically_equal(
1451            &map_field,
1452            &DataType::Map(
1453                Field::new(
1454                    "pairs",
1455                    DataType::Struct(Fields::from(vec![
1456                        Field::new("one", DataType::Int8, false),
1457                        Field::new("two", DataType::Int8, false)
1458                    ])),
1459                    true
1460                )
1461                .into(),
1462                true
1463            )
1464        ));
1465        // Fails if value type is different
1466        assert!(!DFSchema::datatype_is_logically_equal(
1467            &map_field,
1468            &DataType::Map(
1469                Field::new(
1470                    "entries",
1471                    DataType::Struct(Fields::from(vec![
1472                        Field::new("key", DataType::Int8, false),
1473                        Field::new("value", DataType::Int16, true)
1474                    ])),
1475                    true
1476                )
1477                .into(),
1478                true
1479            )
1480        ));
1481
1482        // Fails if key type is different
1483        assert!(!DFSchema::datatype_is_logically_equal(
1484            &map_field,
1485            &DataType::Map(
1486                Field::new(
1487                    "entries",
1488                    DataType::Struct(Fields::from(vec![
1489                        Field::new("key", DataType::Int16, false),
1490                        Field::new("value", DataType::Int8, true)
1491                    ])),
1492                    true
1493                )
1494                .into(),
1495                true
1496            )
1497        ));
1498
1499        // Test structs
1500
1501        let struct_field = DataType::Struct(Fields::from(vec![
1502            Field::new("a", DataType::Int8, true),
1503            Field::new("b", DataType::Int8, true),
1504        ]));
1505
1506        // Succeeds if both have same names and datatypes, ignores nullability
1507        assert!(DFSchema::datatype_is_logically_equal(
1508            &struct_field,
1509            &DataType::Struct(Fields::from(vec![
1510                Field::new("a", DataType::Int8, false),
1511                Field::new("b", DataType::Int8, true),
1512            ]))
1513        ));
1514
1515        // Fails if field names are different
1516        assert!(!DFSchema::datatype_is_logically_equal(
1517            &struct_field,
1518            &DataType::Struct(Fields::from(vec![
1519                Field::new("x", DataType::Int8, true),
1520                Field::new("y", DataType::Int8, true),
1521            ]))
1522        ));
1523
1524        // Fails if types are different
1525        assert!(!DFSchema::datatype_is_logically_equal(
1526            &struct_field,
1527            &DataType::Struct(Fields::from(vec![
1528                Field::new("a", DataType::Int16, true),
1529                Field::new("b", DataType::Int8, true),
1530            ]))
1531        ));
1532
1533        // Fails if more or less fields
1534        assert!(!DFSchema::datatype_is_logically_equal(
1535            &struct_field,
1536            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1537        ));
1538    }
1539
1540    #[test]
1541    fn test_datatype_is_logically_equivalent_to_dictionary() {
1542        // Dictionary is logically equal to its value type
1543        assert!(DFSchema::datatype_is_logically_equal(
1544            &DataType::Utf8,
1545            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1546        ));
1547    }
1548
1549    #[test]
1550    fn test_datatype_is_semantically_equal() {
1551        assert!(DFSchema::datatype_is_semantically_equal(
1552            &DataType::Int8,
1553            &DataType::Int8
1554        ));
1555
1556        assert!(!DFSchema::datatype_is_semantically_equal(
1557            &DataType::Int8,
1558            &DataType::Int16
1559        ));
1560
1561        // Test lists
1562
1563        // Succeeds if both have the same element type, disregards names and nullability
1564        assert!(DFSchema::datatype_is_semantically_equal(
1565            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1566            &DataType::List(Field::new("element", DataType::Int8, false).into())
1567        ));
1568
1569        // Fails if element type is different
1570        assert!(!DFSchema::datatype_is_semantically_equal(
1571            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1572            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1573        ));
1574
1575        // Test maps
1576        let map_field = DataType::Map(
1577            Field::new(
1578                "entries",
1579                DataType::Struct(Fields::from(vec![
1580                    Field::new("key", DataType::Int8, false),
1581                    Field::new("value", DataType::Int8, true),
1582                ])),
1583                true,
1584            )
1585            .into(),
1586            true,
1587        );
1588
1589        // Succeeds if both maps have the same key and value types, disregards names and nullability
1590        assert!(DFSchema::datatype_is_semantically_equal(
1591            &map_field,
1592            &DataType::Map(
1593                Field::new(
1594                    "pairs",
1595                    DataType::Struct(Fields::from(vec![
1596                        Field::new("one", DataType::Int8, false),
1597                        Field::new("two", DataType::Int8, false)
1598                    ])),
1599                    true
1600                )
1601                .into(),
1602                true
1603            )
1604        ));
1605        // Fails if value type is different
1606        assert!(!DFSchema::datatype_is_semantically_equal(
1607            &map_field,
1608            &DataType::Map(
1609                Field::new(
1610                    "entries",
1611                    DataType::Struct(Fields::from(vec![
1612                        Field::new("key", DataType::Int8, false),
1613                        Field::new("value", DataType::Int16, true)
1614                    ])),
1615                    true
1616                )
1617                .into(),
1618                true
1619            )
1620        ));
1621
1622        // Fails if key type is different
1623        assert!(!DFSchema::datatype_is_semantically_equal(
1624            &map_field,
1625            &DataType::Map(
1626                Field::new(
1627                    "entries",
1628                    DataType::Struct(Fields::from(vec![
1629                        Field::new("key", DataType::Int16, false),
1630                        Field::new("value", DataType::Int8, true)
1631                    ])),
1632                    true
1633                )
1634                .into(),
1635                true
1636            )
1637        ));
1638
1639        // Test structs
1640
1641        let struct_field = DataType::Struct(Fields::from(vec![
1642            Field::new("a", DataType::Int8, true),
1643            Field::new("b", DataType::Int8, true),
1644        ]));
1645
1646        // Succeeds if both have same names and datatypes, ignores nullability
1647        assert!(DFSchema::datatype_is_logically_equal(
1648            &struct_field,
1649            &DataType::Struct(Fields::from(vec![
1650                Field::new("a", DataType::Int8, false),
1651                Field::new("b", DataType::Int8, true),
1652            ]))
1653        ));
1654
1655        // Fails if field names are different
1656        assert!(!DFSchema::datatype_is_logically_equal(
1657            &struct_field,
1658            &DataType::Struct(Fields::from(vec![
1659                Field::new("x", DataType::Int8, true),
1660                Field::new("y", DataType::Int8, true),
1661            ]))
1662        ));
1663
1664        // Fails if types are different
1665        assert!(!DFSchema::datatype_is_logically_equal(
1666            &struct_field,
1667            &DataType::Struct(Fields::from(vec![
1668                Field::new("a", DataType::Int16, true),
1669                Field::new("b", DataType::Int8, true),
1670            ]))
1671        ));
1672
1673        // Fails if more or less fields
1674        assert!(!DFSchema::datatype_is_logically_equal(
1675            &struct_field,
1676            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1677        ));
1678    }
1679
1680    #[test]
1681    fn test_datatype_is_not_semantically_equivalent_to_dictionary() {
1682        // Dictionary is not semantically equal to its value type
1683        assert!(!DFSchema::datatype_is_semantically_equal(
1684            &DataType::Utf8,
1685            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1686        ));
1687    }
1688
1689    fn test_schema_2() -> Schema {
1690        Schema::new(vec![
1691            Field::new("c100", DataType::Boolean, true),
1692            Field::new("c101", DataType::Boolean, true),
1693        ])
1694    }
1695
1696    fn test_metadata() -> HashMap<String, String> {
1697        test_metadata_n(2)
1698    }
1699
1700    fn test_metadata_n(n: usize) -> HashMap<String, String> {
1701        (0..n).map(|i| (format!("k{i}"), format!("v{i}"))).collect()
1702    }
1703}