mirror of
https://github.com/cargo-bins/cargo-binstall.git
synced 2025-05-29 23:22:56 +00:00

* Perform artifact discovery in sequential Run different `fetcher.find()` in sequential * FuturesResolver: Fallback to other future if one error * Fix typo * Apply cargo fmt * Parallelise `<QuickInstall as Fetcher>::find` Check for signature in parallel to the package * Download signature in `<QuickInstall as Fetcher>::find` So that the signature download can be done in parallel. * Bump msrv for binstalk-fetchers to 1.70 * Update crates/binstalk-fetchers/src/futures_resolver.rs Co-authored-by: Félix Saparelli <felix@passcod.name> Signed-off-by: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> * cargo fmt Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com> --------- Signed-off-by: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com> Co-authored-by: Félix Saparelli <felix@passcod.name>
86 lines
2.4 KiB
Rust
86 lines
2.4 KiB
Rust
use std::{fmt::Debug, future::Future, pin::Pin};
|
|
|
|
use tokio::sync::mpsc;
|
|
use tracing::warn;
|
|
|
|
/// Given multiple futures with output = `Result<Option<T>, E>`,
|
|
/// returns the the first one that returns either `Err(_)` or
|
|
/// `Ok(Some(_))`.
|
|
pub struct FuturesResolver<T, E> {
|
|
rx: mpsc::Receiver<Result<T, E>>,
|
|
tx: mpsc::Sender<Result<T, E>>,
|
|
}
|
|
|
|
impl<T, E> Default for FuturesResolver<T, E> {
|
|
fn default() -> Self {
|
|
// We only need the first one, so the channel is of size 1.
|
|
let (tx, rx) = mpsc::channel(1);
|
|
Self { tx, rx }
|
|
}
|
|
}
|
|
|
|
impl<T: Send + 'static, E: Send + Debug + 'static> FuturesResolver<T, E> {
|
|
/// Insert new future into this resolver, they will start running
|
|
/// right away.
|
|
pub fn push<Fut>(&self, fut: Fut)
|
|
where
|
|
Fut: Future<Output = Result<Option<T>, E>> + Send + 'static,
|
|
{
|
|
let tx = self.tx.clone();
|
|
|
|
tokio::spawn(async move {
|
|
tokio::pin!(fut);
|
|
|
|
Self::spawn_inner(fut, tx).await;
|
|
});
|
|
}
|
|
|
|
async fn spawn_inner(
|
|
fut: Pin<&mut (dyn Future<Output = Result<Option<T>, E>> + Send)>,
|
|
tx: mpsc::Sender<Result<T, E>>,
|
|
) {
|
|
let res = tokio::select! {
|
|
biased;
|
|
|
|
_ = tx.closed() => return,
|
|
res = fut => res,
|
|
};
|
|
|
|
if let Some(res) = res.transpose() {
|
|
// try_send can only fail due to being full or being closed.
|
|
//
|
|
// In both cases, this could means some other future has
|
|
// completed first.
|
|
//
|
|
// For closed, it could additionally means that the task
|
|
// is cancelled.
|
|
tx.try_send(res).ok();
|
|
}
|
|
}
|
|
|
|
/// Insert multiple futures into this resolver, they will start running
|
|
/// right away.
|
|
pub fn extend<Fut, Iter>(&self, iter: Iter)
|
|
where
|
|
Fut: Future<Output = Result<Option<T>, E>> + Send + 'static,
|
|
Iter: IntoIterator<Item = Fut>,
|
|
{
|
|
iter.into_iter().for_each(|fut| self.push(fut));
|
|
}
|
|
|
|
/// Return the resolution.
|
|
pub fn resolve(self) -> impl Future<Output = Option<T>> {
|
|
let mut rx = self.rx;
|
|
drop(self.tx);
|
|
|
|
async move {
|
|
loop {
|
|
match rx.recv().await {
|
|
Some(Ok(ret)) => return Some(ret),
|
|
Some(Err(err)) => warn!(?err, "Fail to resolve the future"),
|
|
None => return None,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|