mirror of
https://github.com/cargo-bins/cargo-binstall.git
synced 2025-04-20 20:48:43 +00:00
Replace dep futures-util with helpers::FuturesResolver
(#765)
futures-util has too many dependencies and it contains a lot of code of which we only use `futures_util::stream::{FuturesUnordered, StreamExt}`. We don't even need most of the functionalities in `FuturesUnordered` as we just need the output of first future that either returns `Err(_)` or `Ok(Some(_))`. So we replace it with ou own homebrew solution (~80 loc) and it's easier to use. Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
parent
833684b095
commit
a13c01b769
5 changed files with 86 additions and 13 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -179,7 +179,6 @@ dependencies = [
|
||||||
"crates_io_api",
|
"crates_io_api",
|
||||||
"detect-targets",
|
"detect-targets",
|
||||||
"either",
|
"either",
|
||||||
"futures-util",
|
|
||||||
"home",
|
"home",
|
||||||
"itertools",
|
"itertools",
|
||||||
"jobslot",
|
"jobslot",
|
||||||
|
|
|
@ -19,7 +19,6 @@ compact_str = { version = "0.6.1", features = ["serde"] }
|
||||||
crates_io_api = { version = "0.8.1", default-features = false }
|
crates_io_api = { version = "0.8.1", default-features = false }
|
||||||
detect-targets = { version = "0.1.5", path = "../detect-targets" }
|
detect-targets = { version = "0.1.5", path = "../detect-targets" }
|
||||||
either = "1.8.1"
|
either = "1.8.1"
|
||||||
futures-util = { version = "0.3.26", default-features = false, features = ["std"] }
|
|
||||||
home = "0.5.4"
|
home = "0.5.4"
|
||||||
itertools = "0.10.5"
|
itertools = "0.10.5"
|
||||||
jobslot = { version = "0.2.8", features = ["tokio"] }
|
jobslot = { version = "0.2.8", features = ["tokio"] }
|
||||||
|
|
|
@ -2,7 +2,6 @@ use std::{borrow::Cow, future::Future, iter, path::Path, sync::Arc};
|
||||||
|
|
||||||
use compact_str::{CompactString, ToCompactString};
|
use compact_str::{CompactString, ToCompactString};
|
||||||
use either::Either;
|
use either::Either;
|
||||||
use futures_util::stream::{FuturesUnordered, StreamExt};
|
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use once_cell::sync::OnceCell;
|
use once_cell::sync::OnceCell;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
@ -15,6 +14,7 @@ use crate::{
|
||||||
errors::{BinstallError, InvalidPkgFmtError},
|
errors::{BinstallError, InvalidPkgFmtError},
|
||||||
helpers::{
|
helpers::{
|
||||||
download::Download,
|
download::Download,
|
||||||
|
futures_resolver::FuturesResolver,
|
||||||
remote::{Client, Method},
|
remote::{Client, Method},
|
||||||
tasks::AutoAbortJoinHandle,
|
tasks::AutoAbortJoinHandle,
|
||||||
},
|
},
|
||||||
|
@ -170,7 +170,7 @@ impl super::Fetcher for GhCrateMeta {
|
||||||
Either::Right(PkgFmt::iter())
|
Either::Right(PkgFmt::iter())
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut handles = FuturesUnordered::new();
|
let resolver = FuturesResolver::default();
|
||||||
|
|
||||||
// Iterate over pkg_urls first to avoid String::clone.
|
// Iterate over pkg_urls first to avoid String::clone.
|
||||||
for pkg_url in pkg_urls {
|
for pkg_url in pkg_urls {
|
||||||
|
@ -183,19 +183,17 @@ impl super::Fetcher for GhCrateMeta {
|
||||||
// basically cartesian product.
|
// basically cartesian product.
|
||||||
// |
|
// |
|
||||||
for pkg_fmt in pkg_fmts.clone() {
|
for pkg_fmt in pkg_fmts.clone() {
|
||||||
handles.extend(this.launch_baseline_find_tasks(pkg_fmt, &tt, &pkg_url, repo));
|
resolver.extend(this.launch_baseline_find_tasks(pkg_fmt, &tt, &pkg_url, repo));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
while let Some(res) = handles.next().await {
|
if let Some((url, pkg_fmt)) = resolver.resolve().await? {
|
||||||
if let Some((url, pkg_fmt)) = res? {
|
|
||||||
debug!("Winning URL is {url}, with pkg_fmt {pkg_fmt}");
|
debug!("Winning URL is {url}, with pkg_fmt {pkg_fmt}");
|
||||||
self.resolution.set((url, pkg_fmt)).unwrap(); // find() is called first
|
self.resolution.set((url, pkg_fmt)).unwrap(); // find() is called first
|
||||||
return Ok(true);
|
Ok(true)
|
||||||
}
|
} else {
|
||||||
}
|
|
||||||
|
|
||||||
Ok(false)
|
Ok(false)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
pub mod futures_resolver;
|
||||||
pub mod jobserver_client;
|
pub mod jobserver_client;
|
||||||
pub mod signal;
|
pub mod signal;
|
||||||
pub mod tasks;
|
pub mod tasks;
|
||||||
|
|
76
crates/binstalk/src/helpers/futures_resolver.rs
Normal file
76
crates/binstalk/src/helpers/futures_resolver.rs
Normal file
|
@ -0,0 +1,76 @@
|
||||||
|
use std::{future::Future, pin::Pin};
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
/// Given multiple futures with output = Result<Option<T>, E>,
|
||||||
|
/// returns the the first one that returns either `Err(_)` or
|
||||||
|
/// `Ok(Some(_))`.
|
||||||
|
pub struct FuturesResolver<T, E> {
|
||||||
|
rx: mpsc::Receiver<Result<T, E>>,
|
||||||
|
tx: mpsc::Sender<Result<T, E>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, E> Default for FuturesResolver<T, E> {
|
||||||
|
fn default() -> Self {
|
||||||
|
// We only need the first one, so the channel is of size 1.
|
||||||
|
let (tx, rx) = mpsc::channel(1);
|
||||||
|
Self { tx, rx }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Send + 'static, E: Send + 'static> FuturesResolver<T, E> {
|
||||||
|
/// Insert new future into this resolver, they will start running
|
||||||
|
/// right away.
|
||||||
|
pub fn push<Fut>(&self, fut: Fut)
|
||||||
|
where
|
||||||
|
Fut: Future<Output = Result<Option<T>, E>> + Send + 'static,
|
||||||
|
{
|
||||||
|
let tx = self.tx.clone();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
tokio::pin!(fut);
|
||||||
|
|
||||||
|
Self::spawn_inner(fut, tx).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn spawn_inner(
|
||||||
|
fut: Pin<&mut (dyn Future<Output = Result<Option<T>, E>> + Send)>,
|
||||||
|
tx: mpsc::Sender<Result<T, E>>,
|
||||||
|
) {
|
||||||
|
let res = tokio::select! {
|
||||||
|
biased;
|
||||||
|
|
||||||
|
_ = tx.closed() => return,
|
||||||
|
res = fut => res,
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(res) = res.transpose() {
|
||||||
|
// try_send can only fail due to being full or being closed.
|
||||||
|
//
|
||||||
|
// In both cases, this could means some other future has
|
||||||
|
// completed first.
|
||||||
|
//
|
||||||
|
// For closed, it could additionally means that the task
|
||||||
|
// is cancelled.
|
||||||
|
tx.try_send(res).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Insert multiple futures into this resolver, they will start running
|
||||||
|
/// right away.
|
||||||
|
pub fn extend<Fut, Iter>(&self, iter: Iter)
|
||||||
|
where
|
||||||
|
Fut: Future<Output = Result<Option<T>, E>> + Send + 'static,
|
||||||
|
Iter: IntoIterator<Item = Fut>,
|
||||||
|
{
|
||||||
|
iter.into_iter().for_each(|fut| self.push(fut));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return the resolution.
|
||||||
|
pub fn resolve(self) -> impl Future<Output = Result<Option<T>, E>> {
|
||||||
|
let mut rx = self.rx;
|
||||||
|
drop(self.tx);
|
||||||
|
|
||||||
|
async move { rx.recv().await.transpose() }
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue