# Async [[Rust]]
## Resources
### On Pinning
- [`std::pin` docs](https://doc.rust-lang.org/std/pin/index.html)
- ([[Cloudflare]] blog) [Pin, Unpin, and why Rust needs them](https://blog.cloudflare.com/pin-and-unpin-in-rust/) published [[2021-08-26]]
## Notes
### How to `select!` on an `Option<impl Future>` struct field
```rust
struct RunnerIO {
shutdown_timer: Option<Pin<Box<Sleep>>>,
}
impl RunnerIO {
async fn next_event(&mut self) -> IOEvent {
use IOEvent::*;
let timer = Pin::new(&mut self.shutdown_timer).as_pin_mut();
tokio::select! {
_ = async { timer.unwrap().await },
if timer.is_some() => HardShutdown,
}
}
/// Start a timer which will trigger a HardShutdown event once complete
fn start_shutdown_timer(&mut self) {
// We don't want to overwrite an existing timer
if self.shutdown_timer.is_some() {
return;
}
let timer = time::sleep(Duration::from_secs(60));
let timer = Box::pin(timer);
self.shutdown_timer = Some(timer);
}
}
```
This is based on a [hack by Darksonn](https://github.com/tokio-rs/tokio/issues/2583#issuecomment-638212772):
- The assignment to `timer` (instead of using `self.shutdown_timer` directly) is needed to prevent moving `self` into the async closure
- Cannot do `Option<Sleep>` with `let timer = Pin::new(&mut self.shutdown_timer).as_pin_mut();` because the resulting `PhantomPinned` does not implement `Unpin`
- Cannot do `let timer = &mut self.shutdown_timer;` because the closure will attempt to move out of `*timer` which is behind a mutable reference
- Cannot do `let timer = self.shutdown_timer.as_pin_mut()` because `Option` does not have `.as_pin_mut()`
- Cannot do `let timer = Pin::new(&mut self.shutdown_timer);` because the closure tries to move out of the dereference
- Only `let timer = Pin::new(&mut self.shutdown_timer).as_pin_mut();` works
### How to run an async fn inside a sync fn that has thread-local access to a [[Tokio]] runtime without blocking other tasks
```rust
use tokio::runtime::Handle;
tokio::task::block_in_place(|| {
Handle::current().block_on(async move {
api::create_channel_monitor(&self.client, channel_monitor).await
})
})
```
### How to run an async fn inside a sync fn that does NOT have access to a Tokio runtime (e.g. an undecorated `fn main()`, or downstream of a `thread::spawn()`)
`Cargo.toml`
```toml
once_cell = "1"
```
Implementing code
```rust
use once_cell::sync::{Lazy, OnceCell};
use tokio::runtime::{Runtime, Builder};
/// A Tokio runtime which can be used to run async closures in sync fns
/// downstream of thread::spawn()
static PERSISTER_RUNTIME: Lazy<OnceCell<Runtime>> = Lazy::new(|| {
Builder::new_current_thread()
.enable_io()
.build()
.unwrap()
.into()
});
impl SomeLibrarySyncTrait for MyStruct {
fn persist_manager(
&self,
channel_manager: &SimpleArcChannelManager<_>,
) -> Result<(), io::Error> {
// Run an async fn inside a sync fn without a Tokio runtime
PERSISTER_RUNTIME
.get()
.unwrap()
.block_on(async move {
api::update_channel_manager(&self.client, channel_manager).await
})
}
}
```
### If you can't call `next()` on a `FuturesUnordered`, you might need to `use futures::StreamExt`
- Also relevant is `futures::TryFutureExt`
### `tokio::try_join!` requires error types to match
### Common mistake (non-compiling code):
```rust
async fn check(
&mut self,
account: &mut ManagedSubaccount,
rest: &Rest,
) -> anyhow::Result<()> {
let (_, positions_vec) = tokio::try_join!(
account.update_info().map_err(anyhow::Error::from),
rest.get_positions().map_err(anyhow::Error::from),
).context("Failed to pull account and position info")?;
}
```
##### Error:
```text
error[E0308]: mismatched types
--> src/director/mod.rs:107:47
|
107 | let (update_account, positions_vec) = tokio::try_join!(
| _______________________________________________^
108 | | account.update_info(),
109 | | rest.get_positions(),
110 | | // account.update_info().await.map_err(|_| anyhow!("Could not")),
111 | | // rest.get_positions().await.map_err(|_| anyhow!("Could not")),
112 | | )?;
| |_________^ expected struct `anyhow::Error`, found enum `ftx::rest::Error`
|
note: return type inferred to be `anyhow::Error` here
--> src/director/mod.rs:107:47
|
107 | let (update_account, positions_vec) = tokio::try_join!(
| _______________________________________________^
108 | | account.update_info(),
109 | | rest.get_positions(),
110 | | // account.update_info().await.map_err(|_| anyhow!("Could not")),
111 | | // rest.get_positions().await.map_err(|_| anyhow!("Could not")),
112 | | )?;
| |_________^
= note: this error originates in the macro `$crate::try_join` (in Nightly builds, run with -Z macro-backtrace for more info)
error: aborting due to previous error; 1 warning emitted
```
#### Solution: Map error and import `futures::TryFutureExt`
`anyhow::Error::from` is really useful for this:
```rust
use futures::TryFutureExt;
async fn check(
&mut self,
account: &mut ManagedSubaccount,
rest: &Rest,
) -> anyhow::Result<()> {
let (update_account, positions_vec) = tokio::try_join!(
account.update_info().map_err(anyhow::Error::from),
rest.get_positions().map_err(anyhow::Error::from),
).context("Failed to pull account and position info")?;
}
```
Note that if you don't include `futures::TryFutureExt`, you will get an error like this:
```text
error[E0599]: no method named `map_err` found for opaque type `impl futures::Future` in the current scope
--> src/director/mod.rs:108:35
|
108 | account.update_info().map_err(anyhow::Error::from),
| ^^^^^^^ method not found in `impl futures::Future`
|
::: /Users/fang/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.15/src/future/try_future/mod.rs:305:8
|
305 | fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
| ------- the method is available for `impl futures::Future` here
|
= help: items from traits can only be used if the trait is in scope
= note: the following trait is implemented but not in scope; perhaps add a `use` for it:
`use futures::TryFutureExt;`
help: consider `await`ing on the `Future` and calling the method on its `Output`
|
108 | account.update_info().await.map_err(anyhow::Error::from),
| ^^^^^^
```