use std::{future::Future, pin::Pin}; use tokio::sync::mpsc; /// Given multiple futures with output = `Result, E>`, /// returns the the first one that returns either `Err(_)` or /// `Ok(Some(_))`. pub struct FuturesResolver { rx: mpsc::Receiver>, tx: mpsc::Sender>, } impl Default for FuturesResolver { fn default() -> Self { // We only need the first one, so the channel is of size 1. let (tx, rx) = mpsc::channel(1); Self { tx, rx } } } impl FuturesResolver { /// Insert new future into this resolver, they will start running /// right away. pub fn push(&self, fut: Fut) where Fut: Future, E>> + Send + 'static, { let tx = self.tx.clone(); tokio::spawn(async move { tokio::pin!(fut); Self::spawn_inner(fut, tx).await; }); } async fn spawn_inner( fut: Pin<&mut (dyn Future, E>> + Send)>, tx: mpsc::Sender>, ) { let res = tokio::select! { biased; _ = tx.closed() => return, res = fut => res, }; if let Some(res) = res.transpose() { // try_send can only fail due to being full or being closed. // // In both cases, this could means some other future has // completed first. // // For closed, it could additionally means that the task // is cancelled. tx.try_send(res).ok(); } } /// Insert multiple futures into this resolver, they will start running /// right away. pub fn extend(&self, iter: Iter) where Fut: Future, E>> + Send + 'static, Iter: IntoIterator, { iter.into_iter().for_each(|fut| self.push(fut)); } /// Return the resolution. pub fn resolve(self) -> impl Future, E>> { let mut rx = self.rx; drop(self.tx); async move { rx.recv().await.transpose() } } }