1pub mod auth;
2pub mod field_mask;
3pub mod serde;
4pub mod url;
5
6pub use auth::{GetToken, NoToken};
7pub use field_mask::FieldMask;
8
9use std::io::{Cursor, Read, Seek, SeekFrom, Write};
10use std::str::FromStr;
11use std::time::Duration;
12
13use hyper::header::{HeaderMap, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, USER_AGENT};
14use hyper::Method;
15use hyper::StatusCode;
16use itertools::Itertools;
17use mime::Mime;
18use tokio::time::sleep;
19
/// CRLF line terminator written around the boundary markers and part headers
/// that `MultiPartReader` emits.
const LINE_ENDING: &str = "\r\n";
21
/// Boxed HTTP body that yields `hyper::body::Bytes` and reports failures as
/// `hyper::Error`.
pub type Body = http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>;

/// A `hyper::Response` whose payload is a [`Body`].
pub type Response = hyper::Response<Body>;

/// The `hyper_util` legacy client, generic over its connector `C`, sending
/// [`Body`] as the request body type.
pub type Client<C> = hyper_util::client::legacy::Client<C, Body>;
30
31pub trait Connector:
33 hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static
34{
35}
36
37impl<T> Connector for T where
38 T: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static
39{
40}
41
/// Answer a [`Delegate`] gives when asked how to proceed after a failed request.
///
/// Derives `Debug`, `Clone`, `Copy`, `PartialEq` and `Eq` so the decision can
/// be logged, stored and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Retry {
    /// Give up; the failure is handed back to the caller.
    Abort,
    /// Wait for the given duration, then issue the request again.
    After(Duration),
}
48
/// The upload protocols this crate distinguishes between.
///
/// Derives `Debug`, `Clone` and `Copy` in addition to the comparison traits so
/// the selector can be logged and passed by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UploadProtocol {
    /// Simple upload.
    // NOTE(review): presumably the whole payload in a single request — the
    // call sites are not in this file, confirm.
    Simple,
    /// Resumable upload.
    // NOTE(review): presumably the chunked protocol driven by
    // `ResumableUploadHelper` — confirm against the generated callers.
    Resumable,
}
54
// The traits below are method-less markers. Nothing in this file implements
// or consumes them, so the per-trait notes are assumptions to be confirmed
// against the generated API crates.

/// Marker trait for hub types.
// NOTE(review): presumably the central per-API entry point — confirm.
pub trait Hub {}

/// Marker trait for method-builder types.
// NOTE(review): presumably the per-resource factories handing out call builders — confirm.
pub trait MethodsBuilder {}

/// Marker trait for call-builder types.
// NOTE(review): presumably one builder per API method invocation — confirm.
pub trait CallBuilder {}

/// Marker trait for resource types.
// NOTE(review): presumably schema types usable as both request and response — confirm.
pub trait Resource {}

/// Marker trait for types returned as the result of a call.
// NOTE(review): inferred from the name only — confirm.
pub trait ResponseResult {}

/// Marker trait for types sent as the value of a request.
// NOTE(review): inferred from the name only — confirm.
pub trait RequestValue {}

/// Marker trait for types that are not used by any call.
// NOTE(review): inferred from the name only — confirm.
pub trait UnusedType {}

/// Marker trait for types that are part of another type.
// NOTE(review): inferred from the name only — confirm.
pub trait Part {}

/// Marker trait for types nested inside another type's definition.
// NOTE(review): inferred from the name only — confirm.
pub trait NestedType {}
87
/// Combination of `Read`, `Seek` and `Send` under one name, so that a seekable
/// reader can be handed around as a single trait object (`dyn ReadSeek`).
pub trait ReadSeek: Read + Seek + Send {}

/// Every `Read + Seek + Send` type is automatically a `ReadSeek`.
impl<R> ReadSeek for R where R: Read + Seek + Send {}
91
/// Types that can render themselves as a single "parts" string.
// NOTE(review): presumably the value of a request's `part` parameter listing
// the fields that are set — no implementor is visible in this file, confirm.
pub trait ToParts {
    /// Returns the string representation of the parts of `self`.
    fn to_parts(&self) -> String;
}
96
/// Callbacks through which a caller can observe and steer an API call.
///
/// Every method has a default implementation, so an implementor overrides only
/// the hooks it cares about. Within this file the trait is driven by
/// `ResumableUploadHelper`; hooks that are not called here are documented with
/// a review note because their call sites are not visible.
pub trait Delegate: Send {
    /// Receives the [`MethodInfo`] describing a call. Does nothing by default.
    // NOTE(review): presumably invoked once before the call starts — confirm.
    fn begin(&mut self, _info: MethodInfo) {}

