1use super::{
19 ast::{
20 BuilderError, DerivedRelationBuilder, QueryBuilder, RelationBuilder,
21 SelectBuilder, TableRelationBuilder, TableWithJoinsBuilder,
22 },
23 rewrite::{
24 inject_column_aliases_into_subquery, normalize_union_schema,
25 rewrite_plan_for_sort_on_non_projected_fields,
26 subquery_alias_inner_query_and_columns, TableAliasRewriter,
27 },
28 utils::{
29 find_agg_node_within_select, find_unnest_node_within_select,
30 find_window_nodes_within_select, try_transform_to_simple_table_scan_with_filters,
31 unproject_sort_expr, unproject_unnest_expr, unproject_window_exprs,
32 },
33 Unparser,
34};
35use crate::unparser::ast::UnnestRelationBuilder;
36use crate::unparser::extension_unparser::{
37 UnparseToStatementResult, UnparseWithinStatementResult,
38};
39use crate::unparser::utils::{find_unnest_node_until_relation, unproject_agg_exprs};
40use crate::utils::UNNEST_PLACEHOLDER;
41use datafusion_common::{
42 internal_err, not_impl_err,
43 tree_node::{TransformedResult, TreeNode},
44 Column, DataFusionError, Result, ScalarValue, TableReference,
45};
46use datafusion_expr::expr::OUTER_REFERENCE_COLUMN_PREFIX;
47use datafusion_expr::{
48 expr::Alias, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
49 LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
50 UserDefinedLogicalNode,
51};
52use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
53use std::sync::Arc;
54
55pub fn plan_to_sql(plan: &LogicalPlan) -> Result<ast::Statement> {
92 let unparser = Unparser::default();
93 unparser.plan_to_sql(plan)
94}
95
96impl Unparser<'_> {
97 pub fn plan_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
98 let plan = normalize_union_schema(plan)?;
99
100 match plan {
101 LogicalPlan::Projection(_)
102 | LogicalPlan::Filter(_)
103 | LogicalPlan::Window(_)
104 | LogicalPlan::Aggregate(_)
105 | LogicalPlan::Sort(_)
106 | LogicalPlan::Join(_)
107 | LogicalPlan::Repartition(_)
108 | LogicalPlan::Union(_)
109 | LogicalPlan::TableScan(_)
110 | LogicalPlan::EmptyRelation(_)
111 | LogicalPlan::Subquery(_)
112 | LogicalPlan::SubqueryAlias(_)
113 | LogicalPlan::Limit(_)
114 | LogicalPlan::Statement(_)
115 | LogicalPlan::Values(_)
116 | LogicalPlan::Distinct(_) => self.select_to_sql_statement(&plan),
117 LogicalPlan::Dml(_) => self.dml_to_sql(&plan),
118 LogicalPlan::Extension(extension) => {
119 self.extension_to_statement(extension.node.as_ref())
120 }
121 LogicalPlan::Explain(_)
122 | LogicalPlan::Analyze(_)
123 | LogicalPlan::Ddl(_)
124 | LogicalPlan::Copy(_)
125 | LogicalPlan::DescribeTable(_)
126 | LogicalPlan::RecursiveQuery(_)
127 | LogicalPlan::Unnest(_) => not_impl_err!("Unsupported plan: {plan:?}"),
128 }
129 }
130
131 fn extension_to_statement(
135 &self,
136 node: &dyn UserDefinedLogicalNode,
137 ) -> Result<ast::Statement> {
138 let mut statement = None;
139 for unparser in &self.extension_unparsers {
140 match unparser.unparse_to_statement(node, self)? {
141 UnparseToStatementResult::Modified(stmt) => {
142 statement = Some(stmt);
143 break;
144 }
145 UnparseToStatementResult::Unmodified => {}
146 }
147 }
148 if let Some(statement) = statement {
149 Ok(statement)
150 } else {
151 not_impl_err!("Unsupported extension node: {node:?}")
152 }
153 }
154
155 fn extension_to_sql(
159 &self,
160 node: &dyn UserDefinedLogicalNode,
161 query: &mut Option<&mut QueryBuilder>,
162 select: &mut Option<&mut SelectBuilder>,
163 relation: &mut Option<&mut RelationBuilder>,
164 ) -> Result<()> {
165 for unparser in &self.extension_unparsers {
166 match unparser.unparse(node, self, query, select, relation)? {
167 UnparseWithinStatementResult::Modified => return Ok(()),
168 UnparseWithinStatementResult::Unmodified => {}
169 }
170 }
171 not_impl_err!("Unsupported extension node: {node:?}")
172 }
173
174 fn select_to_sql_statement(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
175 let mut query_builder = Some(QueryBuilder::default());
176
177 let body = self.select_to_sql_expr(plan, &mut query_builder)?;
178
179 let query = query_builder.unwrap().body(Box::new(body)).build()?;
180
181 Ok(ast::Statement::Query(Box::new(query)))
182 }
183
184 fn select_to_sql_expr(
185 &self,
186 plan: &LogicalPlan,
187 query: &mut Option<QueryBuilder>,
188 ) -> Result<SetExpr> {
189 let mut select_builder = SelectBuilder::default();
190 select_builder.push_from(TableWithJoinsBuilder::default());
191 let mut relation_builder = RelationBuilder::default();
192 self.select_to_sql_recursively(
193 plan,
194 query,
195 &mut select_builder,
196 &mut relation_builder,
197 )?;
198
199 if let Some(body) = query.as_mut().and_then(|q| q.take_body()) {
201 return Ok(*body);
202 }
203
204 if !select_builder.already_projected() {
207 select_builder.projection(vec![ast::SelectItem::Wildcard(
208 ast::WildcardAdditionalOptions::default(),
209 )]);
210 }
211
212 let mut twj = select_builder.pop_from().unwrap();
213 twj.relation(relation_builder);
214 select_builder.push_from(twj);
215
216 Ok(SetExpr::Select(Box::new(select_builder.build()?)))
217 }
218
219 fn reconstruct_select_statement(
223 &self,
224 plan: &LogicalPlan,
225 p: &Projection,
226 select: &mut SelectBuilder,
227 ) -> Result<()> {
228 let mut exprs = p.expr.clone();
229
230 if let Some(unnest) = find_unnest_node_within_select(plan) {
232 exprs = exprs
233 .into_iter()
234 .map(|e| unproject_unnest_expr(e, unnest))
235 .collect::<Result<Vec<_>>>()?;
236 };
237
238 match (
239 find_agg_node_within_select(plan, true),
240 find_window_nodes_within_select(plan, None, true),
241 ) {
242 (Some(agg), window) => {
243 let window_option = window.as_deref();
244 let items = exprs
245 .into_iter()
246 .map(|proj_expr| {
247 let unproj = unproject_agg_exprs(proj_expr, agg, window_option)?;
248 self.select_item_to_sql(&unproj)
249 })
250 .collect::<Result<Vec<_>>>()?;
251
252 select.projection(items);
253 select.group_by(ast::GroupByExpr::Expressions(
254 agg.group_expr
255 .iter()
256 .map(|expr| self.expr_to_sql(expr))
257 .collect::<Result<Vec<_>>>()?,
258 vec![],
259 ));
260 }
261 (None, Some(window)) => {
262 let items = exprs
263 .into_iter()
264 .map(|proj_expr| {
265 let unproj = unproject_window_exprs(proj_expr, &window)?;
266 self.select_item_to_sql(&unproj)
267 })
268 .collect::<Result<Vec<_>>>()?;
269
270 select.projection(items);
271 }
272 _ => {
273 let items = exprs
274 .iter()
275 .map(|e| self.select_item_to_sql(e))
276 .collect::<Result<Vec<_>>>()?;
277 select.projection(items);
278 }
279 }
280 Ok(())
281 }
282
283 fn derive(
284 &self,
285 plan: &LogicalPlan,
286 relation: &mut RelationBuilder,
287 alias: Option<ast::TableAlias>,
288 lateral: bool,
289 ) -> Result<()> {
290 let mut derived_builder = DerivedRelationBuilder::default();
291 derived_builder.lateral(lateral).alias(alias).subquery({
292 let inner_statement = self.plan_to_sql(plan)?;
293 if let ast::Statement::Query(inner_query) = inner_statement {
294 inner_query
295 } else {
296 return internal_err!(
297 "Subquery must be a Query, but found {inner_statement:?}"
298 );
299 }
300 });
301 relation.derived(derived_builder);
302
303 Ok(())
304 }
305
306 fn derive_with_dialect_alias(
307 &self,
308 alias: &str,
309 plan: &LogicalPlan,
310 relation: &mut RelationBuilder,
311 lateral: bool,
312 ) -> Result<()> {
313 if self.dialect.requires_derived_table_alias() {
314 self.derive(
315 plan,
316 relation,
317 Some(self.new_table_alias(alias.to_string(), vec![])),
318 lateral,
319 )
320 } else {
321 self.derive(plan, relation, None, lateral)
322 }
323 }
324
325 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
326 fn select_to_sql_recursively(
327 &self,
328 plan: &LogicalPlan,
329 query: &mut Option<QueryBuilder>,
330 select: &mut SelectBuilder,
331 relation: &mut RelationBuilder,
332 ) -> Result<()> {
333 match plan {
334 LogicalPlan::TableScan(scan) => {
335 if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
336 plan,
337 None,
338 select.already_projected(),
339 )? {
340 return self.select_to_sql_recursively(
341 &unparsed_table_scan,
342 query,
343 select,
344 relation,
345 );
346 }
347 let mut builder = TableRelationBuilder::default();
348 let mut table_parts = vec![];
349 if let Some(catalog_name) = scan.table_name.catalog() {
350 table_parts
351 .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
352 }
353 if let Some(schema_name) = scan.table_name.schema() {
354 table_parts
355 .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
356 }
357 table_parts.push(
358 self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
359 );
360 builder.name(ast::ObjectName::from(table_parts));
361 relation.table(builder);
362
363 Ok(())
364 }
365 LogicalPlan::Projection(p) => {
366 if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
367 return self
368 .select_to_sql_recursively(&new_plan, query, select, relation);
369 }
370
371 let unnest_input_type = if p.expr.len() == 1 {
375 Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
376 } else {
377 None
378 };
379 if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() {
380 if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
381 if let Some(unnest_relation) =
382 self.try_unnest_to_table_factor_sql(unnest)?
383 {
384 relation.unnest(unnest_relation);
385 return self.select_to_sql_recursively(
386 p.input.as_ref(),
387 query,
388 select,
389 relation,
390 );
391 }
392 }
393 }
394
395 if select.already_projected() {
397 return self.derive_with_dialect_alias(
398 "derived_projection",
399 plan,
400 relation,
401 unnest_input_type
402 .filter(|t| matches!(t, UnnestInputType::OuterReference))
403 .is_some(),
404 );
405 }
406 self.reconstruct_select_statement(plan, p, select)?;
407 self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
408 }
409 LogicalPlan::Filter(filter) => {
410 if let Some(agg) =
411 find_agg_node_within_select(plan, select.already_projected())
412 {
413 let unprojected =
414 unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
415 let filter_expr = self.expr_to_sql(&unprojected)?;
416 select.having(Some(filter_expr));
417 } else {
418 let filter_expr = self.expr_to_sql(&filter.predicate)?;
419 select.selection(Some(filter_expr));
420 }
421
422 self.select_to_sql_recursively(
423 filter.input.as_ref(),
424 query,
425 select,
426 relation,
427 )
428 }
429 LogicalPlan::Limit(limit) => {
430 if select.already_projected() {
432 return self.derive_with_dialect_alias(
433 "derived_limit",
434 plan,
435 relation,
436 false,
437 );
438 }
439 if let Some(fetch) = &limit.fetch {
440 let Some(query) = query.as_mut() else {
441 return internal_err!(
442 "Limit operator only valid in a statement context."
443 );
444 };
445 query.limit(Some(self.expr_to_sql(fetch)?));
446 }
447
448 if let Some(skip) = &limit.skip {
449 let Some(query) = query.as_mut() else {
450 return internal_err!(
451 "Offset operator only valid in a statement context."
452 );
453 };
454 query.offset(Some(ast::Offset {
455 rows: ast::OffsetRows::None,
456 value: self.expr_to_sql(skip)?,
457 }));
458 }
459
460 self.select_to_sql_recursively(
461 limit.input.as_ref(),
462 query,
463 select,
464 relation,
465 )
466 }
467 LogicalPlan::Sort(sort) => {
468 if select.already_projected() {
470 return self.derive_with_dialect_alias(
471 "derived_sort",
472 plan,
473 relation,
474 false,
475 );
476 }
477 let Some(query_ref) = query else {
478 return internal_err!(
479 "Sort operator only valid in a statement context."
480 );
481 };
482
483 if let Some(fetch) = sort.fetch {
484 query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
485 fetch.to_string(),
486 false,
487 ))));
488 };
489
490 let agg = find_agg_node_within_select(plan, select.already_projected());
491 let sort_exprs: Vec<SortExpr> = sort
493 .expr
494 .iter()
495 .map(|sort_expr| {
496 unproject_sort_expr(sort_expr, agg, sort.input.as_ref())
497 })
498 .collect::<Result<Vec<_>>>()?;
499
500 query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
501
502 self.select_to_sql_recursively(
503 sort.input.as_ref(),
504 query,
505 select,
506 relation,
507 )
508 }
509 LogicalPlan::Aggregate(agg) => {
510 if !select.already_projected() {
512 let exprs: Vec<_> = agg
515 .aggr_expr
516 .iter()
517 .chain(agg.group_expr.iter())
518 .map(|expr| self.select_item_to_sql(expr))
519 .collect::<Result<Vec<_>>>()?;
520 select.projection(exprs);
521
522 select.group_by(ast::GroupByExpr::Expressions(
523 agg.group_expr
524 .iter()
525 .map(|expr| self.expr_to_sql(expr))
526 .collect::<Result<Vec<_>>>()?,
527 vec![],
528 ));
529 }
530
531 self.select_to_sql_recursively(
532 agg.input.as_ref(),
533 query,
534 select,
535 relation,
536 )
537 }
538 LogicalPlan::Distinct(distinct) => {
539 if select.already_projected() {
541 return self.derive_with_dialect_alias(
542 "derived_distinct",
543 plan,
544 relation,
545 false,
546 );
547 }
548 let (select_distinct, input) = match distinct {
549 Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
550 Distinct::On(on) => {
551 let exprs = on
552 .on_expr
553 .iter()
554 .map(|e| self.expr_to_sql(e))
555 .collect::<Result<Vec<_>>>()?;
556 let items = on
557 .select_expr
558 .iter()
559 .map(|e| self.select_item_to_sql(e))
560 .collect::<Result<Vec<_>>>()?;
561 if let Some(sort_expr) = &on.sort_expr {
562 if let Some(query_ref) = query {
563 query_ref.order_by(self.sorts_to_sql(sort_expr)?);
564 } else {
565 return internal_err!(
566 "Sort operator only valid in a statement context."
567 );
568 }
569 }
570 select.projection(items);
571 (ast::Distinct::On(exprs), on.input.as_ref())
572 }
573 };
574 select.distinct(Some(select_distinct));
575 self.select_to_sql_recursively(input, query, select, relation)
576 }
577 LogicalPlan::Join(join) => {
578 let mut table_scan_filters = vec![];
579 let (left_plan, right_plan) = match join.join_type {
580 JoinType::RightSemi | JoinType::RightAnti => {
581 (&join.right, &join.left)
582 }
583 _ => (&join.left, &join.right),
584 };
585 let already_projected = select.already_projected();
589
590 let left_plan =
591 match try_transform_to_simple_table_scan_with_filters(left_plan)? {
592 Some((plan, filters)) => {
593 table_scan_filters.extend(filters);
594 Arc::new(plan)
595 }
596 None => Arc::clone(left_plan),
597 };
598
599 self.select_to_sql_recursively(
600 left_plan.as_ref(),
601 query,
602 select,
603 relation,
604 )?;
605
606 let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
607 {
608 Some(select.pop_projections())
609 } else {
610 None
611 };
612
613 let right_plan =
614 match try_transform_to_simple_table_scan_with_filters(right_plan)? {
615 Some((plan, filters)) => {
616 table_scan_filters.extend(filters);
617 Arc::new(plan)
618 }
619 None => Arc::clone(right_plan),
620 };
621
622 let mut right_relation = RelationBuilder::default();
623
624 self.select_to_sql_recursively(
625 right_plan.as_ref(),
626 query,
627 select,
628 &mut right_relation,
629 )?;
630
631 let join_filters = if table_scan_filters.is_empty() {
632 join.filter.clone()
633 } else {
634 let Some(combined_filters) =
636 table_scan_filters.into_iter().reduce(|acc, filter| {
637 Expr::BinaryExpr(BinaryExpr {
638 left: Box::new(acc),
639 op: Operator::And,
640 right: Box::new(filter),
641 })
642 })
643 else {
644 return internal_err!("Failed to combine TableScan filters");
645 };
646
647 match &join.filter {
649 Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
650 left: Box::new(filter.clone()),
651 op: Operator::And,
652 right: Box::new(combined_filters),
653 })),
654 None => Some(combined_filters),
655 }
656 };
657
658 let join_constraint = self.join_constraint_to_sql(
659 join.join_constraint,
660 &join.on,
661 join_filters.as_ref(),
662 )?;
663
664 self.select_to_sql_recursively(
665 right_plan.as_ref(),
666 query,
667 select,
668 &mut right_relation,
669 )?;
670
671 let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
672 {
673 Some(select.pop_projections())
674 } else {
675 None
676 };
677
678 match join.join_type {
679 JoinType::LeftSemi
680 | JoinType::LeftAnti
681 | JoinType::LeftMark
682 | JoinType::RightSemi
683 | JoinType::RightAnti => {
684 let mut query_builder = QueryBuilder::default();
685 let mut from = TableWithJoinsBuilder::default();
686 let mut exists_select: SelectBuilder = SelectBuilder::default();
687 from.relation(right_relation);
688 exists_select.push_from(from);
689 if let Some(filter) = &join.filter {
690 exists_select.selection(Some(self.expr_to_sql(filter)?));
691 }
692 for (left, right) in &join.on {
693 exists_select.selection(Some(
694 self.expr_to_sql(&left.clone().eq(right.clone()))?,
695 ));
696 }
697 exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
698 ast::Expr::value(ast::Value::Number("1".to_string(), false)),
699 )]);
700 query_builder.body(Box::new(SetExpr::Select(Box::new(
701 exists_select.build()?,
702 ))));
703
704 let negated = match join.join_type {
705 JoinType::LeftSemi
706 | JoinType::RightSemi
707 | JoinType::LeftMark => false,
708 JoinType::LeftAnti | JoinType::RightAnti => true,
709 _ => unreachable!(),
710 };
711 let exists_expr = ast::Expr::Exists {
712 subquery: Box::new(query_builder.build()?),
713 negated,
714 };
715 if join.join_type == JoinType::LeftMark {
716 let (table_ref, _) = right_plan.schema().qualified_field(0);
717 let column = self
718 .col_to_sql(&Column::new(table_ref.cloned(), "mark"))?;
719 select.replace_mark(&column, &exists_expr);
720 } else {
721 select.selection(Some(exists_expr));
722 }
723 if let Some(projection) = left_projection {
724 select.projection(projection);
725 }
726 }
727 JoinType::Inner
728 | JoinType::Left
729 | JoinType::Right
730 | JoinType::Full => {
731 let Ok(Some(relation)) = right_relation.build() else {
732 return internal_err!("Failed to build right relation");
733 };
734 let ast_join = ast::Join {
735 relation,
736 global: false,
737 join_operator: self
738 .join_operator_to_sql(join.join_type, join_constraint)?,
739 };
740 let mut from = select.pop_from().unwrap();
741 from.push_join(ast_join);
742 select.push_from(from);
743 if !already_projected {
744 let Some(left_projection) = left_projection else {
745 return internal_err!("Left projection is missing");
746 };
747
748 let Some(right_projection) = right_projection else {
749 return internal_err!("Right projection is missing");
750 };
751
752 let projection = left_projection
753 .into_iter()
754 .chain(right_projection.into_iter())
755 .collect();
756 select.projection(projection);
757 }
758 }
759 };
760
761 Ok(())
762 }
763 LogicalPlan::SubqueryAlias(plan_alias) => {
764 let (plan, mut columns) =
765 subquery_alias_inner_query_and_columns(plan_alias);
766 let unparsed_table_scan = Self::unparse_table_scan_pushdown(
767 plan,
768 Some(plan_alias.alias.clone()),
769 select.already_projected(),
770 )?;
771 if !select.already_projected() && unparsed_table_scan.is_none() {
774 select.projection(vec![ast::SelectItem::Wildcard(
775 ast::WildcardAdditionalOptions::default(),
776 )]);
777 }
778 let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
779 if !columns.is_empty()
780 && !self.dialect.supports_column_alias_in_table_alias()
781 {
782 let rewritten_plan =
784 match inject_column_aliases_into_subquery(plan, columns) {
785 Ok(p) => p,
786 Err(e) => {
787 return internal_err!(
788 "Failed to transform SubqueryAlias plan: {e}"
789 )
790 }
791 };
792
793 columns = vec![];
794
795 self.select_to_sql_recursively(
796 &rewritten_plan,
797 query,
798 select,
799 relation,
800 )?;
801 } else {
802 self.select_to_sql_recursively(&plan, query, select, relation)?;
803 }
804
805 relation.alias(Some(
806 self.new_table_alias(plan_alias.alias.table().to_string(), columns),
807 ));
808
809 Ok(())
810 }
811 LogicalPlan::Union(union) => {
812 if select.already_projected() {
814 return self.derive_with_dialect_alias(
815 "derived_union",
816 plan,
817 relation,
818 false,
819 );
820 }
821
822 let input_exprs: Vec<SetExpr> = union
823 .inputs
824 .iter()
825 .map(|input| self.select_to_sql_expr(input, query))
826 .collect::<Result<Vec<_>>>()?;
827
828 if input_exprs.len() < 2 {
829 return internal_err!("UNION operator requires at least 2 inputs");
830 }
831
832 let union_expr = input_exprs
835 .into_iter()
836 .rev()
837 .reduce(|a, b| SetExpr::SetOperation {
838 op: ast::SetOperator::Union,
839 set_quantifier: ast::SetQuantifier::All,
840 left: Box::new(b),
841 right: Box::new(a),
842 })
843 .unwrap();
844
845 let Some(query) = query.as_mut() else {
846 return internal_err!(
847 "UNION ALL operator only valid in a statement context"
848 );
849 };
850 query.body(Box::new(union_expr));
851
852 Ok(())
853 }
854 LogicalPlan::Window(window) => {
855 self.select_to_sql_recursively(
857 window.input.as_ref(),
858 query,
859 select,
860 relation,
861 )
862 }
863 LogicalPlan::EmptyRelation(_) => {
864 if !relation.has_relation() {
867 relation.empty();
868 }
869 Ok(())
870 }
871 LogicalPlan::Extension(extension) => {
872 if let Some(query) = query.as_mut() {
873 self.extension_to_sql(
874 extension.node.as_ref(),
875 &mut Some(query),
876 &mut Some(select),
877 &mut Some(relation),
878 )
879 } else {
880 self.extension_to_sql(
881 extension.node.as_ref(),
882 &mut None,
883 &mut Some(select),
884 &mut Some(relation),
885 )
886 }
887 }
888 LogicalPlan::Unnest(unnest) => {
889 if !unnest.struct_type_columns.is_empty() {
890 return internal_err!(
891 "Struct type columns are not currently supported in UNNEST: {:?}",
892 unnest.struct_type_columns
893 );
894 }
895
896 if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
904 self.select_to_sql_recursively(&p.input, query, select, relation)
906 } else {
907 internal_err!("Unnest input is not a Projection: {unnest:?}")
908 }
909 }
910 LogicalPlan::Subquery(subquery)
911 if find_unnest_node_until_relation(subquery.subquery.as_ref())
912 .is_some() =>
913 {
914 if self.dialect.unnest_as_table_factor() {
915 self.select_to_sql_recursively(
916 subquery.subquery.as_ref(),
917 query,
918 select,
919 relation,
920 )
921 } else {
922 self.derive_with_dialect_alias(
923 "derived_unnest",
924 subquery.subquery.as_ref(),
925 relation,
926 true,
927 )
928 }
929 }
930 _ => {
931 not_impl_err!("Unsupported operator: {plan:?}")
932 }
933 }
934 }
935
936 fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
946 if let Expr::Alias(Alias { expr, .. }) = expr {
947 if let Expr::Column(Column { name, .. }) = expr.as_ref() {
948 if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
949 if prefix.starts_with(&format!("({}(", OUTER_REFERENCE_COLUMN_PREFIX))
950 {
951 return Some(UnnestInputType::OuterReference);
952 }
953 return Some(UnnestInputType::Scalar);
954 }
955 }
956 }
957 None
958 }
959
960 fn try_unnest_to_table_factor_sql(
961 &self,
962 unnest: &Unnest,
963 ) -> Result<Option<UnnestRelationBuilder>> {
964 let mut unnest_relation = UnnestRelationBuilder::default();
965 let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
966 return Ok(None);
967 };
968
969 if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
970 return Ok(None);
978 };
979
980 let exprs = projection
981 .expr
982 .iter()
983 .map(|e| self.expr_to_sql(e))
984 .collect::<Result<Vec<_>>>()?;
985 unnest_relation.array_exprs(exprs);
986
987 Ok(Some(unnest_relation))
988 }
989
990 fn is_scan_with_pushdown(scan: &TableScan) -> bool {
991 scan.projection.is_some() || !scan.filters.is_empty() || scan.fetch.is_some()
992 }
993
994 fn unparse_table_scan_pushdown(
997 plan: &LogicalPlan,
998 alias: Option<TableReference>,
999 already_projected: bool,
1000 ) -> Result<Option<LogicalPlan>> {
1001 match plan {
1002 LogicalPlan::TableScan(table_scan) => {
1003 if !Self::is_scan_with_pushdown(table_scan) {
1004 return Ok(None);
1005 }
1006 let table_schema = table_scan.source.schema();
1007 let mut filter_alias_rewriter =
1008 alias.as_ref().map(|alias_name| TableAliasRewriter {
1009 table_schema: &table_schema,
1010 alias_name: alias_name.clone(),
1011 });
1012
1013 let mut builder = LogicalPlanBuilder::scan(
1014 table_scan.table_name.clone(),
1015 Arc::clone(&table_scan.source),
1016 None,
1017 )?;
1018 if let Some(ref alias) = alias {
1024 if table_scan.projection.is_some() || !table_scan.filters.is_empty() {
1025 builder = builder.alias(alias.clone())?;
1026 }
1027 }
1028
1029 if !already_projected {
1033 if let Some(project_vec) = &table_scan.projection {
1034 if project_vec.is_empty() {
1035 builder = builder.project(vec![Expr::Literal(
1036 ScalarValue::Int64(Some(1)),
1037 )])?;
1038 } else {
1039 let project_columns = project_vec
1040 .iter()
1041 .cloned()
1042 .map(|i| {
1043 let schema = table_scan.source.schema();
1044 let field = schema.field(i);
1045 if alias.is_some() {
1046 Column::new(alias.clone(), field.name().clone())
1047 } else {
1048 Column::new(
1049 Some(table_scan.table_name.clone()),
1050 field.name().clone(),
1051 )
1052 }
1053 })
1054 .collect::<Vec<_>>();
1055 builder = builder.project(project_columns)?;
1056 };
1057 }
1058 }
1059
1060 let filter_expr: Result<Option<Expr>> = table_scan
1061 .filters
1062 .iter()
1063 .cloned()
1064 .map(|expr| {
1065 if let Some(ref mut rewriter) = filter_alias_rewriter {
1066 expr.rewrite(rewriter).data()
1067 } else {
1068 Ok(expr)
1069 }
1070 })
1071 .reduce(|acc, expr_result| {
1072 acc.and_then(|acc_expr| {
1073 expr_result.map(|expr| acc_expr.and(expr))
1074 })
1075 })
1076 .transpose();
1077
1078 if let Some(filter) = filter_expr? {
1079 builder = builder.filter(filter)?;
1080 }
1081
1082 if let Some(fetch) = table_scan.fetch {
1083 builder = builder.limit(0, Some(fetch))?;
1084 }
1085
1086 if let Some(alias) = alias {
1091 if table_scan.projection.is_none() && table_scan.filters.is_empty() {
1092 builder = builder.alias(alias)?;
1093 }
1094 }
1095
1096 Ok(Some(builder.build()?))
1097 }
1098 LogicalPlan::SubqueryAlias(subquery_alias) => {
1099 let ret = Self::unparse_table_scan_pushdown(
1100 &subquery_alias.input,
1101 Some(subquery_alias.alias.clone()),
1102 already_projected,
1103 )?;
1104 if let Some(alias) = alias {
1105 if let Some(plan) = ret {
1106 let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
1107 return Ok(Some(plan));
1108 }
1109 }
1110 Ok(ret)
1111 }
1112 LogicalPlan::Projection(projection) => {
1115 if let Some(plan) = Self::unparse_table_scan_pushdown(
1116 &projection.input,
1117 alias.clone(),
1118 already_projected,
1119 )? {
1120 let exprs = if alias.is_some() {
1121 let mut alias_rewriter =
1122 alias.as_ref().map(|alias_name| TableAliasRewriter {
1123 table_schema: plan.schema().as_arrow(),
1124 alias_name: alias_name.clone(),
1125 });
1126 projection
1127 .expr
1128 .iter()
1129 .cloned()
1130 .map(|expr| {
1131 if let Some(ref mut rewriter) = alias_rewriter {
1132 expr.rewrite(rewriter).data()
1133 } else {
1134 Ok(expr)
1135 }
1136 })
1137 .collect::<Result<Vec<_>>>()?
1138 } else {
1139 projection.expr.clone()
1140 };
1141 Ok(Some(
1142 LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
1143 ))
1144 } else {
1145 Ok(None)
1146 }
1147 }
1148 _ => Ok(None),
1149 }
1150 }
1151
1152 fn select_item_to_sql(&self, expr: &Expr) -> Result<ast::SelectItem> {
1153 match expr {
1154 Expr::Alias(Alias { expr, name, .. }) => {
1155 let inner = self.expr_to_sql(expr)?;
1156
1157 Ok(ast::SelectItem::ExprWithAlias {
1158 expr: inner,
1159 alias: self.new_ident_quoted_if_needs(name.to_string()),
1160 })
1161 }
1162 _ => {
1163 let inner = self.expr_to_sql(expr)?;
1164
1165 Ok(ast::SelectItem::UnnamedExpr(inner))
1166 }
1167 }
1168 }
1169
1170 fn sorts_to_sql(&self, sort_exprs: &[SortExpr]) -> Result<OrderByKind> {
1171 Ok(OrderByKind::Expressions(
1172 sort_exprs
1173 .iter()
1174 .map(|sort_expr| self.sort_to_sql(sort_expr))
1175 .collect::<Result<Vec<_>>>()?,
1176 ))
1177 }
1178
1179 fn join_operator_to_sql(
1180 &self,
1181 join_type: JoinType,
1182 constraint: ast::JoinConstraint,
1183 ) -> Result<ast::JoinOperator> {
1184 Ok(match join_type {
1185 JoinType::Inner => match &constraint {
1186 ast::JoinConstraint::On(_)
1187 | ast::JoinConstraint::Using(_)
1188 | ast::JoinConstraint::Natural => ast::JoinOperator::Inner(constraint),
1189 ast::JoinConstraint::None => {
1190 ast::JoinOperator::CrossJoin
1193 }
1194 },
1195 JoinType::Left => ast::JoinOperator::LeftOuter(constraint),
1196 JoinType::Right => ast::JoinOperator::RightOuter(constraint),
1197 JoinType::Full => ast::JoinOperator::FullOuter(constraint),
1198 JoinType::LeftAnti => ast::JoinOperator::LeftAnti(constraint),
1199 JoinType::LeftSemi => ast::JoinOperator::LeftSemi(constraint),
1200 JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint),
1201 JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint),
1202 JoinType::LeftMark => unimplemented!("Unparsing of Left Mark join type"),
1203 })
1204 }
1205
1206 fn join_using_to_sql(
1210 &self,
1211 join_conditions: &[(Expr, Expr)],
1212 ) -> Option<ast::JoinConstraint> {
1213 let mut object_names = Vec::with_capacity(join_conditions.len());
1214 for (left, right) in join_conditions {
1215 match (left, right) {
1216 (
1217 Expr::Column(Column {
1218 relation: _,
1219 name: left_name,
1220 spans: _,
1221 }),
1222 Expr::Column(Column {
1223 relation: _,
1224 name: right_name,
1225 spans: _,
1226 }),
1227 ) if left_name == right_name => {
1228 let ident = self.new_ident_quoted_if_needs(left_name.to_string());
1232 object_names.push(ast::ObjectName::from(vec![ident]));
1233 }
1234 _ => return None,
1237 }
1238 }
1239 Some(ast::JoinConstraint::Using(object_names))
1240 }
1241
1242 fn join_constraint_to_sql(
1244 &self,
1245 constraint: JoinConstraint,
1246 conditions: &[(Expr, Expr)],
1247 filter: Option<&Expr>,
1248 ) -> Result<ast::JoinConstraint> {
1249 match (constraint, conditions, filter) {
1250 (JoinConstraint::On | JoinConstraint::Using, [], None) => {
1252 Ok(ast::JoinConstraint::None)
1253 }
1254
1255 (JoinConstraint::Using, conditions, None) => {
1256 match self.join_using_to_sql(conditions) {
1257 Some(using) => Ok(using),
1258 None => self.join_conditions_to_sql_on(conditions, None),
1261 }
1262 }
1263
1264 (JoinConstraint::On | JoinConstraint::Using, conditions, filter) => {
1272 self.join_conditions_to_sql_on(conditions, filter)
1273 }
1274 }
1275 }
1276
1277 fn join_conditions_to_sql_on(
1281 &self,
1282 join_conditions: &[(Expr, Expr)],
1283 filter: Option<&Expr>,
1284 ) -> Result<ast::JoinConstraint> {
1285 let mut condition = None;
1286 for (left, right) in join_conditions {
1288 let l = self.expr_to_sql(left)?;
1290 let r = self.expr_to_sql(right)?;
1291 let e = self.binary_op_to_sql(l, r, ast::BinaryOperator::Eq);
1292 condition = match condition {
1293 Some(expr) => Some(self.and_op_to_sql(expr, e)),
1294 None => Some(e),
1295 };
1296 }
1297
1298 condition = match (condition, filter) {
1300 (Some(expr), Some(filter)) => {
1301 Some(self.and_op_to_sql(expr, self.expr_to_sql(filter)?))
1302 }
1303 (Some(expr), None) => Some(expr),
1304 (None, Some(filter)) => Some(self.expr_to_sql(filter)?),
1305 (None, None) => None,
1306 };
1307
1308 let constraint = match condition {
1309 Some(filter) => ast::JoinConstraint::On(filter),
1310 None => ast::JoinConstraint::None,
1311 };
1312
1313 Ok(constraint)
1314 }
1315
1316 fn and_op_to_sql(&self, lhs: ast::Expr, rhs: ast::Expr) -> ast::Expr {
1317 self.binary_op_to_sql(lhs, rhs, ast::BinaryOperator::And)
1318 }
1319
1320 fn new_table_alias(&self, alias: String, columns: Vec<Ident>) -> ast::TableAlias {
1321 let columns = columns
1322 .into_iter()
1323 .map(|ident| TableAliasColumnDef {
1324 name: ident,
1325 data_type: None,
1326 })
1327 .collect();
1328 ast::TableAlias {
1329 name: self.new_ident_quoted_if_needs(alias),
1330 columns,
1331 }
1332 }
1333
1334 fn dml_to_sql(&self, plan: &LogicalPlan) -> Result<ast::Statement> {
1335 not_impl_err!("Unsupported plan: {plan:?}")
1336 }
1337}
1338
1339impl From<BuilderError> for DataFusionError {
1340 fn from(e: BuilderError) -> Self {
1341 DataFusionError::External(Box::new(e))
1342 }
1343}
1344
1345#[derive(Debug)]
1347enum UnnestInputType {
1348 OuterReference,
1350 Scalar,
1352}