1use std::collections::HashMap;
19use std::convert::{TryFrom, TryInto};
20use std::sync::Arc;
21
22use crate::common::proto_error;
23use crate::protobuf_common as protobuf;
24use arrow::array::{ArrayRef, AsArray};
25use arrow::buffer::Buffer;
26use arrow::csv::WriterBuilder;
27use arrow::datatypes::{
28 i256, DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit,
29 Schema, TimeUnit, UnionFields, UnionMode,
30};
31use arrow::ipc::{reader::read_record_batch, root_as_message};
32
33use datafusion_common::{
34 arrow_datafusion_err,
35 config::{
36 CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
37 TableParquetOptions,
38 },
39 file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
40 parsers::CompressionTypeVariant,
41 plan_datafusion_err,
42 stats::Precision,
43 Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef,
44 DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
45};
46
/// Errors that can occur while converting protobuf messages into their
/// DataFusion / Arrow counterparts.
#[derive(Debug)]
pub enum Error {
    /// Catch-all for conversion failures that have no dedicated variant.
    General(String),

    /// A DataFusion error raised while rebuilding a decoded value.
    DataFusionError(DataFusionError),

    /// A protobuf field required for the conversion was not set.
    MissingRequiredField(String),

    /// A repeated field that must be non-empty was empty.
    AtLeastOneValue(String),

    /// An `i32` read from the wire does not match any variant of the
    /// target enum named `name`.
    UnknownEnumVariant { name: String, value: i32 },
}
59
60impl std::fmt::Display for Error {
61 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
62 match self {
63 Self::General(desc) => write!(f, "General error: {desc}"),
64
65 Self::DataFusionError(desc) => {
66 write!(f, "DataFusion error: {desc:?}")
67 }
68
69 Self::MissingRequiredField(name) => {
70 write!(f, "Missing required field {name}")
71 }
72 Self::AtLeastOneValue(name) => {
73 write!(f, "Must have at least one {name}, found 0")
74 }
75 Self::UnknownEnumVariant { name, value } => {
76 write!(f, "Unknown i32 value for {name} enum: {value}")
77 }
78 }
79 }
80}
81
// Marker impl: the `Display` and `Debug` impls satisfy the
// `std::error::Error` contract; no `source` override is needed.
impl std::error::Error for Error {}
83
84impl From<DataFusionError> for Error {
85 fn from(e: DataFusionError) -> Self {
86 Error::DataFusionError(e)
87 }
88}
89
90impl Error {
91 pub fn required(field: impl Into<String>) -> Error {
92 Error::MissingRequiredField(field.into())
93 }
94
95 pub fn unknown(name: impl Into<String>, value: i32) -> Error {
96 Error::UnknownEnumVariant {
97 name: name.into(),
98 value,
99 }
100 }
101}
102
impl From<Error> for DataFusionError {
    /// Lowers a conversion error back into a DataFusion planning error so
    /// callers only have to handle a single error type.
    fn from(e: Error) -> Self {
        plan_datafusion_err!("{}", e)
    }
}
108
/// Helpers for converting an optional protobuf field into a target type `T`.
pub trait FromOptionalField<T> {
    /// Converts an optional protobuf field to an option of a different type,
    /// returning `None` when the field was not set.
    fn optional(self) -> datafusion_common::Result<Option<T>, Error>;