    /// Called when sending a request failed at the HTTP client level.
    ///
    /// `ResumableUploadHelper` sleeps for the returned duration and repeats the
    /// request on `Retry::After`; on `Retry::Abort` (the default) it returns
    /// the error to its caller.
    fn http_error(&mut self, _err: &hyper_util::client::legacy::Error) -> Retry {
        Retry::Abort
    }

    /// Supplies an API key, if the delegate has one. Defaults to `None`.
    // NOTE(review): presumably consulted when a method has no scopes and no
    // token is used (see the `Error::MissingAPIKey` message) — confirm.
    fn api_key(&mut self) -> Option<String> {
        None
    }

    /// Called with the error `e` of a failed token retrieval.
    ///
    /// The default passes the error straight back as `Err(e)`.
    // NOTE(review): presumably an `Ok(Some(token))` lets the call continue with
    // that token and `Ok(None)` without one — confirm at the call site.
    fn token(
        &mut self,
        e: Box<dyn std::error::Error + Send + Sync>,
    ) -> std::result::Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
        Err(e)
    }

    /// Returns a previously stored upload URL, if any. Defaults to `None`.
    // NOTE(review): presumably used to resume an interrupted resumable upload
    // instead of starting a new session — confirm.
    fn upload_url(&mut self) -> Option<String> {
        None
    }

    /// Receives the upload URL to remember (or `None`). Ignored by default.
    // NOTE(review): presumably the counterpart of `upload_url`, with `None`
    // meaning the stored URL is no longer valid — confirm.
    fn store_upload_url(&mut self, url: Option<&str>) {
        let _ = url;
    }

    /// Reports a response body that could not be decoded as JSON, together
    /// with the decoding error. Ignored by default.
    fn response_json_decode_error(
        &mut self,
        json_encoded_value: &str,
        json_decode_error: &serde_json::Error,
    ) {
        let _ = json_encoded_value;
        let _ = json_decode_error;
    }

    /// Called when a response was received but does not indicate success.
    ///
    /// The second argument is the response body parsed as JSON when that was
    /// possible. `ResumableUploadHelper` retries after the returned delay on
    /// `Retry::After`; on `Retry::Abort` (the default) it hands the response
    /// back to its caller.
    fn http_failure(&mut self, _: &Response, _err: Option<&serde_json::Value>) -> Retry {
        Retry::Abort
    }

    /// Hook without arguments; does nothing by default.
    // NOTE(review): presumably called right before each request is sent — confirm.
    fn pre_request(&mut self) {}

    /// Desired size in bytes of one chunk of a resumable upload.
    ///
    /// Defaults to `1 << 23` (8 MiB). `ResumableUploadHelper::upload` falls
    /// back to its minimum of `1 << 18` (256 KiB) when the returned value is
    /// not larger than that minimum.
    fn chunk_size(&mut self) -> u64 {
        1 << 23
    }

    /// Asked before each chunk of a resumable upload is sent.
    ///
    /// Returning `true` makes `ResumableUploadHelper::upload` stop and return
    /// `None`; the default `false` lets the upload proceed.
    fn cancel_chunk_upload(&mut self, chunk: &ContentRange) -> bool {
        let _ = chunk;
        false
    }

    /// Receives whether an operation succeeded. Ignored by default.
    // NOTE(review): presumably the last callback of a call — confirm.
    fn finished(&mut self, is_success: bool) {
        let _ = is_success;
    }
}
224
/// A [`Delegate`] that overrides nothing and therefore uses the default
/// behavior of every callback.
#[derive(Default)]
pub struct DefaultDelegate;

impl Delegate for DefaultDelegate {}
231
/// Errors reported by this crate; see the `Display` implementation for the
/// message each variant produces.
#[derive(Debug)]
pub enum Error {
    /// The HTTP client failed to perform the request.
    HttpError(hyper_util::client::legacy::Error),

