async_compat/
lib.rs

1//! Compatibility adapter between tokio and futures.
2//!
3//! There are two kinds of compatibility issues between [tokio] and [futures]:
4//!
5//! 1. Tokio's types cannot be used outside tokio context, so any attempt to use them will panic.
6//!     - Solution: If you apply the [`Compat`] adapter to a future, the future will manually
7//!       enter the context of a global tokio runtime. If a runtime is already available via tokio
8//!       thread-locals, then it will be used. Otherwise, a new single-threaded runtime will be
9//!       created on demand. That does *not* mean the future is polled by the tokio runtime - it
10//!       only means the future sets a thread-local variable pointing to the global tokio runtime so
11//!       that tokio's types can be used inside it.
12//! 2. Tokio and futures have similar but different I/O traits `AsyncRead`, `AsyncWrite`,
13//!   `AsyncBufRead`, and `AsyncSeek`.
14//!     - Solution: When the [`Compat`] adapter is applied to an I/O type, it will implement traits
15//!       of the opposite kind. That's how you can use tokio-based types wherever futures-based
16//!       types are expected, and the other way around.
17//!
18//! You can apply the [`Compat`] adapter using the [`Compat::new()`] constructor or using any
19//! method from the [`CompatExt`] trait.
20//!
21//! # Examples
22//!
23//! This program reads lines from stdin and echoes them into stdout, except it's not going to work:
24//!
25//! ```compile_fail
26//! fn main() -> std::io::Result<()> {
27//!     futures::executor::block_on(async {
28//!         let stdin = tokio::io::stdin();
29//!         let mut stdout = tokio::io::stdout();
30//!
31//!         // The following line will not work for two reasons:
32//!         // 1. Runtime error because stdin and stdout are used outside tokio context.
33//!         // 2. Compilation error due to mismatched `AsyncRead` and `AsyncWrite` traits.
34//!         futures::io::copy(stdin, &mut stdout).await?;
35//!         Ok(())
36//!     })
37//! }
38//! ```
39//!
40//! To get around the compatibility issues, apply the [`Compat`] adapter to `stdin`, `stdout`, and
41//! [`futures::io::copy()`]:
42//!
43//! ```
44//! use async_compat::CompatExt;
45//!
46//! fn main() -> std::io::Result<()> {
47//!     futures::executor::block_on(async {
48//!         let stdin = tokio::io::stdin();
49//!         let mut stdout = tokio::io::stdout();
50//!
51//!         futures::io::copy(stdin.compat(), &mut stdout.compat_mut()).compat().await?;
52//!         Ok(())
53//!     })
54//! }
55//! ```
56//!
57//! It is also possible to apply [`Compat`] to the outer future passed to
58//! [`futures::executor::block_on()`] rather than [`futures::io::copy()`] itself.
59//! When applied to the outer future, individual inner futures don't need the adapter because
60//! they're all now inside tokio context:
61//!
62//! ```no_run
63//! use async_compat::{Compat, CompatExt};
64//!
65//! fn main() -> std::io::Result<()> {
66//!     futures::executor::block_on(Compat::new(async {
67//!         let stdin = tokio::io::stdin();
68//!         let mut stdout = tokio::io::stdout();
69//!
70//!         futures::io::copy(stdin.compat(), &mut stdout.compat_mut()).await?;
71//!         Ok(())
72//!     }))
73//! }
74//! ```
75//!
76//! The compatibility adapter converts between tokio-based and futures-based I/O types in any
77//! direction. Here's how we can write the same program by using futures-based I/O types inside
78//! tokio:
79//!
80//! ```no_run
81//! use async_compat::CompatExt;
82//! use blocking::Unblock;
83//!
84//! #[tokio::main]
85//! async fn main() -> std::io::Result<()> {
86//!     let mut stdin = Unblock::new(std::io::stdin());
87//!     let mut stdout = Unblock::new(std::io::stdout());
88//!
89//!     tokio::io::copy(&mut stdin.compat_mut(), &mut stdout.compat_mut()).await?;
90//!     Ok(())
91//! }
92//! ```
93//!
94//! Finally, we can use any tokio-based crate from any other async runtime.
95//! Here are [reqwest] and [warp] as an example:
96//!
97//! ```no_run
98//! use async_compat::{Compat, CompatExt};
99//! use warp::Filter;
100//!
101//! fn main() {
102//!     futures::executor::block_on(Compat::new(async {
103//!         // Make an HTTP GET request.
//!         let response = reqwest::get("https://www.rust-lang.org").await.unwrap();
105//!         println!("{}", response.text().await.unwrap());
106//!
107//!         // Start an HTTP server.
108//!         let routes = warp::any().map(|| "Hello from warp!");
109//!         warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
110//!     }))
111//! }
112//! ```
113//!
//! [blocking]: https://docs.rs/blocking
//! [futures]: https://docs.rs/futures
//! [reqwest]: https://docs.rs/reqwest
//! [tokio]: https://docs.rs/tokio
//! [warp]: https://docs.rs/warp
//! [`futures::io::copy()`]: https://docs.rs/futures/0.3/futures/io/fn.copy.html
//! [`futures::executor::block_on()`]: https://docs.rs/futures/0.3/futures/executor/fn.block_on.html
121
// The crate-level doc examples define their own `fn main`, which this lint would otherwise flag.
#![allow(clippy::needless_doctest_main)]
// Favicon shown by rustdoc for the generated documentation pages.
#![doc(
    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
