1#![deny(unused_crate_dependencies)]
4
5use anyhow::{anyhow, Error, Result};
6use clap::{Parser, Subcommand};
7use duckdb as _;
8use libduckdb_sys as _;
9use stac::{geoparquet::Compression, Collection, Format, Item, Links, Migrate, Validate};
10use stac_api::{GetItems, GetSearch, Search};
11use stac_server::Backend;
12use std::{collections::HashMap, io::Write, str::FromStr};
13use tokio::{io::AsyncReadExt, net::TcpListener, runtime::Handle};
14use tracing::metadata::Level;
15
16#[derive(Debug, Parser)]
18pub struct Stacrs {
19 #[command(subcommand)]
20 command: Command,
21
22 #[arg(
31 short = 'i',
32 long = "input-format",
33 global = true,
34 verbatim_doc_comment
35 )]
36 input_format: Option<Format>,
37
38 #[arg(long = "opt", global = true, verbatim_doc_comment)]
42 options: Vec<KeyValue>,
43
44 #[arg(
53 short = 'o',
54 long = "output-format",
55 global = true,
56 verbatim_doc_comment
57 )]
58 output_format: Option<Format>,
59
60 #[arg(short = 'c', long = "compact-json", global = true)]
65 compact_json: Option<bool>,
66
67 #[arg(long = "parquet-compression", global = true, verbatim_doc_comment)]
82 parquet_compression: Option<Compression>,
83
84 #[arg(
85 long,
86 short = 'v',
87 action = clap::ArgAction::Count,
88 global = true,
89 help = ErrorLevel::verbose_help(),
90 long_help = ErrorLevel::verbose_long_help(),
91 )]
92 verbose: u8,
93
94 #[arg(
95 long,
96 short = 'q',
97 action = clap::ArgAction::Count,
98 global = true,
99 help = ErrorLevel::quiet_help(),
100 long_help = ErrorLevel::quiet_long_help(),
101 conflicts_with = "verbose",
102 )]
103 quiet: u8,
104}
105
106#[derive(Debug, Subcommand)]
108#[allow(clippy::large_enum_variant)]
109pub enum Command {
110 Translate {
112 infile: Option<String>,
116
117 outfile: Option<String>,
121
122 #[arg(long = "migrate", default_value_t = false)]
127 migrate: bool,
128
129 #[arg(long = "to")]
134 to: Option<String>,
135 },
136
137 Search {
139 href: String,
141
142 outfile: Option<String>,
146
147 #[arg(long = "use-duckdb")]
153 use_duckdb: Option<bool>,
154
155 #[arg(short = 'n', long = "max-items")]
157 max_items: Option<usize>,
158
159 #[arg(long = "intersects")]
163 intersects: Option<String>,
164
165 #[arg(long = "ids")]
167 ids: Option<String>,
168
169 #[arg(long = "collections")]
171 collections: Option<String>,
172
173 #[arg(long = "bbox")]
175 bbox: Option<String>,
176
177 #[arg(long = "datetime")]
182 datetime: Option<String>,
183
184 #[arg(long = "fields")]
186 fields: Option<String>,
187
188 #[arg(long = "sortby")]
190 sortby: Option<String>,
191
192 #[arg(long = "filter")]
194 filter: Option<String>,
195
196 #[arg(long = "limit")]
198 limit: Option<String>,
199 },
200
201 Serve {
203 hrefs: Vec<String>,
205
206 #[arg(short = 'a', long = "addr", default_value = "127.0.0.1:7822")]
208 addr: String,
209
210 #[arg(long = "pgstac")]
214 pgstac: Option<String>,
215
216 #[arg(long = "load-collection-items", default_value_t = true)]
218 load_collection_items: bool,
219
220 #[arg(long, default_value_t = true)]
222 create_collections: bool,
223 },
224
225 Validate {
230 infile: Option<String>,
234 },
235}
236
237#[derive(Debug)]
238#[allow(dead_code, clippy::large_enum_variant)]
239enum Value {
240 Stac(stac::Value),
241 Json(serde_json::Value),
242}
243
244#[derive(Debug, Clone)]
245struct KeyValue(String, String);
246
247#[derive(Copy, Clone, Debug, Default)]
248struct ErrorLevel;
249
250impl Stacrs {
251 pub async fn run(self) -> Result<()> {
253 tracing_subscriber::fmt()
254 .with_max_level(self.log_level())
255 .init();
256 match self.command {
257 Command::Translate {
258 ref infile,
259 ref outfile,
260 migrate,
261 ref to,
262 } => {
263 let mut value = self.get(infile.as_deref()).await?;
264 if migrate {
265 value = value.migrate(
266 &to.as_deref()
267 .map(|s| s.parse().unwrap())
268 .unwrap_or_default(),
269 )?;
270 } else if let Some(to) = to {
271 eprintln!("WARNING: --to was passed ({to}) without --migrate, value will not be migrated");
272 }
273 self.put(outfile.as_deref(), value.into()).await
274 }
275 Command::Search {
276 ref href,
277 ref outfile,
278 ref use_duckdb,
279 ref max_items,
280 ref intersects,
281 ref ids,
282 ref collections,
283 ref bbox,
284 ref datetime,
285 ref fields,
286 ref sortby,
287 ref filter,
288 ref limit,
289 } => {
290 let use_duckdb = use_duckdb.unwrap_or_else(|| {
291 matches!(Format::infer_from_href(href), Some(Format::Geoparquet(_)))
292 });
293 let get_items = GetItems {
294 bbox: bbox.clone(),
295 datetime: datetime.clone(),
296 fields: fields.clone(),
297 sortby: sortby.clone(),
298 filter: filter.clone(),
299 limit: limit.clone(),
300 ..Default::default()
301 };
302 let get_search = GetSearch {
303 intersects: intersects.clone(),
304 ids: ids.clone(),
305 collections: collections.clone(),
306 items: get_items,
307 };
308 let search: Search = get_search.try_into()?;
309 let item_collection = if use_duckdb {
310 stac_duckdb::search(href, search, *max_items)?
311 } else {
312 stac_api::client::search(href, search, *max_items).await?
313 };
314 self.put(
315 outfile.as_deref(),
316 serde_json::to_value(item_collection)?.into(),
317 )
318 .await
319 }
320 Command::Serve {
321 ref hrefs,
322 ref addr,
323 ref pgstac,
324 load_collection_items,
325 create_collections,
326 } => {
327 let mut collections = Vec::new();
328 let mut items: HashMap<String, Vec<stac::Item>> = HashMap::new();
329 for href in hrefs {
330 let value = self.get(Some(href.as_str())).await?;
331 match value {
332 stac::Value::Collection(collection) => {
333 if load_collection_items {
334 for link in collection.iter_item_links() {
335 let value = self.get(Some(link.href.as_str())).await?;
336 if let stac::Value::Item(item) = value {
337 items.entry(collection.id.clone()).or_default().push(item);
338 } else {
339 return Err(anyhow!(
340 "item link was not an item: {value:?}"
341 ));
342 }
343 }
344 }
345 collections.push(collection);
346 }
347 stac::Value::ItemCollection(item_collection) => {
348 for item in item_collection.items {
349 if let Some(collection) = item.collection.clone() {
350 items.entry(collection).or_default().push(item);
351 } else {
352 return Err(anyhow!("item without a collection: {item:?}"));
353 }
354 }
355 }
356 stac::Value::Item(item) => {
357 if let Some(collection) = item.collection.clone() {
358 items.entry(collection).or_default().push(item);
359 } else {
360 return Err(anyhow!("item without a collection: {item:?}"));
361 }
362 }
363 _ => return Err(anyhow!("don't know how to load value: {value:?}")),
364 }
365 }
366
367 #[allow(unused_variables)]
368 if let Some(pgstac) = pgstac {
369 #[cfg(feature = "pgstac")]
370 {
371 let backend =
372 stac_server::PgstacBackend::new_from_stringlike(pgstac).await?;
373 load_and_serve(addr, backend, collections, items, create_collections).await
374 }
375 #[cfg(not(feature = "pgstac"))]
376 {
377 Err(anyhow!("stacrs is not compiled with pgstac support"))
378 }
379 } else {
380 let backend = stac_server::MemoryBackend::new();
381 load_and_serve(addr, backend, collections, items, create_collections).await
382 }
383 }
384 Command::Validate { ref infile } => {
385 let value = self.get(infile.as_deref()).await?;
386 let result = Handle::current()
387 .spawn_blocking(move || value.validate())
388 .await?;
389 if let Err(error) = result {
390 if let stac::Error::Validation(errors) = error {
391 if let Some(format) = self.output_format {
392 if let Format::Json(_) = format {
393 let value = errors
394 .into_iter()
395 .map(|error| error.into_json())
396 .collect::<Vec<_>>();
397 if self.compact_json.unwrap_or_default() {
398 serde_json::to_writer(std::io::stdout(), &value)?;
399 } else {
400 serde_json::to_writer_pretty(std::io::stdout(), &value)?;
401 }
402 println!();
403 } else {
404 return Err(anyhow!("invalid output format: {}", format));
405 }
406 } else {
407 for error in errors {
408 println!("{}", error);
409 }
410 }
411 }
412 std::io::stdout().flush()?;
413 Err(anyhow!("one or more validation errors"))
414 } else {
415 Ok(())
416 }
417 }
418 }
419 }
420
421 async fn get(&self, href: Option<&str>) -> Result<stac::Value> {
422 let href = href.and_then(|s| if s == "-" { None } else { Some(s) });
423 let format = self.input_format(href);
424 if let Some(href) = href {
425 let value: stac::Value = format.get_opts(href, self.opts()).await?;
426 Ok(value)
427 } else {
428 let mut buf = Vec::new();
429 let _ = tokio::io::stdin().read_to_end(&mut buf).await?;
430 let value: stac::Value = format.from_bytes(buf)?;
431 Ok(value)
432 }
433 }
434
435 async fn put(&self, href: Option<&str>, value: Value) -> Result<()> {
436 let href = href.and_then(|s| if s == "-" { None } else { Some(s) });
437 let format = self.output_format(href);
438 if let Some(href) = href {
439 let opts = self.opts();
440 let _ = match value {
441 Value::Json(json) => format.put_opts(href, json, opts).await?,
442 Value::Stac(stac) => format.put_opts(href, stac, opts).await?,
443 };
444 Ok(())
445 } else {
446 let mut bytes = match value {
447 Value::Json(json) => format.into_vec(json)?,
448 Value::Stac(stac) => format.into_vec(stac)?,
449 };
450 bytes.push(b'\n');
452 std::io::stdout().write_all(&bytes)?;
453 Ok(())
454 }
455 }
456
457 pub fn log_level(&self) -> Option<Level> {
458 level_enum(self.verbosity())
459 }
460
461 fn verbosity(&self) -> i8 {
462 level_value(ErrorLevel::default()) - (self.quiet as i8) + (self.verbose as i8)
463 }
464
465 pub fn input_format(&self, href: Option<&str>) -> Format {
467 if let Some(input_format) = self.input_format {
468 input_format
469 } else if let Some(href) = href {
470 Format::infer_from_href(href).unwrap_or_default()
471 } else {
472 Format::json()
473 }
474 }
475
476 pub fn output_format(&self, href: Option<&str>) -> Format {
478 let format = if let Some(format) = self.output_format {
479 format
480 } else if let Some(href) = href {
481 Format::infer_from_href(href).unwrap_or_default()
482 } else {
483 Format::Json(true)
484 };
485 if matches!(format, Format::Geoparquet(_)) {
486 Format::Geoparquet(self.parquet_compression.or(Some(Compression::SNAPPY)))
487 } else if let Format::Json(pretty) = format {
488 Format::Json(self.compact_json.map(|c| !c).unwrap_or(pretty))
489 } else {
490 format
491 }
492 }
493
494 fn opts(&self) -> Vec<(String, String)> {
495 self.options
496 .iter()
497 .cloned()
498 .map(|kv| (kv.0, kv.1))
499 .collect()
500 }
501}
502
503impl ErrorLevel {
504 fn default() -> Option<Level> {
505 Some(Level::ERROR)
506 }
507
508 fn verbose_help() -> Option<&'static str> {
509 Some("Increase verbosity")
510 }
511
512 fn verbose_long_help() -> Option<&'static str> {
513 None
514 }
515
516 fn quiet_help() -> Option<&'static str> {
517 Some("Decrease verbosity")
518 }
519
520 fn quiet_long_help() -> Option<&'static str> {
521 None
522 }
523}
524
525impl From<stac::Value> for Value {
526 fn from(value: stac::Value) -> Self {
527 Value::Stac(value)
528 }
529}
530
531impl From<serde_json::Value> for Value {
532 fn from(value: serde_json::Value) -> Self {
533 Value::Json(value)
534 }
535}
536
537impl FromStr for KeyValue {
538 type Err = Error;
539
540 fn from_str(s: &str) -> Result<Self> {
541 if let Some((key, value)) = s.split_once('=') {
542 Ok(KeyValue(key.to_string(), value.to_string()))
543 } else {
544 Err(anyhow!("invalid key=value: {s}"))
545 }
546 }
547}
548
549async fn load_and_serve(
550 addr: &str,
551 mut backend: impl Backend,
552 collections: Vec<Collection>,
553 mut items: HashMap<String, Vec<Item>>,
554 create_collections: bool,
555) -> Result<()> {
556 for collection in collections {
557 let items = items.remove(&collection.id);
558 backend.add_collection(collection).await?;
559 if let Some(items) = items {
560 backend.add_items(items).await?;
561 }
562 }
563 if create_collections {
564 for (collection_id, items) in items {
565 let collection = Collection::from_id_and_items(collection_id, &items);
566 backend.add_collection(collection).await?;
567 backend.add_items(items).await?;
568 }
569 } else if !items.is_empty() {
570 return Err(anyhow!(
571 "items don't have a collection and `create_collections` is false"
572 ));
573 }
574 let root = format!("http://{}", addr);
575 let api = stac_server::Api::new(backend, &root)?;
576 let router = stac_server::routes::from_api(api);
577 let listener = TcpListener::bind(&addr).await?;
578 eprintln!("Serving a STAC API at {}", root);
579 axum::serve(listener, router).await.map_err(Error::from)
580}
581
582fn level_enum(verbosity: i8) -> Option<Level> {
583 match verbosity {
584 i8::MIN..=-1 => None,
585 0 => Some(Level::ERROR),
586 1 => Some(Level::WARN),
587 2 => Some(Level::INFO),
588 3 => Some(Level::DEBUG),
589 4..=i8::MAX => Some(Level::TRACE),
590 }
591}
592
593fn level_value(level: Option<Level>) -> i8 {
594 match level {
595 None => -1,
596 Some(Level::ERROR) => 0,
597 Some(Level::WARN) => 1,
598 Some(Level::INFO) => 2,
599 Some(Level::DEBUG) => 3,
600 Some(Level::TRACE) => 4,
601 }
602}
603
#[cfg(test)]
mod tests {
    use super::Stacrs;
    use assert_cmd::Command;
    use clap::Parser;
    use rstest::{fixture, rstest};
    use stac::{geoparquet::Compression, Format};

    /// A fresh invocation of the `stacrs` binary with no arguments yet.
    #[fixture]
    fn command() -> Command {
        Command::cargo_bin("stacrs").unwrap()
    }

    #[rstest]
    fn translate_json(mut command: Command) {
        command
            .arg("translate")
            .arg("examples/simple-item.json")
            .assert()
            .success();
    }

    #[rstest]
    fn migrate(mut command: Command) {
        command
            .arg("translate")
            .arg("../../spec-examples/v1.0.0/simple-item.json")
            .arg("--migrate")
            .assert()
            .success();
    }

    #[test]
    fn input_format() {
        let stacrs = Stacrs::parse_from(["stacrs", "translate"]);
        assert_eq!(stacrs.input_format(None), Format::Json(false));

        let stacrs = Stacrs::parse_from(["stacrs", "translate"]);
        assert_eq!(stacrs.input_format(Some("file.json")), Format::Json(false));

        let stacrs = Stacrs::parse_from(["stacrs", "translate"]);
        assert_eq!(stacrs.input_format(Some("file.ndjson")), Format::NdJson);

        let stacrs = Stacrs::parse_from(["stacrs", "translate"]);
        assert_eq!(
            stacrs.input_format(Some("file.parquet")),
            Format::Geoparquet(None)
        );

        let stacrs = Stacrs::parse_from(["stacrs", "--input-format", "json", "translate"]);
        assert_eq!(stacrs.input_format(None), Format::Json(false));

        let stacrs = Stacrs::parse_from(["stacrs", "--input-format", "ndjson", "translate"]);
        assert_eq!(stacrs.input_format(None), Format::NdJson);

        let stacrs = Stacrs::parse_from(["stacrs", "--input-format", "parquet", "translate"]);
        assert_eq!(stacrs.input_format(None), Format::Geoparquet(None));

        let stacrs = Stacrs::parse_from([
            "stacrs",
            "--input-format",
            "json",
            "--compact-json",
            "false",
            "translate",
        ]);
        assert_eq!(stacrs.input_format(None), Format::Json(false));
    }

    #[test]
    fn output_format() {
        let stacrs = Stacrs::parse_from(["stacrs", "translate"]);
        assert_eq!(stacrs.output_format(None), Format::Json(true));

        let stacrs = Stacrs::parse_from(["stacrs", "translate"]);
        assert_eq!(stacrs.output_format(Some("file.json")), Format::Json(false));

        let stacrs = Stacrs::parse_from(["stacrs", "translate"]);
        assert_eq!(stacrs.output_format(Some("file.ndjson")), Format::NdJson);

        let stacrs = Stacrs::parse_from(["stacrs", "translate"]);
        assert_eq!(
            stacrs.output_format(Some("file.parquet")),
            Format::Geoparquet(Some(Compression::SNAPPY))
        );

        let stacrs = Stacrs::parse_from(["stacrs", "--output-format", "json", "translate"]);
        assert_eq!(stacrs.output_format(None), Format::Json(false));

        let stacrs = Stacrs::parse_from(["stacrs", "--output-format", "ndjson", "translate"]);
        assert_eq!(stacrs.output_format(None), Format::NdJson);

        let stacrs = Stacrs::parse_from(["stacrs", "--output-format", "parquet", "translate"]);
        assert_eq!(
            stacrs.output_format(None),
            Format::Geoparquet(Some(Compression::SNAPPY))
        );

        let stacrs = Stacrs::parse_from([
            "stacrs",
            "--output-format",
            "json",
            "--compact-json",
            "false",
            "translate",
        ]);
        assert_eq!(stacrs.output_format(None), Format::Json(true));

        let stacrs = Stacrs::parse_from([
            "stacrs",
            "--output-format",
            "parquet",
            "--parquet-compression",
            "lzo",
            "translate",
        ]);
        assert_eq!(
            stacrs.output_format(None),
            Format::Geoparquet(Some(Compression::LZO))
        );
    }

    #[rstest]
    fn validate(mut command: Command) {
        command
            .arg("validate")
            .arg("examples/simple-item.json")
            .assert()
            .success();
    }

    // This is a separate test, with its own fixture, on purpose. `Command::arg`
    // appends, so reusing the command from `validate` would run
    // `stacrs validate examples/simple-item.json validate data/invalid-item.json`.
    // That fails at argument parsing and would pass for the wrong reason.
    #[rstest]
    fn validate_invalid(mut command: Command) {
        command
            .arg("validate")
            .arg("data/invalid-item.json")
            .assert()
            .failure();
    }
}