    /// The media to upload is too large: `(media size, maximum allowed upload size)`.
    UploadSizeLimitExceeded(u64, u64),

    /// A bad request, carrying a JSON value describing it.
    BadRequest(serde_json::Value),

    /// No API key was found although the method defines no scopes.
    MissingAPIKey,

    /// Retrieving a token failed; carries the underlying error.
    MissingToken(Box<dyn std::error::Error + Send + Sync>),

    /// The operation was cancelled by the delegate.
    Cancelled,

    /// A custom parameter has the same name as one the call builder already
    /// provides natively; carries that parameter's name.
    FieldClash(&'static str),

    /// JSON decoding failed: `(the JSON text, the decoding error)`.
    JsonDecodeError(String, serde_json::Error),

    /// The HTTP status of the carried response indicates failure.
    Failure(Response),

    /// An I/O error.
    Io(std::io::Error),
}
268
269impl std::fmt::Display for Error {
270 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
271 match self {
272 Error::Io(err) => err.fmt(f),
273 Error::HttpError(err) => err.fmt(f),
274 Error::UploadSizeLimitExceeded(resource_size, max_size) => writeln!(
275 f,
276 "The media size {} exceeds the maximum allowed upload size of {}",
277 resource_size, max_size
278 ),
279 Error::MissingAPIKey => {
280 writeln!(
281 f,
282 "The application's API key was not found in the configuration"
283 )?;
284 writeln!(
285 f,
286 "It is used as there are no Scopes defined for this method."
287 )
288 }
289 Error::BadRequest(message) => writeln!(f, "Bad Request: {}", message),
290 Error::MissingToken(e) => writeln!(f, "Token retrieval failed: {}", e),
291 Error::Cancelled => writeln!(f, "Operation cancelled by delegate"),
292 Error::FieldClash(field) => writeln!(
293 f,
294 "The custom parameter '{}' is already provided natively by the CallBuilder.",
295 field
296 ),
297 Error::JsonDecodeError(json_str, err) => writeln!(f, "{}: {}", err, json_str),
298 Error::Failure(response) => {
299 writeln!(f, "Http status indicates failure: {:?}", response)
300 }
301 }
302 }
303}
304
305impl std::error::Error for Error {
306 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
307 match *self {
308 Error::HttpError(ref err) => err.source(),
309 Error::JsonDecodeError(_, ref err) => err.source(),
310 _ => None,
311 }
312 }
313}
314
315impl From<std::io::Error> for Error {
316 fn from(err: std::io::Error) -> Self {
317 Error::Io(err)
318 }
319}
320
/// `std::result::Result` with this crate's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
323
/// Description of an API method, as handed to [`Delegate::begin`].
pub struct MethodInfo {
    /// Static identifier of the method.
    // NOTE(review): presumably the API's dotted method id — confirm.
    pub id: &'static str,
    /// HTTP method associated with the API method.
    pub http_method: Method,
}
329
/// Fixed boundary token that separates the parts produced by
/// `MultiPartReader`; it is also advertised in `MultiPartReader::mime_type`.
const BOUNDARY: &str = "MDuXWGyeE33QFXGchb2VFWc4Z7945d";
331
/// A `Read` implementation that streams several readers as one
/// `multipart/related` body, inserting boundary lines and per-part headers.
///
/// Parts are registered with `add_part` and emitted in registration order.
#[derive(Default)]
pub struct MultiPartReader<'a> {
    /// Parts that have not been started yet: their headers and content reader.
    raw_parts: Vec<(HeaderMap, &'a mut (dyn Read + Send))>,
    /// The part being streamed: its rendered boundary-and-header bytes, and
    /// the reader supplying its content.
    current_part: Option<(Cursor<Vec<u8>>, &'a mut (dyn Read + Send))>,
    /// The closing boundary, present once the last part's content has ended
    /// and until it has been handed out.
    last_part_boundary: Option<Cursor<Vec<u8>>>,
}
342
343impl<'a> MultiPartReader<'a> {
344 pub fn mime_type() -> Mime {
348 Mime::from_str(&format!("multipart/related;boundary={}", BOUNDARY)).expect("valid mimetype")
349 }
350
351 pub fn reserve_exact(&mut self, cap: usize) {
353 self.raw_parts.reserve_exact(cap);
354 }
355
356 pub fn add_part(
368 &mut self,
369 reader: &'a mut (dyn Read + Send),
370 size: u64,
371 mime_type: Mime,
372 ) -> &mut MultiPartReader<'a> {
373 let mut headers = HeaderMap::new();
374 headers.insert(
375 CONTENT_TYPE,
376 hyper::header::HeaderValue::from_str(mime_type.as_ref()).unwrap(),
377 );
378 headers.insert(CONTENT_LENGTH, size.into());
379 self.raw_parts.push((headers, reader));
380 self
381 }
382
383 fn is_depleted(&self) -> bool {
385 self.raw_parts.is_empty()
386 && self.current_part.is_none()
387 && self.last_part_boundary.is_none()
388 }
389
390 fn is_last_part(&self) -> bool {
392 self.raw_parts.is_empty() && self.current_part.is_some()
393 }
394}
395
impl<'a> Read for MultiPartReader<'a> {
    /// Fills `buf` with the next bytes of the multipart stream.
    ///
    /// For each registered part the stream contains a boundary line and the
    /// part's headers, followed by the part's content; after the content of
    /// the last part a closing boundary is emitted. The method keeps pulling
    /// from subsequent parts (by calling itself) until `buf` is full or
    /// everything has been emitted, and returns `Ok(0)` once depleted.
    ///
    /// If a part's reader fails, all remaining state is dropped and the error
    /// is returned; later calls then report end-of-stream.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        match (
            self.raw_parts.len(),
            self.current_part.is_none(),
            self.last_part_boundary.is_none(),
        ) {
            // The closing boundary is pending: serve bytes from it only.
            (_, _, false) => {
                let br = self
                    .last_part_boundary
                    .as_mut()
                    .unwrap()
                    .read(buf)
                    .unwrap_or(0);
                // Fewer bytes than `buf` could hold means the boundary is used up.
                if br < buf.len() {
                    self.last_part_boundary = None;
                }
                return Ok(br);
            }
            // No queued part, no part in progress, no boundary: end of stream.
            (0, true, true) => return Ok(0),
            // No part in progress but at least one queued: start the next one
            // by rendering its boundary line and headers into a cursor.
            (n, true, _) if n > 0 => {
                let (headers, reader) = self.raw_parts.remove(0);
                let mut c = Cursor::new(Vec::<u8>::new());
                (write!(
                    &mut c,
                    "{}--{}{}{}{}{}",
                    LINE_ENDING,
                    BOUNDARY,
                    LINE_ENDING,
                    headers
                        .iter()
                        .map(|(k, v)| format!("{}: {}", k, v.to_str().unwrap()))
                        .join(LINE_ENDING),
                    LINE_ENDING,
                    LINE_ENDING,
                ))?;
                // Back to the start so the bytes just written can be read out.
                c.rewind()?;
                self.current_part = Some((c, reader));
            }
            // A part is already in progress: continue with it below.
            _ => {}
        }

        // First drain what is left of the header bytes, then let the part's
        // reader fill the remainder of `buf`.
        let (hb, rr) = {
            let &mut (ref mut c, ref mut reader) = self.current_part.as_mut().unwrap();
            let b = c.read(buf).unwrap_or(0);
            (b, reader.read(&mut buf[b..]))
        };

        match rr {
            Ok(bytes_read) => {
                // The reader was offered a non-empty slice and returned
                // nothing: this part's content has ended.
                if hb < buf.len() && bytes_read == 0 {
                    if self.is_last_part() {
                        // Queue the closing boundary after the final part.
                        self.last_part_boundary = Some(Cursor::new(
                            format!("{}--{}--{}", LINE_ENDING, BOUNDARY, LINE_ENDING).into_bytes(),
                        ))
                    }
                    self.current_part = None;
                }
                let mut total_bytes_read = hb + bytes_read;
                // Keep filling `buf` from whatever comes next.
                while total_bytes_read < buf.len() && !self.is_depleted() {
                    match self.read(&mut buf[total_bytes_read..]) {
                        Ok(br) => total_bytes_read += br,
                        Err(err) => return Err(err),
                    }
                }
                Ok(total_bytes_read)
            }
            Err(err) => {
                // Drop everything so subsequent reads report end-of-stream.
                self.current_part = None;
                self.last_part_boundary = None;
                self.raw_parts.clear();
                Err(err)
            }
        }
    }
}
479
480#[derive(PartialEq, Eq, Debug, Clone)]
485pub struct XUploadContentType(pub Mime);
486
487impl std::ops::Deref for XUploadContentType {
488 type Target = Mime;
489 fn deref(&self) -> &Mime {
490 &self.0
491 }
492}
493impl std::ops::DerefMut for XUploadContentType {
494 fn deref_mut(&mut self) -> &mut Mime {
495 &mut self.0
496 }
497}
498impl std::fmt::Display for XUploadContentType {
499 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
500 std::fmt::Display::fmt(&**self, f)
501 }
502}
503
/// A byte range with inclusive bounds, written as `first-last`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Chunk {
    /// Offset of the first byte of the range.
    pub first: u64,
    /// Offset of the last byte of the range (inclusive).
    pub last: u64,
}