    /// Converts an optional protobuf field to a value of a different type,
    /// returning [`Error::MissingRequiredField`] (with `field` as the name)
    /// when the field is `None`.
    fn required(self, field: impl Into<String>) -> datafusion_common::Result<T, Error>;
}
124
125impl<T, U> FromOptionalField<U> for Option<T>
126where
127 T: TryInto<U, Error = Error>,
128{
129 fn optional(self) -> datafusion_common::Result<Option<U>, Error> {
130 self.map(|t| t.try_into()).transpose()
131 }
132
133 fn required(self, field: impl Into<String>) -> datafusion_common::Result<U, Error> {
134 match self {
135 None => Err(Error::required(field)),
136 Some(t) => t.try_into(),
137 }
138 }
139}
140
141impl From<protobuf::Column> for Column {
142 fn from(c: protobuf::Column) -> Self {
143 let protobuf::Column { relation, name } = c;
144
145 Self::new(relation.map(|r| r.relation), name)
146 }
147}
148
149impl From<&protobuf::Column> for Column {
150 fn from(c: &protobuf::Column) -> Self {
151 c.clone().into()
152 }
153}
154
155impl TryFrom<&protobuf::DfSchema> for DFSchema {
156 type Error = Error;
157
158 fn try_from(
159 df_schema: &protobuf::DfSchema,
160 ) -> datafusion_common::Result<Self, Self::Error> {
161 let df_fields = df_schema.columns.clone();
162 let qualifiers_and_fields: Vec<(Option<TableReference>, Arc<Field>)> = df_fields
163 .iter()
164 .map(|df_field| {
165 let field: Field = df_field.field.as_ref().required("field")?;
166 Ok((
167 df_field
168 .qualifier
169 .as_ref()
170 .map(|q| q.relation.clone().into()),
171 Arc::new(field),
172 ))
173 })
174 .collect::<datafusion_common::Result<Vec<_>, Error>>()?;
175
176 Ok(DFSchema::new_with_metadata(
177 qualifiers_and_fields,
178 df_schema.metadata.clone(),
179 )?)
180 }
181}
182
183impl TryFrom<protobuf::DfSchema> for DFSchemaRef {
184 type Error = Error;
185
186 fn try_from(
187 df_schema: protobuf::DfSchema,
188 ) -> datafusion_common::Result<Self, Self::Error> {
189 let dfschema: DFSchema = (&df_schema).try_into()?;
190 Ok(Arc::new(dfschema))
191 }
192}
193
194impl TryFrom<&protobuf::ArrowType> for DataType {
195 type Error = Error;
196
197 fn try_from(
198 arrow_type: &protobuf::ArrowType,
199 ) -> datafusion_common::Result<Self, Self::Error> {
200 arrow_type
201 .arrow_type_enum
202 .as_ref()
203 .required("arrow_type_enum")
204 }
205}
206
impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType {
    type Error = Error;
    /// Maps the decoded protobuf `oneof` payload onto the corresponding
    /// Arrow [`DataType`], recursing through nested type definitions
    /// (lists, structs, unions, dictionaries and maps).
    fn try_from(
        arrow_type_enum: &protobuf::arrow_type::ArrowTypeEnum,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::arrow_type;
        Ok(match arrow_type_enum {
            // Primitive types carry no payload worth inspecting.
            arrow_type::ArrowTypeEnum::None(_) => DataType::Null,
            arrow_type::ArrowTypeEnum::Bool(_) => DataType::Boolean,
            arrow_type::ArrowTypeEnum::Uint8(_) => DataType::UInt8,
            arrow_type::ArrowTypeEnum::Int8(_) => DataType::Int8,
            arrow_type::ArrowTypeEnum::Uint16(_) => DataType::UInt16,
            arrow_type::ArrowTypeEnum::Int16(_) => DataType::Int16,
            arrow_type::ArrowTypeEnum::Uint32(_) => DataType::UInt32,
            arrow_type::ArrowTypeEnum::Int32(_) => DataType::Int32,
            arrow_type::ArrowTypeEnum::Uint64(_) => DataType::UInt64,
            arrow_type::ArrowTypeEnum::Int64(_) => DataType::Int64,
            arrow_type::ArrowTypeEnum::Float16(_) => DataType::Float16,
            arrow_type::ArrowTypeEnum::Float32(_) => DataType::Float32,
            arrow_type::ArrowTypeEnum::Float64(_) => DataType::Float64,
            arrow_type::ArrowTypeEnum::Utf8(_) => DataType::Utf8,
            arrow_type::ArrowTypeEnum::Utf8View(_) => DataType::Utf8View,
            arrow_type::ArrowTypeEnum::LargeUtf8(_) => DataType::LargeUtf8,
            arrow_type::ArrowTypeEnum::Binary(_) => DataType::Binary,
            arrow_type::ArrowTypeEnum::BinaryView(_) => DataType::BinaryView,
            arrow_type::ArrowTypeEnum::FixedSizeBinary(size) => {
                DataType::FixedSizeBinary(*size)
            }
            arrow_type::ArrowTypeEnum::LargeBinary(_) => DataType::LargeBinary,
            arrow_type::ArrowTypeEnum::Date32(_) => DataType::Date32,
            arrow_type::ArrowTypeEnum::Date64(_) => DataType::Date64,
            // Temporal types carry their unit as a raw i32 that must be
            // validated before use.
            arrow_type::ArrowTypeEnum::Duration(time_unit) => {
                DataType::Duration(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Timestamp(protobuf::Timestamp {
                time_unit,
                timezone,
            }) => DataType::Timestamp(
                parse_i32_to_time_unit(time_unit)?,
                // The empty string is the wire encoding for "no timezone".
                match timezone.len() {
                    0 => None,
                    _ => Some(timezone.as_str().into()),
                },
            ),
            arrow_type::ArrowTypeEnum::Time32(time_unit) => {
                DataType::Time32(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Time64(time_unit) => {
                DataType::Time64(parse_i32_to_time_unit(time_unit)?)
            }
            arrow_type::ArrowTypeEnum::Interval(interval_unit) => {
                DataType::Interval(parse_i32_to_interval_unit(interval_unit)?)
            }
            // Precision/scale are serialized as wider integers; the
            // narrowing casts mirror the widening done by the serializer.
            arrow_type::ArrowTypeEnum::Decimal(protobuf::Decimal {
                precision,
                scale,
            }) => DataType::Decimal128(*precision as u8, *scale as i8),
            arrow_type::ArrowTypeEnum::Decimal256(protobuf::Decimal256Type {
                precision,
                scale,
            }) => DataType::Decimal256(*precision as u8, *scale as i8),
            // Nested types: the element field is mandatory.
            arrow_type::ArrowTypeEnum::List(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                DataType::List(Arc::new(list_type))
            }
            arrow_type::ArrowTypeEnum::LargeList(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                DataType::LargeList(Arc::new(list_type))
            }
            arrow_type::ArrowTypeEnum::FixedSizeList(list) => {
                let list_type =
                    list.as_ref().field_type.as_deref().required("field_type")?;
                let list_size = list.list_size;
                DataType::FixedSizeList(Arc::new(list_type), list_size)
            }
            arrow_type::ArrowTypeEnum::Struct(strct) => DataType::Struct(
                parse_proto_fields_to_fields(&strct.sub_field_types)?.into(),
            ),
            arrow_type::ArrowTypeEnum::Union(union) => {
                let union_mode = protobuf::UnionMode::try_from(union.union_mode)
                    .map_err(|_| Error::unknown("UnionMode", union.union_mode))?;
                let union_mode = match union_mode {
                    protobuf::UnionMode::Dense => UnionMode::Dense,
                    protobuf::UnionMode::Sparse => UnionMode::Sparse,
                };
                let union_fields = parse_proto_fields_to_fields(&union.union_types)?;

                // An empty `type_ids` falls back to sequential ids 0..n —
                // presumably for plans written by older serializers that
                // did not record explicit ids (NOTE(review): confirm).
                let type_ids: Vec<_> = match union.type_ids.is_empty() {
                    true => (0..union_fields.len() as i8).collect(),
                    false => union.type_ids.iter().map(|i| *i as i8).collect(),
                };

                DataType::Union(UnionFields::new(type_ids, union_fields), union_mode)
            }
            arrow_type::ArrowTypeEnum::Dictionary(dict) => {
                let key_datatype = dict.as_ref().key.as_deref().required("key")?;
                let value_datatype = dict.as_ref().value.as_deref().required("value")?;
                DataType::Dictionary(Box::new(key_datatype), Box::new(value_datatype))
            }
            arrow_type::ArrowTypeEnum::Map(map) => {
                let field: Field =
                    map.as_ref().field_type.as_deref().required("field_type")?;
                let keys_sorted = map.keys_sorted;
                DataType::Map(Arc::new(field), keys_sorted)
            }
        })
    }
}
318
319impl TryFrom<&protobuf::Field> for Field {
320 type Error = Error;
321 fn try_from(field: &protobuf::Field) -> Result<Self, Self::Error> {
322 let datatype = field.arrow_type.as_deref().required("arrow_type")?;
323 let field = Self::new(field.name.as_str(), datatype, field.nullable)
324 .with_metadata(field.metadata.clone());
325 Ok(field)
326 }
327}
328
329impl TryFrom<&protobuf::Schema> for Schema {
330 type Error = Error;
331
332 fn try_from(
333 schema: &protobuf::Schema,
334 ) -> datafusion_common::Result<Self, Self::Error> {
335 let fields = schema
336 .columns
337 .iter()
338 .map(Field::try_from)
339 .collect::<datafusion_common::Result<Vec<_>, _>>()?;
340 Ok(Self::new_with_metadata(fields, schema.metadata.clone()))
341 }
342}
343
impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
    type Error = Error;

    /// Reconstructs a [`ScalarValue`] from its protobuf encoding.
    ///
    /// Primitive variants map field-for-field. Nested variants (list,
    /// large/fixed-size list, struct, map) were serialized as an Arrow IPC
    /// record batch plus any dictionary batches, and are decoded back
    /// through the Arrow IPC reader.
    fn try_from(
        scalar: &protobuf::ScalarValue,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::scalar_value::Value;

        // The `value` oneof must be present; an empty message is malformed.
        let value = scalar
            .value
            .as_ref()
            .ok_or_else(|| Error::required("value"))?;

        Ok(match value {
            Value::BoolValue(v) => Self::Boolean(Some(*v)),
            Value::Utf8Value(v) => Self::Utf8(Some(v.to_owned())),
            Value::Utf8ViewValue(v) => Self::Utf8View(Some(v.to_owned())),
            Value::LargeUtf8Value(v) => Self::LargeUtf8(Some(v.to_owned())),
            // Narrow integer types travel as wider protobuf ints; the
            // casts restore the original width.
            Value::Int8Value(v) => Self::Int8(Some(*v as i8)),
            Value::Int16Value(v) => Self::Int16(Some(*v as i16)),
            Value::Int32Value(v) => Self::Int32(Some(*v)),
            Value::Int64Value(v) => Self::Int64(Some(*v)),
            Value::Uint8Value(v) => Self::UInt8(Some(*v as u8)),
            Value::Uint16Value(v) => Self::UInt16(Some(*v as u16)),
            Value::Uint32Value(v) => Self::UInt32(Some(*v)),
            Value::Uint64Value(v) => Self::UInt64(Some(*v)),
            Value::Float32Value(v) => Self::Float32(Some(*v)),
            Value::Float64Value(v) => Self::Float64(Some(*v)),
            Value::Date32Value(v) => Self::Date32(Some(*v)),
            // All nested variants share one IPC-decoding path; the final
            // `match value` below dispatches on the original variant.
            Value::ListValue(v)
            | Value::FixedSizeListValue(v)
            | Value::LargeListValue(v)
            | Value::StructValue(v)
            | Value::MapValue(v) => {
                let protobuf::ScalarNestedValue {
                    ipc_message,
                    arrow_data,
                    dictionaries,
                    schema,
                } = &v;

                // The serialized schema is required to interpret the batch.
                let schema: Schema = if let Some(schema_ref) = schema {
                    schema_ref.try_into()?
                } else {
                    return Err(Error::General(
                        "Invalid schema while deserializing ScalarValue::List"
                            .to_string(),
                    ));
                };

                // Parse the flatbuffer IPC header for the value batch.
                let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                    Error::General(format!(
                        "Error IPC message while deserializing ScalarValue::List: {e}"
                    ))
                })?;
                let buffer = Buffer::from(arrow_data.as_slice());

                let ipc_batch = message.header_as_record_batch().ok_or_else(|| {
                    Error::General(
                        "Unexpected message type deserializing ScalarValue::List"
                            .to_string(),
                    )
                })?;

                // Decode every accompanying dictionary batch into a
                // dictionary-id -> values map consumed by the main decode.
                let dict_by_id: HashMap<i64,ArrayRef> = dictionaries.iter().map(|protobuf::scalar_nested_value::Dictionary { ipc_message, arrow_data }| {
                    let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                        Error::General(format!(
                            "Error IPC message while deserializing ScalarValue::List dictionary message: {e}"
                        ))
                    })?;
                    let buffer = Buffer::from(arrow_data.as_slice());

                    let dict_batch = message.header_as_dictionary_batch().ok_or_else(|| {
                        Error::General(
                            "Unexpected message type deserializing ScalarValue::List dictionary message"
                                .to_string(),
                        )
                    })?;

                    let id = dict_batch.id();

                    // NOTE(review): `dict_batch.data().unwrap()` panics if a
                    // dictionary batch carries no data — presumably the
                    // serializer always writes it; confirm.
                    let record_batch = read_record_batch(
                        &buffer,
                        dict_batch.data().unwrap(),
                        Arc::new(schema.clone()),
                        &Default::default(),
                        None,
                        &message.version(),
                    )?;

                    let values: ArrayRef = Arc::clone(record_batch.column(0));

                    Ok((id, values))
                }).collect::<datafusion_common::Result<HashMap<_, _>>>()?;

                // Decode the single-column value batch, resolving any
                // dictionary-encoded columns through `dict_by_id`.
                let record_batch = read_record_batch(
                    &buffer,
                    ipc_batch,
                    Arc::new(schema),
                    &dict_by_id,
                    None,
                    &message.version(),
                )
                .map_err(|e| arrow_datafusion_err!(e))
                .map_err(|e| e.context("Decoding ScalarValue::List Value"))?;
                let arr = record_batch.column(0);
                match value {
                    Value::ListValue(_) => {
                        Self::List(arr.as_list::<i32>().to_owned().into())
                    }
                    Value::LargeListValue(_) => {
                        Self::LargeList(arr.as_list::<i64>().to_owned().into())
                    }
                    Value::FixedSizeListValue(_) => {
                        Self::FixedSizeList(arr.as_fixed_size_list().to_owned().into())
                    }
                    Value::StructValue(_) => {
                        Self::Struct(arr.as_struct().to_owned().into())
                    }
                    Value::MapValue(_) => Self::Map(arr.as_map().to_owned().into()),
                    // The outer or-pattern admits only the five variants
                    // handled above.
                    _ => unreachable!(),
                }
            }
            Value::NullValue(v) => {
                // A typed null: decode the data type, then build the
                // corresponding null ScalarValue from it.
                let null_type: DataType = v.try_into()?;
                null_type.try_into().map_err(Error::DataFusionError)?
            }
            // Decimal payloads are big-endian byte arrays.
            Value::Decimal128Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal128(
                    Some(i128::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Decimal256Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal256(
                    Some(i256::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Date64Value(v) => Self::Date64(Some(*v)),
            Value::Time32Value(v) => {
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time32_value::Value::Time32SecondValue(t) => {
                        Self::Time32Second(Some(*t))
                    }
                    protobuf::scalar_time32_value::Value::Time32MillisecondValue(t) => {
                        Self::Time32Millisecond(Some(*t))
                    }
                }
            }
            Value::Time64Value(v) => {
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time64_value::Value::Time64MicrosecondValue(t) => {
                        Self::Time64Microsecond(Some(*t))
                    }
                    protobuf::scalar_time64_value::Value::Time64NanosecondValue(t) => {
                        Self::Time64Nanosecond(Some(*t))
                    }
                }
            }
            Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)),
            Value::DurationSecondValue(v) => Self::DurationSecond(Some(*v)),
            Value::DurationMillisecondValue(v) => Self::DurationMillisecond(Some(*v)),
            Value::DurationMicrosecondValue(v) => Self::DurationMicrosecond(Some(*v)),
            Value::DurationNanosecondValue(v) => Self::DurationNanosecond(Some(*v)),
            Value::TimestampValue(v) => {
                // The empty string is the wire encoding for "no timezone".
                let timezone = if v.timezone.is_empty() {
                    None
                } else {
                    Some(v.timezone.as_str().into())
                };

                let ts_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;

                match ts_value {
                    protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(t) => {
                        Self::TimestampMicrosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(t) => {
                        Self::TimestampNanosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeSecondValue(t) => {
                        Self::TimestampSecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(t) => {
                        Self::TimestampMillisecond(Some(*t), timezone)
                    }
                }
            }
            Value::DictionaryValue(v) => {
                let index_type: DataType = v
                    .index_type
                    .as_ref()
                    .ok_or_else(|| Error::required("index_type"))?
                    .try_into()?;

                // The dictionary's value is itself a boxed ScalarValue;
                // recurse to decode it.
                let value: Self = v
                    .value
                    .as_ref()
                    .ok_or_else(|| Error::required("value"))?
                    .as_ref()
                    .try_into()?;

                Self::Dictionary(Box::new(index_type), Box::new(value))
            }
            Value::BinaryValue(v) => Self::Binary(Some(v.clone())),
            Value::BinaryViewValue(v) => Self::BinaryView(Some(v.clone())),
            Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())),
            Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(
                IntervalDayTimeType::make_value(v.days, v.milliseconds),
            )),
            Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some(
                IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos),
            )),
            Value::UnionValue(val) => {
                // Mode is a raw i32 on the wire: 0 = sparse, 1 = dense.
                let mode = match val.mode {
                    0 => UnionMode::Sparse,
                    1 => UnionMode::Dense,
                    id => Err(Error::unknown("UnionMode", id))?,
                };
                let ids = val
                    .fields
                    .iter()
                    .map(|f| f.field_id as i8)
                    .collect::<Vec<_>>();
                // Every union member must carry a field definition;
                // `collect::<Option<_>>` turns any missing one into None.
                let fields = val
                    .fields
                    .iter()
                    .map(|f| f.field.clone())
                    .collect::<Option<Vec<_>>>();
                let fields = fields.ok_or_else(|| Error::required("UnionField"))?;
                let fields = parse_proto_fields_to_fields(&fields)?;
                let fields = UnionFields::new(ids, fields);
                let v_id = val.value_id as i8;
                // The active value (if any) is a boxed ScalarValue.
                let val = match &val.value {
                    None => None,
                    Some(val) => {
                        let val: ScalarValue = val
                            .as_ref()
                            .try_into()
                            .map_err(|_| Error::General("Invalid Scalar".to_string()))?;
                        Some((v_id, Box::new(val)))
                    }
                };
                Self::Union(val, fields, mode)
            }
            Value::FixedSizeBinaryValue(v) => {
                // NOTE(review): `v.clone().values` clones the whole message
                // to move out `values`; `v.values.clone()` would suffice.
                Self::FixedSizeBinary(v.length, Some(v.clone().values))
            }
        })
    }
}
606
607impl From<protobuf::TimeUnit> for TimeUnit {
608 fn from(time_unit: protobuf::TimeUnit) -> Self {
609 match time_unit {
610 protobuf::TimeUnit::Second => TimeUnit::Second,
611 protobuf::TimeUnit::Millisecond => TimeUnit::Millisecond,
612 protobuf::TimeUnit::Microsecond => TimeUnit::Microsecond,
613 protobuf::TimeUnit::Nanosecond => TimeUnit::Nanosecond,
614 }
615 }
616}
617
618impl From<protobuf::IntervalUnit> for IntervalUnit {
619 fn from(interval_unit: protobuf::IntervalUnit) -> Self {
620 match interval_unit {
621 protobuf::IntervalUnit::YearMonth => IntervalUnit::YearMonth,
622 protobuf::IntervalUnit::DayTime => IntervalUnit::DayTime,
623 protobuf::IntervalUnit::MonthDayNano => IntervalUnit::MonthDayNano,
624 }
625 }
626}
627
628impl From<protobuf::Constraints> for Constraints {
629 fn from(constraints: protobuf::Constraints) -> Self {
630 Constraints::new_unverified(
631 constraints
632 .constraints
633 .into_iter()
634 .map(|item| item.into())
635 .collect(),
636 )
637 }
638}
639
640impl From<protobuf::Constraint> for Constraint {
641 fn from(value: protobuf::Constraint) -> Self {
642 match value.constraint_mode.unwrap() {
643 protobuf::constraint::ConstraintMode::PrimaryKey(elem) => {
644 Constraint::PrimaryKey(
645 elem.indices.into_iter().map(|item| item as usize).collect(),
646 )
647 }
648 protobuf::constraint::ConstraintMode::Unique(elem) => Constraint::Unique(
649 elem.indices.into_iter().map(|item| item as usize).collect(),
650 ),
651 }
652 }
653}
654
655impl From<&protobuf::ColumnStats> for ColumnStatistics {
656 fn from(cs: &protobuf::ColumnStats) -> ColumnStatistics {
657 ColumnStatistics {
658 null_count: if let Some(nc) = &cs.null_count {
659 nc.clone().into()
660 } else {
661 Precision::Absent
662 },
663 max_value: if let Some(max) = &cs.max_value {
664 max.clone().into()
665 } else {
666 Precision::Absent
667 },
668 min_value: if let Some(min) = &cs.min_value {
669 min.clone().into()
670 } else {
671 Precision::Absent
672 },
673 sum_value: if let Some(sum) = &cs.sum_value {
674 sum.clone().into()
675 } else {
676 Precision::Absent
677 },
678 distinct_count: if let Some(dc) = &cs.distinct_count {
679 dc.clone().into()
680 } else {
681 Precision::Absent
682 },
683 }
684 }
685}
686
687impl From<protobuf::Precision> for Precision<usize> {
688 fn from(s: protobuf::Precision) -> Self {
689 let Ok(precision_type) = s.precision_info.try_into() else {
690 return Precision::Absent;
691 };
692 match precision_type {
693 protobuf::PrecisionInfo::Exact => {
694 if let Some(val) = s.val {
695 if let Ok(ScalarValue::UInt64(Some(val))) =
696 ScalarValue::try_from(&val)
697 {
698 Precision::Exact(val as usize)
699 } else {
700 Precision::Absent
701 }
702 } else {
703 Precision::Absent
704 }
705 }
706 protobuf::PrecisionInfo::Inexact => {
707 if let Some(val) = s.val {
708 if let Ok(ScalarValue::UInt64(Some(val))) =
709 ScalarValue::try_from(&val)
710 {
711 Precision::Inexact(val as usize)
712 } else {
713 Precision::Absent
714 }
715 } else {
716 Precision::Absent
717 }
718 }
719 protobuf::PrecisionInfo::Absent => Precision::Absent,
720 }
721 }
722}
723
724impl From<protobuf::Precision> for Precision<ScalarValue> {
725 fn from(s: protobuf::Precision) -> Self {
726 let Ok(precision_type) = s.precision_info.try_into() else {
727 return Precision::Absent;
728 };
729 match precision_type {
730 protobuf::PrecisionInfo::Exact => {
731 if let Some(val) = s.val {
732 if let Ok(val) = ScalarValue::try_from(&val) {
733 Precision::Exact(val)
734 } else {
735 Precision::Absent
736 }
737 } else {
738 Precision::Absent
739 }
740 }
741 protobuf::PrecisionInfo::Inexact => {
742 if let Some(val) = s.val {
743 if let Ok(val) = ScalarValue::try_from(&val) {
744 Precision::Inexact(val)
745 } else {
746 Precision::Absent
747 }
748 } else {
749 Precision::Absent
750 }
751 }
752 protobuf::PrecisionInfo::Absent => Precision::Absent,
753 }
754 }
755}
756
757impl From<protobuf::JoinSide> for JoinSide {
758 fn from(t: protobuf::JoinSide) -> Self {
759 match t {
760 protobuf::JoinSide::LeftSide => JoinSide::Left,
761 protobuf::JoinSide::RightSide => JoinSide::Right,
762 protobuf::JoinSide::None => JoinSide::None,
763 }
764 }
765}
766
767impl From<&protobuf::Constraint> for Constraint {
768 fn from(value: &protobuf::Constraint) -> Self {
769 match &value.constraint_mode {
770 Some(protobuf::constraint::ConstraintMode::PrimaryKey(elem)) => {
771 Constraint::PrimaryKey(
772 elem.indices.iter().map(|&item| item as usize).collect(),
773 )
774 }
775 Some(protobuf::constraint::ConstraintMode::Unique(elem)) => {
776 Constraint::Unique(
777 elem.indices.iter().map(|&item| item as usize).collect(),
778 )
779 }
780 None => panic!("constraint_mode not set"),
781 }
782 }
783}
784
785impl TryFrom<&protobuf::Constraints> for Constraints {
786 type Error = DataFusionError;
787
788 fn try_from(
789 constraints: &protobuf::Constraints,
790 ) -> datafusion_common::Result<Self, Self::Error> {
791 Ok(Constraints::new_unverified(
792 constraints
793 .constraints
794 .iter()
795 .map(|item| item.into())
796 .collect(),
797 ))
798 }
799}
800
801impl TryFrom<&protobuf::Statistics> for Statistics {
802 type Error = DataFusionError;
803
804 fn try_from(
805 s: &protobuf::Statistics,
806 ) -> datafusion_common::Result<Self, Self::Error> {
807 Ok(Statistics {
809 num_rows: if let Some(nr) = &s.num_rows {
810 nr.clone().into()
811 } else {
812 Precision::Absent
813 },
814 total_byte_size: if let Some(tbs) = &s.total_byte_size {
815 tbs.clone().into()
816 } else {
817 Precision::Absent
818 },
819 column_statistics: s.column_stats.iter().map(|s| s.into()).collect(),
821 })
822 }
823}
824
825impl From<protobuf::CompressionTypeVariant> for CompressionTypeVariant {
826 fn from(value: protobuf::CompressionTypeVariant) -> Self {
827 match value {
828 protobuf::CompressionTypeVariant::Gzip => Self::GZIP,
829 protobuf::CompressionTypeVariant::Bzip2 => Self::BZIP2,
830 protobuf::CompressionTypeVariant::Xz => Self::XZ,
831 protobuf::CompressionTypeVariant::Zstd => Self::ZSTD,
832 protobuf::CompressionTypeVariant::Uncompressed => Self::UNCOMPRESSED,
833 }
834 }
835}
836
837impl From<CompressionTypeVariant> for protobuf::CompressionTypeVariant {
838 fn from(value: CompressionTypeVariant) -> Self {
839 match value {
840 CompressionTypeVariant::GZIP => Self::Gzip,
841 CompressionTypeVariant::BZIP2 => Self::Bzip2,
842 CompressionTypeVariant::XZ => Self::Xz,
843 CompressionTypeVariant::ZSTD => Self::Zstd,
844 CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
845 }
846 }
847}
848
849impl TryFrom<&protobuf::CsvWriterOptions> for CsvWriterOptions {
850 type Error = DataFusionError;
851
852 fn try_from(
853 opts: &protobuf::CsvWriterOptions,
854 ) -> datafusion_common::Result<Self, Self::Error> {
855 let write_options = csv_writer_options_from_proto(opts)?;
856 let compression: CompressionTypeVariant = opts.compression().into();
857 Ok(CsvWriterOptions::new(write_options, compression))
858 }
859}
860
861impl TryFrom<&protobuf::JsonWriterOptions> for JsonWriterOptions {
862 type Error = DataFusionError;
863
864 fn try_from(
865 opts: &protobuf::JsonWriterOptions,
866 ) -> datafusion_common::Result<Self, Self::Error> {
867 let compression: CompressionTypeVariant = opts.compression().into();
868 Ok(JsonWriterOptions::new(compression))
869 }
870}
871
872impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
873 type Error = DataFusionError;
874
875 fn try_from(
876 proto_opts: &protobuf::CsvOptions,
877 ) -> datafusion_common::Result<Self, Self::Error> {
878 Ok(CsvOptions {
879 has_header: proto_opts.has_header.first().map(|h| *h != 0),
880 delimiter: proto_opts.delimiter[0],
881 quote: proto_opts.quote[0],
882 terminator: proto_opts.terminator.first().copied(),
883 escape: proto_opts.escape.first().copied(),
884 double_quote: proto_opts.has_header.first().map(|h| *h != 0),
885 newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
886 compression: proto_opts.compression().into(),
887 schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
888 date_format: (!proto_opts.date_format.is_empty())
889 .then(|| proto_opts.date_format.clone()),
890 datetime_format: (!proto_opts.datetime_format.is_empty())
891 .then(|| proto_opts.datetime_format.clone()),
892 timestamp_format: (!proto_opts.timestamp_format.is_empty())
893 .then(|| proto_opts.timestamp_format.clone()),
894 timestamp_tz_format: (!proto_opts.timestamp_tz_format.is_empty())
895 .then(|| proto_opts.timestamp_tz_format.clone()),
896 time_format: (!proto_opts.time_format.is_empty())
897 .then(|| proto_opts.time_format.clone()),
898 null_value: (!proto_opts.null_value.is_empty())
899 .then(|| proto_opts.null_value.clone()),
900 null_regex: (!proto_opts.null_regex.is_empty())
901 .then(|| proto_opts.null_regex.clone()),
902 comment: proto_opts.comment.first().copied(),
903 })
904 }
905}
906
907impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
908 type Error = DataFusionError;
909
910 fn try_from(
911 value: &protobuf::ParquetOptions,
912 ) -> datafusion_common::Result<Self, Self::Error> {
913 #[allow(deprecated)] Ok(ParquetOptions {
915 enable_page_index: value.enable_page_index,
916 pruning: value.pruning,
917 skip_metadata: value.skip_metadata,
918 metadata_size_hint: value
919 .metadata_size_hint_opt
920 .map(|opt| match opt {
921 protobuf::parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => Some(v as usize),
922 })
923 .unwrap_or(None),
924 pushdown_filters: value.pushdown_filters,
925 reorder_filters: value.reorder_filters,
926 data_pagesize_limit: value.data_pagesize_limit as usize,
927 write_batch_size: value.write_batch_size as usize,
928 writer_version: value.writer_version.clone(),
929 compression: value.compression_opt.clone().map(|opt| match opt {
930 protobuf::parquet_options::CompressionOpt::Compression(v) => Some(v),
931 }).unwrap_or(None),
932 dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
933 dictionary_page_size_limit: value.dictionary_page_size_limit as usize,
935 statistics_enabled: value
936 .statistics_enabled_opt.clone()
937 .map(|opt| match opt {
938 protobuf::parquet_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
939 })
940 .unwrap_or(None),
941 max_statistics_size: value
942 .max_statistics_size_opt.as_ref()
943 .map(|opt| match opt {
944 protobuf::parquet_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v) => Some(*v as usize),
945 })
946 .unwrap_or(None),
947 max_row_group_size: value.max_row_group_size as usize,
948 created_by: value.created_by.clone(),
949 column_index_truncate_length: value
950 .column_index_truncate_length_opt.as_ref()
951 .map(|opt| match opt {
952 protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v) => Some(*v as usize),
953 })
954 .unwrap_or(None),
955 statistics_truncate_length: value
956 .statistics_truncate_length_opt.as_ref()
957 .map(|opt| match opt {
958 protobuf::parquet_options::StatisticsTruncateLengthOpt::StatisticsTruncateLength(v) => Some(*v as usize),
959 })
960 .unwrap_or(None),
961 data_page_row_count_limit: value.data_page_row_count_limit as usize,
962 encoding: value
963 .encoding_opt.clone()
964 .map(|opt| match opt {
965 protobuf::parquet_options::EncodingOpt::Encoding(v) => Some(v),
966 })
967 .unwrap_or(None),
968 bloom_filter_on_read: value.bloom_filter_on_read,
969 bloom_filter_on_write: value.bloom_filter_on_write,
970 bloom_filter_fpp: value.clone()
971 .bloom_filter_fpp_opt
972 .map(|opt| match opt {
973 protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
974 })
975 .unwrap_or(None),
976 bloom_filter_ndv: value.clone()
977 .bloom_filter_ndv_opt
978 .map(|opt| match opt {
979 protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
980 })
981 .unwrap_or(None),
982 allow_single_file_parallelism: value.allow_single_file_parallelism,
983 maximum_parallel_row_group_writers: value.maximum_parallel_row_group_writers as usize,
984 maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
985 schema_force_view_types: value.schema_force_view_types,
986 binary_as_string: value.binary_as_string,
987 coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt {
988 protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v),
989 }).unwrap_or(None),
990 skip_arrow_metadata: value.skip_arrow_metadata,
991 })
992 }
993}
994
995impl TryFrom<&protobuf::ParquetColumnOptions> for ParquetColumnOptions {
996 type Error = DataFusionError;
997 fn try_from(
998 value: &protobuf::ParquetColumnOptions,
999 ) -> datafusion_common::Result<Self, Self::Error> {
1000 #[allow(deprecated)] Ok(ParquetColumnOptions {
1002 compression: value.compression_opt.clone().map(|opt| match opt {
1003 protobuf::parquet_column_options::CompressionOpt::Compression(v) => Some(v),
1004 }).unwrap_or(None),
1005 dictionary_enabled: value.dictionary_enabled_opt.as_ref().map(|protobuf::parquet_column_options::DictionaryEnabledOpt::DictionaryEnabled(v)| *v),
1006 statistics_enabled: value
1007 .statistics_enabled_opt.clone()
1008 .map(|opt| match opt {
1009 protobuf::parquet_column_options::StatisticsEnabledOpt::StatisticsEnabled(v) => Some(v),
1010 })
1011 .unwrap_or(None),
1012 max_statistics_size: value
1013 .max_statistics_size_opt
1014 .map(|opt| match opt {
1015 protobuf::parquet_column_options::MaxStatisticsSizeOpt::MaxStatisticsSize(v) => Some(v as usize),
1016 })
1017 .unwrap_or(None),
1018 encoding: value
1019 .encoding_opt.clone()
1020 .map(|opt| match opt {
1021 protobuf::parquet_column_options::EncodingOpt::Encoding(v) => Some(v),
1022 })
1023 .unwrap_or(None),
1024 bloom_filter_enabled: value.bloom_filter_enabled_opt.map(|opt| match opt {
1025 protobuf::parquet_column_options::BloomFilterEnabledOpt::BloomFilterEnabled(v) => Some(v),
1026 })
1027 .unwrap_or(None),
1028 bloom_filter_fpp: value
1029 .bloom_filter_fpp_opt
1030 .map(|opt| match opt {
1031 protobuf::parquet_column_options::BloomFilterFppOpt::BloomFilterFpp(v) => Some(v),
1032 })
1033 .unwrap_or(None),
1034 bloom_filter_ndv: value
1035 .bloom_filter_ndv_opt
1036 .map(|opt| match opt {
1037 protobuf::parquet_column_options::BloomFilterNdvOpt::BloomFilterNdv(v) => Some(v),
1038 })
1039 .unwrap_or(None),
1040 })
1041 }
1042}
1043
1044impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions {
1045 type Error = DataFusionError;
1046 fn try_from(
1047 value: &protobuf::TableParquetOptions,
1048 ) -> datafusion_common::Result<Self, Self::Error> {
1049 let mut column_specific_options: HashMap<String, ParquetColumnOptions> =
1050 HashMap::new();
1051 for protobuf::ParquetColumnSpecificOptions {
1052 column_name,
1053 options: maybe_options,
1054 } in &value.column_specific_options
1055 {
1056 if let Some(options) = maybe_options {
1057 column_specific_options.insert(column_name.clone(), options.try_into()?);
1058 }
1059 }
1060 Ok(TableParquetOptions {
1061 global: value
1062 .global
1063 .as_ref()
1064 .map(|v| v.try_into())
1065 .unwrap()
1066 .unwrap(),
1067 column_specific_options,
1068 key_value_metadata: Default::default(),
1069 })
1070 }
1071}
1072
1073impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
1074 type Error = DataFusionError;
1075
1076 fn try_from(
1077 proto_opts: &protobuf::JsonOptions,
1078 ) -> datafusion_common::Result<Self, Self::Error> {
1079 let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
1080 Ok(JsonOptions {
1081 compression: compression.into(),
1082 schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
1083 })
1084 }
1085}
1086
1087pub fn parse_i32_to_time_unit(value: &i32) -> datafusion_common::Result<TimeUnit, Error> {
1088 protobuf::TimeUnit::try_from(*value)
1089 .map(|t| t.into())
1090 .map_err(|_| Error::unknown("TimeUnit", *value))
1091}
1092
1093pub fn parse_i32_to_interval_unit(
1094 value: &i32,
1095) -> datafusion_common::Result<IntervalUnit, Error> {
1096 protobuf::IntervalUnit::try_from(*value)
1097 .map(|t| t.into())
1098 .map_err(|_| Error::unknown("IntervalUnit", *value))
1099}
1100
/// Converts a `Vec<T>` into a fixed-size array `[T; N]`.
///
/// # Panics
/// Panics when `v.len() != N`.
fn vec_to_array<T, const N: usize>(v: Vec<T>) -> [T; N] {
    match <[T; N]>::try_from(v) {
        Ok(array) => array,
        Err(rejected) => {
            panic!("Expected a Vec of length {} but it was {}", N, rejected.len())
        }
    }
}
1107
1108pub fn parse_proto_fields_to_fields<'a, I>(
1110 fields: I,
1111) -> std::result::Result<Vec<Field>, Error>
1112where
1113 I: IntoIterator<Item = &'a protobuf::Field>,
1114{
1115 fields
1116 .into_iter()
1117 .map(Field::try_from)
1118 .collect::<datafusion_common::Result<_, _>>()
1119}
1120
1121pub(crate) fn csv_writer_options_from_proto(
1122 writer_options: &protobuf::CsvWriterOptions,
1123) -> datafusion_common::Result<WriterBuilder> {
1124 let mut builder = WriterBuilder::new();
1125 if !writer_options.delimiter.is_empty() {
1126 if let Some(delimiter) = writer_options.delimiter.chars().next() {
1127 if delimiter.is_ascii() {
1128 builder = builder.with_delimiter(delimiter as u8);
1129 } else {
1130 return Err(proto_error("CSV Delimiter is not ASCII"));
1131 }
1132 } else {
1133 return Err(proto_error("Error parsing CSV Delimiter"));
1134 }
1135 }
1136 if !writer_options.quote.is_empty() {
1137 if let Some(quote) = writer_options.quote.chars().next() {
1138 if quote.is_ascii() {
1139 builder = builder.with_quote(quote as u8);
1140 } else {
1141 return Err(proto_error("CSV Quote is not ASCII"));
1142 }
1143 } else {
1144 return Err(proto_error("Error parsing CSV Quote"));
1145 }
1146 }
1147 if !writer_options.escape.is_empty() {
1148 if let Some(escape) = writer_options.escape.chars().next() {
1149 if escape.is_ascii() {
1150 builder = builder.with_escape(escape as u8);
1151 } else {
1152 return Err(proto_error("CSV Escape is not ASCII"));
1153 }
1154 } else {
1155 return Err(proto_error("Error parsing CSV Escape"));
1156 }
1157 }
1158 Ok(builder
1159 .with_header(writer_options.has_header)
1160 .with_date_format(writer_options.date_format.clone())
1161 .with_datetime_format(writer_options.datetime_format.clone())
1162 .with_timestamp_format(writer_options.timestamp_format.clone())
1163 .with_time_format(writer_options.time_format.clone())
1164 .with_null(writer_options.null_value.clone())
1165 .with_double_quote(writer_options.double_quote))
1166}