datafusion_common/
functional_dependencies.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! FunctionalDependencies keeps track of functional dependencies
19//! inside DFSchema.
20
21use std::fmt::{Display, Formatter};
22use std::ops::Deref;
23use std::vec::IntoIter;
24
25use crate::utils::{merge_and_order_indices, set_difference};
26use crate::{DFSchema, HashSet, JoinType};
27
/// This object defines a constraint on a table. Columns are referred to by
/// their indices, and a constraint may span several columns (a composite key).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Constraint {
    /// Columns with the given indices form a composite primary key (they are
    /// jointly unique and not nullable):
    PrimaryKey(Vec<usize>),
    /// Columns with the given indices form a composite unique key. When turned
    /// into a functional dependence by
    /// [`FunctionalDependencies::new_from_constraints`], the key is flagged as
    /// nullable:
    Unique(Vec<usize>),
}
37
/// This object encapsulates a list of functional constraints. It dereferences
/// to a slice of [`Constraint`] objects, so read-only slice methods (such as
/// `iter` and `len`) are available on it directly.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Constraints {
    // The constraints, in the order they were supplied at construction:
    inner: Vec<Constraint>,
}
43
44impl Constraints {
45    /// Create empty constraints
46    pub fn empty() -> Self {
47        Constraints::new_unverified(vec![])
48    }
49
50    /// Create a new [`Constraints`] object from the given `constraints`.
51    /// Users should use the [`Constraints::empty`] or [`SqlToRel::new_constraint_from_table_constraints`] functions
52    /// for constructing [`Constraints`]. This constructor is for internal
53    /// purposes only and does not check whether the argument is valid. The user
54    /// is responsible for supplying a valid vector of [`Constraint`] objects.
55    ///
56    /// [`SqlToRel::new_constraint_from_table_constraints`]: https://docs.rs/datafusion/latest/datafusion/sql/planner/struct.SqlToRel.html#method.new_constraint_from_table_constraints
57    pub fn new_unverified(constraints: Vec<Constraint>) -> Self {
58        Self { inner: constraints }
59    }
60
61    /// Check whether constraints is empty
62    pub fn is_empty(&self) -> bool {
63        self.inner.is_empty()
64    }
65
66    /// Projects constraints using the given projection indices.
67    /// Returns None if any of the constraint columns are not included in the projection.
68    pub fn project(&self, proj_indices: &[usize]) -> Option<Self> {
69        let projected = self
70            .inner
71            .iter()
72            .filter_map(|constraint| {
73                match constraint {
74                    Constraint::PrimaryKey(indices) => {
75                        let new_indices =
76                            update_elements_with_matching_indices(indices, proj_indices);
77                        // Only keep constraint if all columns are preserved
78                        (new_indices.len() == indices.len())
79                            .then_some(Constraint::PrimaryKey(new_indices))
80                    }
81                    Constraint::Unique(indices) => {
82                        let new_indices =
83                            update_elements_with_matching_indices(indices, proj_indices);
84                        // Only keep constraint if all columns are preserved
85                        (new_indices.len() == indices.len())
86                            .then_some(Constraint::Unique(new_indices))
87                    }
88                }
89            })
90            .collect::<Vec<_>>();
91
92        (!projected.is_empty()).then_some(Constraints::new_unverified(projected))
93    }
94}
95
96impl Default for Constraints {
97    fn default() -> Self {
98        Constraints::empty()
99    }
100}
101
102impl IntoIterator for Constraints {
103    type Item = Constraint;
104    type IntoIter = IntoIter<Constraint>;
105
106    fn into_iter(self) -> Self::IntoIter {
107        self.inner.into_iter()
108    }
109}
110
111impl Display for Constraints {
112    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
113        let pk = self
114            .inner
115            .iter()
116            .map(|c| format!("{c:?}"))
117            .collect::<Vec<_>>();
118        let pk = pk.join(", ");
119        write!(f, "constraints=[{pk}]")
120    }
121}
122
123impl Deref for Constraints {
124    type Target = [Constraint];
125
126    fn deref(&self) -> &Self::Target {
127        self.inner.as_slice()
128    }
129}
130
/// This object defines a functional dependence in the schema. A functional
/// dependence defines a relationship between determinant keys and dependent
/// columns. A determinant key is a column, or a set of columns, whose value
/// uniquely determines values of some other (dependent) columns. If two rows
/// have the same determinant key, dependent columns in these rows are
/// necessarily the same. If the determinant key is unique, the set of
/// dependent columns is equal to the entire schema and the determinant key can
/// serve as a primary key. Note that a primary key may "downgrade" into a
/// determinant key due to an operation such as a join, and this object is
/// used to track dependence relationships in such cases. For more information
/// on functional dependencies, see:
/// <https://www.scaler.com/topics/dbms/functional-dependency-in-dbms/>
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FunctionalDependence {
    /// Column indices of the (possibly composite) determinant key.
    pub source_indices: Vec<usize>,
    /// Column indices of dependent column(s).
    pub target_indices: Vec<usize>,
    /// Flag indicating whether one of the `source_indices` can receive NULL values.
    /// For a data source, if the constraint in question is `Constraint::Unique`,
    /// this flag is `true`. If the constraint in question is `Constraint::PrimaryKey`,
    /// this flag is `false`.
    /// Note that as the schema changes between different stages in a plan,
    /// such as after LEFT JOIN or RIGHT JOIN operations, this property may
    /// change.
    pub nullable: bool,
    /// The functional dependency mode, i.e. whether a determinant key may
    /// occur only once or multiple times (see [`Dependency`]).
    pub mode: Dependency,
}
160
/// Describes functional dependency mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Dependency {
    /// A determinant key may occur only once.
    Single,
    /// A determinant key may occur multiple times (in multiple rows).
    Multi,
}
167
168impl FunctionalDependence {
169    // Creates a new functional dependence.
170    pub fn new(
171        source_indices: Vec<usize>,
172        target_indices: Vec<usize>,
173        nullable: bool,
174    ) -> Self {
175        Self {
176            source_indices,
177            target_indices,
178            nullable,
179            // Start with the least restrictive mode by default:
180            mode: Dependency::Multi,
181        }
182    }
183
184    pub fn with_mode(mut self, mode: Dependency) -> Self {
185        self.mode = mode;
186        self
187    }
188}
189
/// This object encapsulates all functional dependencies in a given relation.
/// It dereferences to a slice of [`FunctionalDependence`] objects, so
/// read-only slice methods are available on it directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FunctionalDependencies {
    // Individual dependencies, in the order they were added:
    deps: Vec<FunctionalDependence>,
}
195
196impl FunctionalDependencies {
197    /// Creates an empty `FunctionalDependencies` object.
198    pub fn empty() -> Self {
199        Self { deps: vec![] }
200    }
201
202    /// Creates a new `FunctionalDependencies` object from a vector of
203    /// `FunctionalDependence` objects.
204    pub fn new(dependencies: Vec<FunctionalDependence>) -> Self {
205        Self { deps: dependencies }
206    }
207
208    /// Creates a new `FunctionalDependencies` object from the given constraints.
209    pub fn new_from_constraints(
210        constraints: Option<&Constraints>,
211        n_field: usize,
212    ) -> Self {
213        if let Some(Constraints { inner: constraints }) = constraints {
214            // Construct dependency objects based on each individual constraint:
215            let dependencies = constraints
216                .iter()
217                .map(|constraint| {
218                    // All the field indices are associated with the whole table
219                    // since we are dealing with table level constraints:
220                    let dependency = match constraint {
221                        Constraint::PrimaryKey(indices) => FunctionalDependence::new(
222                            indices.to_vec(),
223                            (0..n_field).collect::<Vec<_>>(),
224                            false,
225                        ),
226                        Constraint::Unique(indices) => FunctionalDependence::new(
227                            indices.to_vec(),
228                            (0..n_field).collect::<Vec<_>>(),
229                            true,
230                        ),
231                    };
232                    // As primary keys are guaranteed to be unique, set the
233                    // functional dependency mode to `Dependency::Single`:
234                    dependency.with_mode(Dependency::Single)
235                })
236                .collect::<Vec<_>>();
237            Self::new(dependencies)
238        } else {
239            // There is no constraint, return an empty object:
240            Self::empty()
241        }
242    }
243
244    pub fn with_dependency(mut self, mode: Dependency) -> Self {
245        self.deps.iter_mut().for_each(|item| item.mode = mode);
246        self
247    }
248
249    /// Merges the given functional dependencies with these.
250    pub fn extend(&mut self, other: FunctionalDependencies) {
251        self.deps.extend(other.deps);
252    }
253
254    /// Sanity checks if functional dependencies are valid. For example, if
255    /// there are 10 fields, we cannot receive any index further than 9.
256    pub fn is_valid(&self, n_field: usize) -> bool {
257        self.deps.iter().all(
258            |FunctionalDependence {
259                 source_indices,
260                 target_indices,
261                 ..
262             }| {
263                source_indices
264                    .iter()
265                    .max()
266                    .map(|&max_index| max_index < n_field)
267                    .unwrap_or(true)
268                    && target_indices
269                        .iter()
270                        .max()
271                        .map(|&max_index| max_index < n_field)
272                        .unwrap_or(true)
273            },
274        )
275    }
276
277    /// Adds the `offset` value to `source_indices` and `target_indices` for
278    /// each functional dependency.
279    pub fn add_offset(&mut self, offset: usize) {
280        self.deps.iter_mut().for_each(
281            |FunctionalDependence {
282                 source_indices,
283                 target_indices,
284                 ..
285             }| {
286                *source_indices = add_offset_to_vec(source_indices, offset);
287                *target_indices = add_offset_to_vec(target_indices, offset);
288            },
289        )
290    }
291
292    /// Updates `source_indices` and `target_indices` of each functional
293    /// dependence using the index mapping given in `proj_indices`.
294    ///
295    /// Assume that `proj_indices` is \[2, 5, 8\] and we have a functional
296    /// dependence \[5\] (`source_indices`) -> \[5, 8\] (`target_indices`).
297    /// In the updated schema, fields at indices \[2, 5, 8\] will transform
298    /// to \[0, 1, 2\]. Therefore, the resulting functional dependence will
299    /// be \[1\] -> \[1, 2\].
300    pub fn project_functional_dependencies(
301        &self,
302        proj_indices: &[usize],
303        // The argument `n_out` denotes the schema field length, which is needed
304        // to correctly associate a `Single`-mode dependence with the whole table.
305        n_out: usize,
306    ) -> FunctionalDependencies {
307        let mut projected_func_dependencies = vec![];
308        for FunctionalDependence {
309            source_indices,
310            target_indices,
311            nullable,
312            mode,
313        } in &self.deps
314        {
315            let new_source_indices =
316                update_elements_with_matching_indices(source_indices, proj_indices);
317            let new_target_indices = if *mode == Dependency::Single {
318                // Associate with all of the fields in the schema:
319                (0..n_out).collect()
320            } else {
321                // Update associations according to projection:
322                update_elements_with_matching_indices(target_indices, proj_indices)
323            };
324            // All of the composite indices should still be valid after projection;
325            // otherwise, functional dependency cannot be propagated.
326            if new_source_indices.len() == source_indices.len() {
327                let new_func_dependence = FunctionalDependence::new(
328                    new_source_indices,
329                    new_target_indices,
330                    *nullable,
331                )
332                .with_mode(*mode);
333                projected_func_dependencies.push(new_func_dependence);
334            }
335        }
336        FunctionalDependencies::new(projected_func_dependencies)
337    }
338
339    /// This function joins this set of functional dependencies with the `other`
340    /// according to the given `join_type`.
341    pub fn join(
342        &self,
343        other: &FunctionalDependencies,
344        join_type: &JoinType,
345        left_cols_len: usize,
346    ) -> FunctionalDependencies {
347        // Get mutable copies of left and right side dependencies:
348        let mut right_func_dependencies = other.clone();
349        let mut left_func_dependencies = self.clone();
350
351        match join_type {
352            JoinType::Inner | JoinType::Left | JoinType::Right => {
353                // Add offset to right schema:
354                right_func_dependencies.add_offset(left_cols_len);
355
356                // Result may have multiple values, update the dependency mode:
357                left_func_dependencies =
358                    left_func_dependencies.with_dependency(Dependency::Multi);
359                right_func_dependencies =
360                    right_func_dependencies.with_dependency(Dependency::Multi);
361
362                if *join_type == JoinType::Left {
363                    // Downgrade the right side, since it may have additional NULL values:
364                    right_func_dependencies.downgrade_dependencies();
365                } else if *join_type == JoinType::Right {
366                    // Downgrade the left side, since it may have additional NULL values:
367                    left_func_dependencies.downgrade_dependencies();
368                }
369                // Combine left and right functional dependencies:
370                left_func_dependencies.extend(right_func_dependencies);
371                left_func_dependencies
372            }
373            JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
374                // These joins preserve functional dependencies of the left side:
375                left_func_dependencies
376            }
377            JoinType::RightSemi | JoinType::RightAnti => {
378                // These joins preserve functional dependencies of the right side:
379                right_func_dependencies
380            }
381            JoinType::Full => {
382                // All of the functional dependencies are lost in a FULL join:
383                FunctionalDependencies::empty()
384            }
385        }
386    }
387
388    /// This function downgrades a functional dependency when nullability becomes
389    /// a possibility:
390    /// - If the dependency in question is UNIQUE (i.e. nullable), a new null value
391    ///   invalidates the dependency.
392    /// - If the dependency in question is PRIMARY KEY (i.e. not nullable), a new
393    ///   null value turns it into UNIQUE mode.
394    fn downgrade_dependencies(&mut self) {
395        // Delete nullable dependencies, since they are no longer valid:
396        self.deps.retain(|item| !item.nullable);
397        self.deps.iter_mut().for_each(|item| item.nullable = true);
398    }
399
400    /// This function ensures that functional dependencies involving uniquely
401    /// occurring determinant keys cover their entire table in terms of
402    /// dependent columns.
403    pub fn extend_target_indices(&mut self, n_out: usize) {
404        self.deps.iter_mut().for_each(
405            |FunctionalDependence {
406                 mode,
407                 target_indices,
408                 ..
409             }| {
410                // If unique, cover the whole table:
411                if *mode == Dependency::Single {
412                    *target_indices = (0..n_out).collect::<Vec<_>>();
413                }
414            },
415        )
416    }
417}
418
419impl Deref for FunctionalDependencies {
420    type Target = [FunctionalDependence];
421
422    fn deref(&self) -> &Self::Target {
423        self.deps.as_slice()
424    }
425}
426
427/// Calculates functional dependencies for aggregate output, when there is a GROUP BY expression.
428pub fn aggregate_functional_dependencies(
429    aggr_input_schema: &DFSchema,
430    group_by_expr_names: &[String],
431    aggr_schema: &DFSchema,
432) -> FunctionalDependencies {
433    let mut aggregate_func_dependencies = vec![];
434    let aggr_input_fields = aggr_input_schema.field_names();
435    let aggr_fields = aggr_schema.fields();
436    // Association covers the whole table:
437    let target_indices = (0..aggr_schema.fields().len()).collect::<Vec<_>>();
438    // Get functional dependencies of the schema:
439    let func_dependencies = aggr_input_schema.functional_dependencies();
440    for FunctionalDependence {
441        source_indices,
442        nullable,
443        mode,
444        ..
445    } in &func_dependencies.deps
446    {
447        // Keep source indices in a `HashSet` to prevent duplicate entries:
448        let mut new_source_indices = vec![];
449        let mut new_source_field_names = vec![];
450        let source_field_names = source_indices
451            .iter()
452            .map(|&idx| &aggr_input_fields[idx])
453            .collect::<Vec<_>>();
454
455        for (idx, group_by_expr_name) in group_by_expr_names.iter().enumerate() {
456            // When one of the input determinant expressions matches with
457            // the GROUP BY expression, add the index of the GROUP BY
458            // expression as a new determinant key:
459            if source_field_names.contains(&group_by_expr_name) {
460                new_source_indices.push(idx);
461                new_source_field_names.push(group_by_expr_name.clone());
462            }
463        }
464        let existing_target_indices =
465            get_target_functional_dependencies(aggr_input_schema, group_by_expr_names);
466        let new_target_indices = get_target_functional_dependencies(
467            aggr_input_schema,
468            &new_source_field_names,
469        );
470        let mode = if existing_target_indices == new_target_indices
471            && new_target_indices.is_some()
472        {
473            // If dependency covers all GROUP BY expressions, mode will be `Single`:
474            Dependency::Single
475        } else {
476            // Otherwise, existing mode is preserved:
477            *mode
478        };
479        // All of the composite indices occur in the GROUP BY expression:
480        if new_source_indices.len() == source_indices.len() {
481            aggregate_func_dependencies.push(
482                FunctionalDependence::new(
483                    new_source_indices,
484                    target_indices.clone(),
485                    *nullable,
486                )
487                .with_mode(mode),
488            );
489        }
490    }
491
492    // When we have a GROUP BY key, we can guarantee uniqueness after
493    // aggregation:
494    if !group_by_expr_names.is_empty() {
495        let count = group_by_expr_names.len();
496        let source_indices = (0..count).collect::<Vec<_>>();
497        let nullable = source_indices
498            .iter()
499            .any(|idx| aggr_fields[*idx].is_nullable());
500        // If GROUP BY expressions do not already act as a determinant:
501        if !aggregate_func_dependencies.iter().any(|item| {
502            // If `item.source_indices` is a subset of GROUP BY expressions, we shouldn't add
503            // them since `item.source_indices` defines this relation already.
504
505            // The following simple comparison is working well because
506            // GROUP BY expressions come here as a prefix.
507            item.source_indices.iter().all(|idx| idx < &count)
508        }) {
509            // Add a new functional dependency associated with the whole table:
510            // Use nullable property of the GROUP BY expression:
511            aggregate_func_dependencies.push(
512                // Use nullable property of the GROUP BY expression:
513                FunctionalDependence::new(source_indices, target_indices, nullable)
514                    .with_mode(Dependency::Single),
515            );
516        }
517    }
518    FunctionalDependencies::new(aggregate_func_dependencies)
519}
520
521/// Returns target indices, for the determinant keys that are inside
522/// group by expressions.
523pub fn get_target_functional_dependencies(
524    schema: &DFSchema,
525    group_by_expr_names: &[String],
526) -> Option<Vec<usize>> {
527    let mut combined_target_indices = HashSet::new();
528    let dependencies = schema.functional_dependencies();
529    let field_names = schema.field_names();
530    for FunctionalDependence {
531        source_indices,
532        target_indices,
533        ..
534    } in &dependencies.deps
535    {
536        let source_key_names = source_indices
537            .iter()
538            .map(|id_key_idx| &field_names[*id_key_idx])
539            .collect::<Vec<_>>();
540        // If the GROUP BY expression contains a determinant key, we can use
541        // the associated fields after aggregation even if they are not part
542        // of the GROUP BY expression.
543        if source_key_names
544            .iter()
545            .all(|source_key_name| group_by_expr_names.contains(source_key_name))
546        {
547            combined_target_indices.extend(target_indices.iter());
548        }
549    }
550    (!combined_target_indices.is_empty()).then_some({
551        let mut result = combined_target_indices.into_iter().collect::<Vec<_>>();
552        result.sort();
553        result
554    })
555}
556
557/// Returns indices for the minimal subset of GROUP BY expressions that are
558/// functionally equivalent to the original set of GROUP BY expressions.
559pub fn get_required_group_by_exprs_indices(
560    schema: &DFSchema,
561    group_by_expr_names: &[String],
562) -> Option<Vec<usize>> {
563    let dependencies = schema.functional_dependencies();
564    let field_names = schema.field_names();
565    let mut groupby_expr_indices = group_by_expr_names
566        .iter()
567        .map(|group_by_expr_name| {
568            field_names
569                .iter()
570                .position(|field_name| field_name == group_by_expr_name)
571        })
572        .collect::<Option<Vec<_>>>()?;
573
574    groupby_expr_indices.sort();
575    for FunctionalDependence {
576        source_indices,
577        target_indices,
578        ..
579    } in &dependencies.deps
580    {
581        if source_indices
582            .iter()
583            .all(|source_idx| groupby_expr_indices.contains(source_idx))
584        {
585            // If all source indices are among GROUP BY expression indices, we
586            // can remove target indices from GROUP BY expression indices and
587            // use source indices instead.
588            groupby_expr_indices = set_difference(&groupby_expr_indices, target_indices);
589            groupby_expr_indices =
590                merge_and_order_indices(groupby_expr_indices, source_indices);
591        }
592    }
593    groupby_expr_indices
594        .iter()
595        .map(|idx| {
596            group_by_expr_names
597                .iter()
598                .position(|name| &field_names[*idx] == name)
599        })
600        .collect()
601}
602
/// Replaces each value in `entries` with the position of its first occurrence
/// inside `proj_indices`. Values that do not occur in `proj_indices` are
/// skipped, so the result may be shorter than `entries`.
fn update_elements_with_matching_indices(
    entries: &[usize],
    proj_indices: &[usize],
) -> Vec<usize> {
    let mut remapped = Vec::new();
    for entry in entries {
        let found = proj_indices.iter().position(|candidate| candidate == entry);
        if let Some(position) = found {
            remapped.push(position);
        }
    }
    remapped
}
614
/// Returns a new vector holding every entry of `in_data` increased by
/// `offset`, in the same order. The input is left untouched.
fn add_offset_to_vec<T: Copy + std::ops::Add<Output = T>>(
    in_data: &[T],
    offset: T,
) -> Vec<T> {
    let mut shifted = Vec::with_capacity(in_data.len());
    for &item in in_data {
        shifted.push(item + offset);
    }
    shifted
}
622
#[cfg(test)]
mod tests {
    use super::*;