impl std::fmt::Display for Chunk {
    /// Writes the range as `first-last`.
    ///
    /// A failure reported by the underlying writer is returned to the caller
    /// rather than being swallowed.
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(fmt, "{}-{}", self.first, self.last)
    }
}

impl FromStr for Chunk {
    type Err = &'static str;

    /// Parses `first-last`, where both sides are unsigned decimal integers.
    ///
    /// # Errors
    ///
    /// Returns a static message when the input does not consist of exactly two
    /// `-`-separated parts, or when either part is not a valid `u64`.
    fn from_str(s: &str) -> std::result::Result<Chunk, &'static str> {
        // Exactly two pieces are required; a third `next()` must come up empty.
        let mut pieces = s.split('-');
        let (first, last) = match (pieces.next(), pieces.next(), pieces.next()) {
            (Some(first), Some(last), None) => (first, last),
            _ => return Err("Expected two parts: %i-%i"),
        };
        Ok(Chunk {
            first: first
                .parse()
                .map_err(|_| "Couldn't parse 'first' as digit")?,
            last: last.parse().map_err(|_| "Couldn't parse 'last' as digit")?,
        })
    }
}
538
539#[derive(Clone, PartialEq, Eq, Debug)]
541pub struct ContentRange {
542 pub range: Option<Chunk>,
543 pub total_length: u64,
544}
545
546impl ContentRange {
547 pub fn header_value(&self) -> String {
548 format!(
549 "bytes {}/{}",
550 match self.range {
551 Some(ref c) => format!("{}", c),
552 None => "*".to_string(),
553 },
554 self.total_length
555 )
556 }
557}
558
559#[derive(Clone, PartialEq, Eq, Debug)]
560pub struct RangeResponseHeader(pub Chunk);
561
562impl RangeResponseHeader {
563 fn from_bytes(raw: &[u8]) -> Self {
564 if !raw.is_empty() {
565 if let Ok(s) = std::str::from_utf8(raw) {
566 const PREFIX: &str = "bytes ";
567 if let Some(stripped) = s.strip_prefix(PREFIX) {
568 if let Ok(c) = <Chunk as FromStr>::from_str(stripped) {
569 return RangeResponseHeader(c);
570 }
571 }
572 }
573 }
574
575 panic!("Unable to parse Range header {:?}", raw)
576 }
577}
578
/// Everything needed to upload the content of a reader in chunks to an
/// upload URL; see the `upload` method.
pub struct ResumableUploadHelper<'a, A: 'a, C>
where
    C: Connector,
{
    /// HTTP client used to send every request.
    pub client: &'a Client<C>,
    /// Receives the retry, chunk-size and cancellation callbacks.
    pub delegate: &'a mut dyn Delegate,
    /// Offset at which to start uploading; when `None`, the server is asked
    /// how much it already has before the first chunk is sent.
    pub start_at: Option<u64>,
    /// Authenticator.
    // NOTE(review): not read by any method in this file — confirm whether it
    // is still needed.
    pub auth: &'a A,
    /// Sent as the `User-Agent` header of every request.
    pub user_agent: &'a str,
    /// Sent as the `Authorization` header of the transfer-status query.
    pub auth_header: String,
    /// URL all requests are sent to.
    pub url: &'a str,
    /// Source of the content; it is seeked to each chunk's offset before reading.
    pub reader: &'a mut dyn ReadSeek,
    /// Sent as the `Content-Type` header of every chunk.
    pub media_type: Mime,
    /// Total length of the content in bytes.
    pub content_length: u64,
}
595
596impl<'a, A, C> ResumableUploadHelper<'a, A, C>
597where
598 C: Connector,
599{
600 async fn query_transfer_status(
601 &mut self,
602 ) -> std::result::Result<u64, std::result::Result<Response, hyper_util::client::legacy::Error>>
603 {
604 loop {
605 match self
606 .client
607 .request(
608 hyper::Request::builder()
609 .method(hyper::Method::POST)
610 .uri(self.url)
611 .header(USER_AGENT, self.user_agent.to_string())
612 .header(
613 "Content-Range",
614 ContentRange {
615 range: None,
616 total_length: self.content_length,
617 }
618 .header_value(),
619 )
620 .header(AUTHORIZATION, self.auth_header.clone())
621 .body(to_body::<String>(None))
622 .unwrap(),
623 )
624 .await
625 {
626 Ok(r) => {
627 let headers = r.headers().clone();
629 let h: RangeResponseHeader = match headers.get("Range") {
630 Some(hh) if r.status() == StatusCode::PERMANENT_REDIRECT => {
631 RangeResponseHeader::from_bytes(hh.as_bytes())
632 }
633 None | Some(_) => {
634 let (parts, body) = r.into_parts();
635 let body = to_body(to_bytes(body).await);
636 let response = Response::from_parts(parts, body);
637 if let Retry::After(d) = self.delegate.http_failure(&response, None) {
638 sleep(d).await;
639 continue;
640 }
641 return Err(Ok(response));
642 }
643 };
644 return Ok(h.0.last);
645 }
646 Err(err) => {
647 if let Retry::After(d) = self.delegate.http_error(&err) {
648 sleep(d).await;
649 continue;
650 }
651 return Err(Err(err));
652 }
653 }
654 }
655 }
656
657 pub async fn upload(
661 &mut self,
662 ) -> Option<std::result::Result<Response, hyper_util::client::legacy::Error>> {
663 let mut start = match self.start_at {
664 Some(s) => s,
665 None => match self.query_transfer_status().await {
666 Ok(s) => s,
667 Err(result) => return Some(result),
668 },
669 };
670
671 const MIN_CHUNK_SIZE: u64 = 1 << 18;
672 let chunk_size = match self.delegate.chunk_size() {
673 cs if cs > MIN_CHUNK_SIZE => cs,
674 _ => MIN_CHUNK_SIZE,
675 };
676
677 loop {
678 self.reader.seek(SeekFrom::Start(start)).unwrap();
679
680 let request_size = match self.content_length - start {
681 rs if rs > chunk_size => chunk_size,
682 rs => rs,
683 };
684
685 let mut section_reader = self.reader.take(request_size);
686 let mut bytes = vec![];
687 section_reader.read_to_end(&mut bytes).unwrap();
688 let range_header = ContentRange {
689 range: Some(Chunk {
690 first: start,
691 last: start + request_size - 1,
692 }),
693 total_length: self.content_length,
694 };
695 if self.delegate.cancel_chunk_upload(&range_header) {
696 return None;
697 }
698 match self
699 .client
700 .request(
701 hyper::Request::builder()
702 .uri(self.url)
703 .method(hyper::Method::POST)
704 .header("Content-Range", range_header.header_value())
705 .header(CONTENT_TYPE, format!("{}", self.media_type))
706 .header(USER_AGENT, self.user_agent.to_string())
707 .body(to_body(bytes.into()))
708 .unwrap(),
709 )
710 .await
711 {
712 Ok(response) => {
713 start += request_size;
714
715 if response.status() == StatusCode::PERMANENT_REDIRECT {
716 continue;
717 }
718
719 let (parts, body) = response.into_parts();
720 let success = parts.status.is_success();
721 let bytes = to_bytes(body).await.unwrap_or_default();
722 let error = if !success {
723 serde_json::from_str(&to_string(&bytes)).ok()
724 } else {
725 None
726 };
727 let response = to_response(parts, bytes.into());
728
729 if !success {
730 if let Retry::After(d) =
731 self.delegate.http_failure(&response, error.as_ref())
732 {
733 sleep(d).await;
734 continue;
735 }
736 }
737 return Some(Ok(response));
738 }
739 Err(err) => {
740 if let Retry::After(d) = self.delegate.http_error(&err) {
741 sleep(d).await;
742 continue;
743 }
744 return Some(Err(err));
745 }
746 }
747 }
748 }
749}
750
751pub fn remove_json_null_values(value: &mut serde_json::value::Value) {
753 match value {
754 serde_json::value::Value::Object(map) => {
755 map.retain(|_, value| !value.is_null());
756 map.values_mut().for_each(remove_json_null_values);
757 }
758 serde_json::value::Value::Array(arr) => {
759 arr.retain(|value| !value.is_null());
760 arr.iter_mut().for_each(remove_json_null_values);
761 }
762 _ => {}
763 }
764}
765
766#[doc(hidden)]
767pub fn to_body<T>(bytes: Option<T>) -> Body
768where
769 T: Into<hyper::body::Bytes>,
770{
771 use http_body_util::BodyExt;
772
773 fn falliable(_: std::convert::Infallible) -> hyper::Error {
774 unreachable!()
775 }
776
777 let bytes = bytes.map(Into::into).unwrap_or_default();
778 Body::new(http_body_util::Full::from(bytes).map_err(falliable))
779}
780
781#[doc(hidden)]
782pub async fn to_bytes<T>(body: T) -> Option<hyper::body::Bytes>
783where
784 T: hyper::body::Body,
785{
786 use http_body_util::BodyExt;
787 body.collect().await.ok().map(|value| value.to_bytes())
788}
789
790#[doc(hidden)]
791pub fn to_string(bytes: &hyper::body::Bytes) -> std::borrow::Cow<'_, str> {
792 String::from_utf8_lossy(bytes)
793}
794
795#[doc(hidden)]
796pub fn to_response<T>(parts: http::response::Parts, body: Option<T>) -> Response
797where
798 T: Into<hyper::body::Bytes>,
799{
800 Response::from_parts(parts, to_body(body))
801}
802
#[cfg(test)]
mod tests {
    use std::default::Default;
    use std::str::FromStr;

