diff --git a/Cargo.lock b/Cargo.lock index 1e38ed72..1671667e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -179,7 +179,6 @@ dependencies = [ "crates_io_api", "detect-targets", "either", - "futures-util", "home", "itertools", "jobslot", diff --git a/crates/binstalk/Cargo.toml b/crates/binstalk/Cargo.toml index ef7e79c0..3262d695 100644 --- a/crates/binstalk/Cargo.toml +++ b/crates/binstalk/Cargo.toml @@ -19,7 +19,6 @@ compact_str = { version = "0.6.1", features = ["serde"] } crates_io_api = { version = "0.8.1", default-features = false } detect-targets = { version = "0.1.5", path = "../detect-targets" } either = "1.8.1" -futures-util = { version = "0.3.26", default-features = false, features = ["std"] } home = "0.5.4" itertools = "0.10.5" jobslot = { version = "0.2.8", features = ["tokio"] } diff --git a/crates/binstalk/src/fetchers/gh_crate_meta.rs b/crates/binstalk/src/fetchers/gh_crate_meta.rs index 47a1a9c7..5d25bb3c 100644 --- a/crates/binstalk/src/fetchers/gh_crate_meta.rs +++ b/crates/binstalk/src/fetchers/gh_crate_meta.rs @@ -2,7 +2,6 @@ use std::{borrow::Cow, future::Future, iter, path::Path, sync::Arc}; use compact_str::{CompactString, ToCompactString}; use either::Either; -use futures_util::stream::{FuturesUnordered, StreamExt}; use itertools::Itertools; use once_cell::sync::OnceCell; use serde::Serialize; @@ -15,6 +14,7 @@ use crate::{ errors::{BinstallError, InvalidPkgFmtError}, helpers::{ download::Download, + futures_resolver::FuturesResolver, remote::{Client, Method}, tasks::AutoAbortJoinHandle, }, @@ -170,7 +170,7 @@ impl super::Fetcher for GhCrateMeta { Either::Right(PkgFmt::iter()) }; - let mut handles = FuturesUnordered::new(); + let resolver = FuturesResolver::default(); // Iterate over pkg_urls first to avoid String::clone. for pkg_url in pkg_urls { @@ -183,19 +183,17 @@ impl super::Fetcher for GhCrateMeta { // basically cartesian product. // | for pkg_fmt in pkg_fmts.clone() { - handles.extend(this.launch_baseline_find_tasks(pkg_fmt, &tt, &pkg_url, repo)); + resolver.extend(this.launch_baseline_find_tasks(pkg_fmt, &tt, &pkg_url, repo)); } } - while let Some(res) = handles.next().await { - if let Some((url, pkg_fmt)) = res? { - debug!("Winning URL is {url}, with pkg_fmt {pkg_fmt}"); - self.resolution.set((url, pkg_fmt)).unwrap(); // find() is called first - return Ok(true); - } + if let Some((url, pkg_fmt)) = resolver.resolve().await? { + debug!("Winning URL is {url}, with pkg_fmt {pkg_fmt}"); + self.resolution.set((url, pkg_fmt)).unwrap(); // find() is called first + Ok(true) + } else { + Ok(false) } - - Ok(false) }) } diff --git a/crates/binstalk/src/helpers.rs b/crates/binstalk/src/helpers.rs index 83730ec3..68d84553 100644 --- a/crates/binstalk/src/helpers.rs +++ b/crates/binstalk/src/helpers.rs @@ -1,3 +1,4 @@ +pub mod futures_resolver; pub mod jobserver_client; pub mod signal; pub mod tasks; diff --git a/crates/binstalk/src/helpers/futures_resolver.rs b/crates/binstalk/src/helpers/futures_resolver.rs new file mode 100644 index 00000000..38b4eb98 --- /dev/null +++ b/crates/binstalk/src/helpers/futures_resolver.rs @@ -0,0 +1,76 @@ +use std::{future::Future, pin::Pin}; +use tokio::sync::mpsc; + +/// Given multiple futures with output = Result, E>, +/// returns the the first one that returns either `Err(_)` or +/// `Ok(Some(_))`. +pub struct FuturesResolver { + rx: mpsc::Receiver>, + tx: mpsc::Sender>, +} + +impl Default for FuturesResolver { + fn default() -> Self { + // We only need the first one, so the channel is of size 1. + let (tx, rx) = mpsc::channel(1); + Self { tx, rx } + } +} + +impl FuturesResolver { + /// Insert new future into this resolver, they will start running + /// right away. + pub fn push(&self, fut: Fut) + where + Fut: Future, E>> + Send + 'static, + { + let tx = self.tx.clone(); + + tokio::spawn(async move { + tokio::pin!(fut); + + Self::spawn_inner(fut, tx).await; + }); + } + + async fn spawn_inner( + fut: Pin<&mut (dyn Future, E>> + Send)>, + tx: mpsc::Sender>, + ) { + let res = tokio::select! { + biased; + + _ = tx.closed() => return, + res = fut => res, + }; + + if let Some(res) = res.transpose() { + // try_send can only fail due to being full or being closed. + // + // In both cases, this could means some other future has + // completed first. + // + // For closed, it could additionally means that the task + // is cancelled. + tx.try_send(res).ok(); + } + } + + /// Insert multiple futures into this resolver, they will start running + /// right away. + pub fn extend(&self, iter: Iter) + where + Fut: Future, E>> + Send + 'static, + Iter: IntoIterator, + { + iter.into_iter().for_each(|fut| self.push(fut)); + } + + /// Return the resolution. + pub fn resolve(self) -> impl Future, E>> { + let mut rx = self.rx; + drop(self.tx); + + async move { rx.recv().await.transpose() } + } +}