# Async [[Rust]] ## Resources ### On Pinning - [`std::pin` docs](https://doc.rust-lang.org/std/pin/index.html) - ([[Cloudflare]] blog) [Pin, Unpin, and why Rust needs them](https://blog.cloudflare.com/pin-and-unpin-in-rust/) published [[2021-08-26]] ## Notes ### How to `select!` on an `Option<impl Future>` struct field ```rust struct RunnerIO { shutdown_timer: Option<Pin<Box<Sleep>>>, } impl RunnerIO { async fn next_event(&mut self) -> IOEvent { use IOEvent::*; let timer = Pin::new(&mut self.shutdown_timer).as_pin_mut(); tokio::select! { _ = async { timer.unwrap().await }, if timer.is_some() => HardShutdown, } } /// Start a timer which will trigger a HardShutdown event once complete fn start_shutdown_timer(&mut self) { // We don't want to overwrite an existing timer if self.shutdown_timer.is_some() { return; } let timer = time::sleep(Duration::from_secs(60)); let timer = Box::pin(timer); self.shutdown_timer = Some(timer); } } ``` This is based on a [hack by Darksonn](https://github.com/tokio-rs/tokio/issues/2583#issuecomment-638212772): - The assignment to `timer` (instead of using `self.shutdown_timer` directly) is needed to prevent moving `self` into the async closure - Cannot do `Option<Sleep>` with `let timer = Pin::new(&mut self.shutdown_timer).as_pin_mut();` because the resulting `PhantomPinned` does not implement `Unpin` - Cannot do `let timer = &mut self.shutdown_timer;` because the closure will attempt to move out of `*timer` which is behind a mutable reference - Cannot do `let timer = self.shutdown_timer.as_pin_mut()` because `Option` does not have `.as_pin_mut()` - Cannot do `let timer = Pin::new(&mut self.shutdown_timer);` because the closure tries to move out of the dereference - Only `let timer = Pin::new(&mut self.shutdown_timer).as_pin_mut();` works ### How to run an async fn inside a sync fn that has thread-local access to a [[Tokio]] runtime without blocking other tasks ```rust use tokio::runtime::Handle; tokio::task::block_in_place(|| { Handle::current().block_on(async move { api::create_channel_monitor(&self.client, channel_monitor).await }) }) ``` ### How to run an async fn inside a sync fn that does NOT have access to a Tokio runtime (e.g. an undecorated `fn main()`, or downstream of a `thread::spawn()`) `Cargo.toml` ```toml once_cell = "1" ``` Implementing code ```rust use once_cell::sync::{Lazy, OnceCell}; use tokio::runtime::{Runtime, Builder}; /// A Tokio runtime which can be used to run async closures in sync fns /// downstream of thread::spawn() static PERSISTER_RUNTIME: Lazy<OnceCell<Runtime>> = Lazy::new(|| { Builder::new_current_thread() .enable_io() .build() .unwrap() .into() }); impl SomeLibrarySyncTrait for MyStruct { fn persist_manager( &self, channel_manager: &SimpleArcChannelManager<_>, ) -> Result<(), io::Error> { // Run an async fn inside a sync fn without a Tokio runtime PERSISTER_RUNTIME .get() .unwrap() .block_on(async move { api::update_channel_manager(&self.client, channel_manager).await }) } } ``` ### If you can't call `next()` on a `FuturesUnordered`, you might need to `use futures::StreamExt` - Also relevant is `futures::TryFutureExt` ### `tokio::try_join!` requires error types to match ### Common mistake (non-compiling code): ```rust async fn check( &mut self, account: &mut ManagedSubaccount, rest: &Rest, ) -> anyhow::Result<()> { let (_, positions_vec) = tokio::try_join!( account.update_info().map_err(anyhow::Error::from), rest.get_positions().map_err(anyhow::Error::from), ).context("Failed to pull account and position info")?; } ``` ##### Error: ```text error[E0308]: mismatched types --> src/director/mod.rs:107:47 | 107 | let (update_account, positions_vec) = tokio::try_join!( | _______________________________________________^ 108 | | account.update_info(), 109 | | rest.get_positions(), 110 | | // account.update_info().await.map_err(|_| anyhow!("Could not")), 111 | | // rest.get_positions().await.map_err(|_| anyhow!("Could not")), 112 | | )?; | |_________^ expected struct `anyhow::Error`, found enum `ftx::rest::Error` | note: return type inferred to be `anyhow::Error` here --> src/director/mod.rs:107:47 | 107 | let (update_account, positions_vec) = tokio::try_join!( | _______________________________________________^ 108 | | account.update_info(), 109 | | rest.get_positions(), 110 | | // account.update_info().await.map_err(|_| anyhow!("Could not")), 111 | | // rest.get_positions().await.map_err(|_| anyhow!("Could not")), 112 | | )?; | |_________^ = note: this error originates in the macro `$crate::try_join` (in Nightly builds, run with -Z macro-backtrace for more info) error: aborting due to previous error; 1 warning emitted ``` #### Solution: Map error and import `futures::TryFutureExt` `anyhow::Error::from` is really useful for this: ```rust use futures::TryFutureExt; async fn check( &mut self, account: &mut ManagedSubaccount, rest: &Rest, ) -> anyhow::Result<()> { let (update_account, positions_vec) = tokio::try_join!( account.update_info().map_err(anyhow::Error::from), rest.get_positions().map_err(anyhow::Error::from), ).context("Failed to pull account and position info")?; } ``` Note that if you don't include `futures::TryFutureExt`, you will get an error like this: ```text error[E0599]: no method named `map_err` found for opaque type `impl futures::Future` in the current scope --> src/director/mod.rs:108:35 | 108 | account.update_info().map_err(anyhow::Error::from), | ^^^^^^^ method not found in `impl futures::Future` | ::: /Users/fang/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.15/src/future/try_future/mod.rs:305:8 | 305 | fn map_err<E, F>(self, f: F) -> MapErr<Self, F> | ------- the method is available for `impl futures::Future` here | = help: items from traits can only be used if the trait is in scope = note: the following trait is implemented but not in scope; perhaps add a `use` for it: `use futures::TryFutureExt;` help: consider `await`ing on the `Future` and calling the method on its `Output` | 108 | account.update_info().await.map_err(anyhow::Error::from), | ^^^^^^ ```