    /// Iterating through the slice view yields the constraints in the order
    /// they were supplied.
    #[test]
    fn constraints_iter() {
        let primary_key = Constraint::PrimaryKey(vec![10]);
        let unique = Constraint::Unique(vec![20]);
        let constraints =
            Constraints::new_unverified(vec![primary_key.clone(), unique.clone()]);

        let visited = constraints.iter().collect::<Vec<_>>();
        assert_eq!(visited, vec![&primary_key, &unique]);
    }

    /// A constraint survives a projection only if all of its columns do.
    #[test]
    fn test_project_constraints() {
        let constraints = Constraints::new_unverified(vec![
            Constraint::PrimaryKey(vec![1, 2]),
            Constraint::Unique(vec![0, 3]),
        ]);

        // Keeping columns 1,2,3 preserves the primary key (remapped to 0,1),
        // while the unique key loses column 0 and is dropped:
        let expected =
            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0, 1])]);
        assert_eq!(constraints.project(&[1, 2, 3]), Some(expected));

        // Keeping only column 0 preserves no constraint at all:
        assert_eq!(constraints.project(&[0]), None);
    }

    /// Source and target indices are remapped to positions in the projection.
    #[test]
    fn test_get_updated_id_keys() {
        let input = FunctionalDependencies::new(vec![FunctionalDependence::new(
            vec![1],
            vec![0, 1, 2],
            true,
        )]);
        let expected = FunctionalDependencies::new(vec![FunctionalDependence::new(
            vec![0],
            vec![0, 1],
            true,
        )]);

        let projected = input.project_functional_dependencies(&[1, 2], 2);
        assert_eq!(projected, expected);
    }
}
673}