// Logo shown by rustdoc for the generated documentation pages.
#![doc(
    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
129
130use std::future::Future;
131use std::io;
132use std::pin::Pin;
133use std::task::{Context, Poll};
134use std::thread;
135
136use futures_core::ready;
137use once_cell::sync::Lazy;
138use pin_project_lite::pin_project;
139
140/// Applies the [`Compat`] adapter to futures and I/O types.
141pub trait CompatExt {
142    /// Applies the [`Compat`] adapter by value.
143    ///
144    /// # Examples
145    ///
146    /// ```
147    /// use async_compat::CompatExt;
148    ///
149    /// let stdout = tokio::io::stdout().compat();
150    /// ```
151    fn compat(self) -> Compat<Self>
152    where
153        Self: Sized;
154
155    /// Applies the [`Compat`] adapter by shared reference.
156    ///
157    /// # Examples
158    ///
159    /// ```
160    /// use async_compat::CompatExt;
161    ///
162    /// let original = tokio::io::stdout();
163    /// let stdout = original.compat_ref();
164    /// ```
165    fn compat_ref(&self) -> Compat<&Self>;
166
167    /// Applies the [`Compat`] adapter by mutable reference.
168    ///
169    /// # Examples
170    ///
171    /// ```
172    /// use async_compat::CompatExt;
173    ///
174    /// let mut original = tokio::io::stdout();
175    /// let stdout = original.compat_mut();
176    /// ```
177    fn compat_mut(&mut self) -> Compat<&mut Self>;
178}
179
180impl<T> CompatExt for T {
181    fn compat(self) -> Compat<Self>
182    where
183        Self: Sized,
184    {
185        Compat::new(self)
186    }
187
188    fn compat_ref(&self) -> Compat<&Self> {
189        Compat::new(self)
190    }
191
192    fn compat_mut(&mut self) -> Compat<&mut Self> {
193        Compat::new(self)
194    }
195}
196
197pin_project! {
198    /// Compatibility adapter for futures and I/O types.
199    #[derive(Clone)]
200    pub struct Compat<T> {
201        #[pin]
202        inner: Option<T>,
203        seek_pos: Option<io::SeekFrom>,
204    }
205
206    impl<T> PinnedDrop for Compat<T> {
207        fn drop(this: Pin<&mut Self>) {
208            if this.inner.is_some() {
209                // If the inner future wasn't moved out using into_inner,
210                // enter the tokio context while the inner value is dropped.
211                let _guard = TOKIO1.handle.enter();
212                this.project().inner.set(None);
213            }
214        }
215    }
216}
217
218impl<T> Compat<T> {
219    /// Applies the compatibility adapter to a future or an I/O type.
220    ///
221    /// # Examples
222    ///
223    /// Apply it to a future:
224    ///
225    /// ```
226    /// use async_compat::Compat;
227    /// use std::time::Duration;
228    ///
229    /// futures::executor::block_on(Compat::new(async {
230    ///     // We can use tokio's timers because we're inside tokio context.
231    ///     tokio::time::sleep(Duration::from_secs(1)).await;
232    /// }));
233    /// ```
234    ///
235    /// Apply it to an I/O type:
236    ///
237    /// ```
238    /// use async_compat::{Compat, CompatExt};
239    /// use futures::prelude::*;
240    ///
241    /// # fn main() -> std::io::Result<()> {
242    /// futures::executor::block_on(Compat::new(async {
243    ///     // The `write_all` method comes from `futures::io::AsyncWriteExt`.
244    ///     Compat::new(tokio::io::stdout()).write_all(b"hello\n").await?;
245    ///     Ok(())
246    /// }))
247    /// # }
248    /// ```
249    pub fn new(t: T) -> Compat<T> {
250        Compat {
251            inner: Some(t),
252            seek_pos: None,
253        }
254    }
255
256    /// Gets a shared reference to the inner value.
257    ///
258    /// # Examples
259    ///
260    /// ```
261    /// use async_compat::Compat;
262    /// use tokio::net::UdpSocket;
263    ///
264    /// # fn main() -> std::io::Result<()> {
265    /// futures::executor::block_on(Compat::new(async {
266    ///     let socket = Compat::new(UdpSocket::bind("127.0.0.1:0").await?);
267    ///     let addr = socket.get_ref().local_addr()?;
268    ///     Ok(())
269    /// }))
270    /// # }
271    /// ```
272    pub fn get_ref(&self) -> &T {
273        self.inner
274            .as_ref()
275            .expect("inner is only None when Compat is about to drop")
276    }
277
278    /// Gets a mutable reference to the inner value.
279    ///
280    /// # Examples
281    ///
282    /// ```no_run
283    /// use async_compat::Compat;
284    /// use tokio::net::TcpListener;
285    ///
286    /// # fn main() -> std::io::Result<()> {
287    /// futures::executor::block_on(Compat::new(async {
288    ///     let mut listener = Compat::new(TcpListener::bind("127.0.0.1:0").await?);
289    ///     let (stream, addr) = listener.get_mut().accept().await?;
290    ///     let stream = Compat::new(stream);
291    ///     Ok(())
292    /// }))
293    /// # }
294    /// ```
295    pub fn get_mut(&mut self) -> &mut T {
296        self.inner
297            .as_mut()
298            .expect("inner is only None when Compat is about to drop")
299    }
300
301    fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
302        self.project()
303            .inner
304            .as_pin_mut()
305            .expect("inner is only None when Compat is about to drop")
306    }
307
308    /// Unwraps the compatibility adapter.
309    ///
310    /// # Examples
311    ///
312    /// ```
313    /// use async_compat::Compat;
314    ///
315    /// let stdout = Compat::new(tokio::io::stdout());
316    /// let original = stdout.into_inner();
317    /// ```
318    pub fn into_inner(mut self) -> T {
319        self.inner
320            .take()
321            .expect("inner is only None when Compat is about to drop")
322    }
323}
324
325impl<T: Future> Future for Compat<T> {
326    type Output = T::Output;
327
328    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
329        let _guard = TOKIO1.handle.enter();
330        self.get_pin_mut().poll(cx)
331    }
332}
333
334impl<T: tokio::io::AsyncRead> futures_io::AsyncRead for Compat<T> {
335    fn poll_read(
336        self: Pin<&mut Self>,
337        cx: &mut Context<'_>,
338        buf: &mut [u8],
339    ) -> Poll<io::Result<usize>> {
340        let mut buf = tokio::io::ReadBuf::new(buf);
341        ready!(self.get_pin_mut().poll_read(cx, &mut buf))?;
342        Poll::Ready(Ok(buf.filled().len()))
343    }
344}
345
346impl<T: futures_io::AsyncRead> tokio::io::AsyncRead for Compat<T> {
347    fn poll_read(
348        self: Pin<&mut Self>,
349        cx: &mut Context<'_>,
350        buf: &mut tokio::io::ReadBuf<'_>,
351    ) -> Poll<io::Result<()>> {
352        let unfilled = buf.initialize_unfilled();
353        let poll = self.get_pin_mut().poll_read(cx, unfilled);
354        if let Poll::Ready(Ok(num)) = &poll {
355            buf.advance(*num);
356        }
357        poll.map_ok(|_| ())
358    }
359}
360
361impl<T: tokio::io::AsyncBufRead> futures_io::AsyncBufRead for Compat<T> {
362    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
363        self.get_pin_mut().poll_fill_buf(cx)
364    }
365
366    fn consume(self: Pin<&mut Self>, amt: usize) {
367        self.get_pin_mut().consume(amt)
368    }
369}
370
371impl<T: futures_io::AsyncBufRead> tokio::io::AsyncBufRead for Compat<T> {
372    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
373        self.get_pin_mut().poll_fill_buf(cx)
374    }
375
376    fn consume(self: Pin<&mut Self>, amt: usize) {
377        self.get_pin_mut().consume(amt)
378    }
379}
380
381impl<T: tokio::io::AsyncWrite> futures_io::AsyncWrite for Compat<T> {
382    fn poll_write(
383        self: Pin<&mut Self>,
384        cx: &mut Context<'_>,
385        buf: &[u8],
386    ) -> Poll<io::Result<usize>> {
387        self.get_pin_mut().poll_write(cx, buf)
388    }
389
390    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
391        self.get_pin_mut().poll_flush(cx)
392    }
393
394    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
395        self.get_pin_mut().poll_shutdown(cx)
396    }
397}
398
399impl<T: futures_io::AsyncWrite> tokio::io::AsyncWrite for Compat<T> {
400    fn poll_write(
401        self: Pin<&mut Self>,
402        cx: &mut Context<'_>,
403        buf: &[u8],
404    ) -> Poll<io::Result<usize>> {
405        self.get_pin_mut().poll_write(cx, buf)
406    }
407
408    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
409        self.get_pin_mut().poll_flush(cx)
410    }
411
412    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
413        self.get_pin_mut().poll_close(cx)
414    }
415}
416
417impl<T: tokio::io::AsyncSeek> futures_io::AsyncSeek for Compat<T> {
418    fn poll_seek(
419        mut self: Pin<&mut Self>,
420        cx: &mut Context,
421        pos: io::SeekFrom,
422    ) -> Poll<io::Result<u64>> {
423        if self.seek_pos != Some(pos) {
424            self.as_mut().get_pin_mut().start_seek(pos)?;
425            *self.as_mut().project().seek_pos = Some(pos);
426        }
427        let res = ready!(self.as_mut().get_pin_mut().poll_complete(cx));
428        *self.as_mut().project().seek_pos = None;
429        Poll::Ready(res)
430    }
431}
432
433impl<T: futures_io::AsyncSeek> tokio::io::AsyncSeek for Compat<T> {
434    fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
435        *self.as_mut().project().seek_pos = Some(pos);
436        Ok(())
437    }
438
439    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
440        let pos = match self.seek_pos {
441            None => {
442                // tokio 1.x AsyncSeek recommends calling poll_complete before start_seek.
443                // We don't have to guarantee that the value returned by
444                // poll_complete called without start_seek is correct,
445                // so we'll return 0.
446                return Poll::Ready(Ok(0));
447            }
448            Some(pos) => pos,
449        };
450        let res = ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos));
451        *self.as_mut().project().seek_pos = None;
452        Poll::Ready(res)
453    }
454}
455
456static TOKIO1: Lazy<GlobalRuntime> = Lazy::new(|| {
457    let mut fallback_rt = None;
458    let handle = tokio::runtime::Handle::try_current().unwrap_or_else(|_| {
459        thread::Builder::new()
460            .name("async-compat/tokio-1".into())
461            .spawn(move || TOKIO1.fallback_rt.as_ref().unwrap().block_on(Pending))
462            .unwrap();
463        let rt = tokio::runtime::Builder::new_current_thread()
464            .enable_all()
465            .build()
466            .expect("cannot start tokio-1 runtime");
467
468        let handle = rt.handle().clone();
469
470        fallback_rt = Some(rt);
471
472        handle
473    });
474
475    GlobalRuntime {
476        handle,
477        fallback_rt,
478    }
479});
480
481struct GlobalRuntime {
482    /// The handle used for all `Compat` futures.
483    handle: tokio::runtime::Handle,
484    /// Only used if we couldn't acquire a handle to a runtime on creation.
485    fallback_rt: Option<tokio::runtime::Runtime>,
486}
487
/// A future that never completes.
///
/// The fallback runtime's helper thread blocks on it.
struct Pending;

impl Future for Pending {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Always pending, and no waker is registered.
        Poll::Pending
    }
}
497
#[cfg(test)]
mod tests {
    use crate::{CompatExt, TOKIO1};

    /// Builds a multi-threaded tokio runtime with all drivers enabled.
    fn multi_thread_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap()
    }

    // NOTE(review): `TOKIO1` is a process-wide static that is initialized once, so both
    // assertions below look like they depend on which test touches it first - confirm.

    #[test]
    fn existing_tokio_runtime_is_reused_by_compat() {
        // Poll a `Compat` future from inside a tokio runtime.
        let rt = multi_thread_runtime();
        rt.block_on(async { println!("foo") }.compat());
        // Drop the runtime before asserting.
        drop(rt);

        assert!(TOKIO1.fallback_rt.is_none());
    }

    #[test]
    fn tokio_runtime_is_reused_even_after_it_exits() {
        // Run a plain future on a tokio runtime, then let that runtime go away.
        let rt = multi_thread_runtime();
        rt.block_on(async { println!("foo") });
        drop(rt);

        // Now drive a `Compat` future from a non-tokio executor.
        futures::executor::block_on(async { println!("foo") }.compat());

        assert!(TOKIO1.fallback_rt.is_none());
    }
}
525}