    use ::serde::{Deserialize, Serialize};

    use super::*;

    /// Round-trips two small structs through `serde_json`.
    #[test]
    fn serde() {
        #[derive(Default, Serialize, Deserialize)]
        struct Foo {
            opt: Option<String>,
            req: u32,
            opt_vec: Option<Vec<String>>,
            vec: Vec<String>,
        }

        // Serializing the default value must succeed.
        let foo = Foo::default();
        serde_json::to_string(&foo).unwrap();

        // An absent optional field (`opt_vec`) must not prevent decoding.
        let foo_json = "{\"opt\":null,\"req\":0,\"vec\":[]}";
        let _decoded: Foo = serde_json::from_str(foo_json).unwrap();

        #[derive(Default, Serialize, Deserialize)]
        struct Bar {
            #[serde(rename = "snooSnoo")]
            snoo_snoo: String,
        }
        serde_json::to_string(&Bar::default()).unwrap();

        // The renamed key is honored when decoding.
        let bar_json = "{\"snooSnoo\":\"foo\"}";
        let bar: Bar = serde_json::from_str(bar_json).unwrap();
        assert_eq!(bar.snoo_snoo, "foo");
    }

    /// `Chunk` parses the `first-last` notation.
    #[test]
    fn byte_range_from_str() {
        let parsed = Chunk::from_str("2-42");
        assert_eq!(parsed, Ok(Chunk { first: 2, last: 42 }))
    }

    /// A `&mut dyn Delegate` must be `Send`.
    #[test]
    fn dyn_delegate_is_send() {
        fn with_send(_x: impl Send) {}

        let mut default_delegate = DefaultDelegate::default();
        let delegate: &mut dyn Delegate = &mut default_delegate;
        with_send(delegate);
    }

    /// The advertised MIME type is `multipart/related` with the fixed boundary.
    #[test]
    fn test_mime() {
        let content_type = MultiPartReader::mime_type();

        assert_eq!(mime::MULTIPART, content_type.type_());
        assert_eq!("related", content_type.subtype());
        assert_eq!(
            Some(BOUNDARY),
            content_type.get_param("boundary").map(|x| x.as_str())
        );
    }
}