From ac7bac651df718620c61e67d131e29feae92a83a Mon Sep 17 00:00:00 2001
From: Jiahao XU <30436523+NobodyXu@users.noreply.github.com>
Date: Sun, 23 Jun 2024 20:42:03 +1000
Subject: [PATCH] Run artifact discover in sequential instead of in parallel
 (#1796)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Perform artifact discovery in sequential

Run different `fetcher.find()` in sequential

* FuturesResolver: Fallback to other future if one error

* Fix typo

* Apply cargo fmt

* Parallelise `<QuickInstall as Fetcher>::find`

Check for signature in parallel to the package

* Download signature in `<QuickInstall as Fetcher>::find`

So that the signature download can be done in parallel.

* Bump msrv for binstalk-fetchers to 1.70

* Update crates/binstalk-fetchers/src/futures_resolver.rs

Co-authored-by: Félix Saparelli <felix@passcod.name>
Signed-off-by: Jiahao XU <30436523+NobodyXu@users.noreply.github.com>

* cargo fmt

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>

---------

Signed-off-by: Jiahao XU <30436523+NobodyXu@users.noreply.github.com>
Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
Co-authored-by: Félix Saparelli <felix@passcod.name>
---
 crates/binstalk-fetchers/Cargo.toml           |   2 +-
 crates/binstalk-fetchers/src/common.rs        |  41 ++++++-
 .../binstalk-fetchers/src/futures_resolver.rs |  18 +++-
 crates/binstalk-fetchers/src/gh_crate_meta.rs |   2 +-
 crates/binstalk-fetchers/src/lib.rs           |   5 +-
 crates/binstalk-fetchers/src/quickinstall.rs  | 102 +++++++++++-------
 crates/binstalk/src/ops/resolve.rs            |  12 ++-
 7 files changed, 128 insertions(+), 54 deletions(-)

diff --git a/crates/binstalk-fetchers/Cargo.toml b/crates/binstalk-fetchers/Cargo.toml
index 877162cf..37bd254e 100644
--- a/crates/binstalk-fetchers/Cargo.toml
+++ b/crates/binstalk-fetchers/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2021"
 description = "The binstall fetchers"
 repository = "https://github.com/cargo-bins/cargo-binstall"
 documentation = "https://docs.rs/binstalk-fetchers"
-rust-version = "1.65.0"
+rust-version = "1.70.0"
 authors = ["Jiahao XU <Jiahao_XU@outlook.com>"]
 license = "GPL-3.0-only"
 
diff --git a/crates/binstalk-fetchers/src/common.rs b/crates/binstalk-fetchers/src/common.rs
index b3a491ae..10272f10 100644
--- a/crates/binstalk-fetchers/src/common.rs
+++ b/crates/binstalk-fetchers/src/common.rs
@@ -1,6 +1,11 @@
-use std::sync::{
-    atomic::{AtomicBool, Ordering::Relaxed},
-    Once,
+#![allow(unused)]
+
+use std::{
+    future::Future,
+    sync::{
+        atomic::{AtomicBool, Ordering::Relaxed},
+        Once,
+    },
 };
 
 pub(super) use binstalk_downloader::{
@@ -76,3 +81,33 @@ pub(super) async fn does_url_exist(
 
     Ok(Box::pin(client.remote_gettable(url.clone())).await?)
 }
+
+#[derive(Debug)]
+pub(super) struct AutoAbortJoinHandle<T>(JoinHandle<T>);
+
+impl<T> AutoAbortJoinHandle<T>
+where
+    T: Send + 'static,
+{
+    pub(super) fn spawn<F>(future: F) -> Self
+    where
+        F: Future<Output = T> + Send + 'static,
+    {
+        Self(tokio::spawn(future))
+    }
+}
+
+impl<T> Drop for AutoAbortJoinHandle<T> {
+    fn drop(&mut self) {
+        self.0.abort();
+    }
+}
+
+impl<T, E> AutoAbortJoinHandle<Result<T, E>>
+where
+    E: Into<FetchError>,
+{
+    pub(super) async fn flattened_join(mut self) -> Result<T, FetchError> {
+        (&mut self.0).await?.map_err(Into::into)
+    }
+}
diff --git a/crates/binstalk-fetchers/src/futures_resolver.rs b/crates/binstalk-fetchers/src/futures_resolver.rs
index 0a550519..461ab462 100644
--- a/crates/binstalk-fetchers/src/futures_resolver.rs
+++ b/crates/binstalk-fetchers/src/futures_resolver.rs
@@ -1,5 +1,7 @@
-use std::{future::Future, pin::Pin};
+use std::{fmt::Debug, future::Future, pin::Pin};
+
 use tokio::sync::mpsc;
+use tracing::warn;
 
 /// Given multiple futures with output = `Result<Option<T>, E>`,
 /// returns the the first one that returns either `Err(_)` or
@@ -17,7 +19,7 @@ impl<T, E> Default for FuturesResolver<T, E> {
     }
 }
 
-impl<T: Send + 'static, E: Send + 'static> FuturesResolver<T, E> {
+impl<T: Send + 'static, E: Send + Debug + 'static> FuturesResolver<T, E> {
     /// Insert new future into this resolver, they will start running
     /// right away.
     pub fn push<Fut>(&self, fut: Fut)
@@ -67,10 +69,18 @@ impl<T: Send + 'static, E: Send + 'static> FuturesResolver<T, E> {
     }
 
     /// Return the resolution.
-    pub fn resolve(self) -> impl Future<Output = Result<Option<T>, E>> {
+    pub fn resolve(self) -> impl Future<Output = Option<T>> {
         let mut rx = self.rx;
         drop(self.tx);
 
-        async move { rx.recv().await.transpose() }
+        async move {
+            loop {
+                match rx.recv().await {
+                    Some(Ok(ret)) => return Some(ret),
+                    Some(Err(err)) => warn!(?err, "Fail to resolve the future"),
+                    None => return None,
+                }
+            }
+        }
     }
 }
diff --git a/crates/binstalk-fetchers/src/gh_crate_meta.rs b/crates/binstalk-fetchers/src/gh_crate_meta.rs
index 976b6942..51cd012c 100644
--- a/crates/binstalk-fetchers/src/gh_crate_meta.rs
+++ b/crates/binstalk-fetchers/src/gh_crate_meta.rs
@@ -278,7 +278,7 @@ impl super::Fetcher for GhCrateMeta {
                 }
             }
 
-            if let Some(resolved) = resolver.resolve().await? {
+            if let Some(resolved) = resolver.resolve().await {
                 debug!(?resolved, "Winning URL found!");
                 self.resolution
                     .set(resolved)
diff --git a/crates/binstalk-fetchers/src/lib.rs b/crates/binstalk-fetchers/src/lib.rs
index 33e50944..f589e5fc 100644
--- a/crates/binstalk-fetchers/src/lib.rs
+++ b/crates/binstalk-fetchers/src/lib.rs
@@ -6,7 +6,7 @@ use binstalk_downloader::{download::DownloadError, remote::Error as RemoteError}
 use binstalk_git_repo_api::gh_api_client::{GhApiError, GhRepo};
 use binstalk_types::cargo_toml_binstall::SigningAlgorithm;
 use thiserror::Error as ThisError;
-use tokio::{sync::OnceCell, time::sleep};
+use tokio::{sync::OnceCell, task::JoinError, time::sleep};
 pub use url::ParseError as UrlParseError;
 
 mod gh_crate_meta;
@@ -70,6 +70,9 @@ pub enum FetchError {
 
     #[error("Failed to verify signature")]
     InvalidSignature,
+
+    #[error("Failed to wait for task: {0}")]
+    TaskJoinError(#[from] JoinError),
 }
 
 impl From<RemoteError> for FetchError {
diff --git a/crates/binstalk-fetchers/src/quickinstall.rs b/crates/binstalk-fetchers/src/quickinstall.rs
index aa9b402a..2862fb89 100644
--- a/crates/binstalk-fetchers/src/quickinstall.rs
+++ b/crates/binstalk-fetchers/src/quickinstall.rs
@@ -1,4 +1,8 @@
-use std::{borrow::Cow, path::Path, sync::Arc};
+use std::{
+    borrow::Cow,
+    path::Path,
+    sync::{Arc, OnceLock},
+};
 
 use binstalk_downloader::remote::Method;
 use binstalk_types::cargo_toml_binstall::{PkgFmt, PkgMeta, PkgSigning};
@@ -61,6 +65,8 @@ pub struct QuickInstall {
     signature_policy: SignaturePolicy,
 
     target_data: Arc<TargetDataErased>,
+
+    signature_verifier: OnceLock<SignatureVerifier>,
 }
 
 impl QuickInstall {
@@ -75,6 +81,41 @@ impl QuickInstall {
             .await
             .copied()
     }
+
+    fn download_signature(
+        self: Arc<Self>,
+    ) -> AutoAbortJoinHandle<Result<SignatureVerifier, FetchError>> {
+        AutoAbortJoinHandle::spawn(async move {
+            if self.signature_policy == SignaturePolicy::Ignore {
+                Ok(SignatureVerifier::Noop)
+            } else {
+                debug!(url=%self.signature_url, "Downloading signature");
+                match Download::new(self.client.clone(), self.signature_url.clone())
+                    .into_bytes()
+                    .await
+                {
+                    Ok(signature) => {
+                        trace!(?signature, "got signature contents");
+                        let config = PkgSigning {
+                            algorithm: SigningAlgorithm::Minisign,
+                            pubkey: QUICKINSTALL_SIGN_KEY,
+                            file: None,
+                        };
+                        SignatureVerifier::new(&config, &signature)
+                    }
+                    Err(err) => {
+                        if self.signature_policy == SignaturePolicy::Require {
+                            error!("Failed to download signature: {err}");
+                            Err(FetchError::MissingSignature)
+                        } else {
+                            debug!("Failed to download signature, skipping verification: {err}");
+                            Ok(SignatureVerifier::Noop)
+                        }
+                    }
+                }
+            }
+        })
+    }
 }
 
 #[async_trait::async_trait]
@@ -109,6 +150,8 @@ impl super::Fetcher for QuickInstall {
             signature_policy,
 
             target_data,
+
+            signature_verifier: OnceLock::new(),
         })
     }
 
@@ -118,22 +161,28 @@ impl super::Fetcher for QuickInstall {
                 return Ok(false);
             }
 
-            if self.signature_policy == SignaturePolicy::Require {
-                does_url_exist(
-                    self.client.clone(),
-                    self.gh_api_client.clone(),
-                    &self.signature_url,
-                )
-                .await
-                .map_err(|_| FetchError::MissingSignature)?;
-            }
+            let download_signature_task = self.clone().download_signature();
 
-            does_url_exist(
+            let is_found = does_url_exist(
                 self.client.clone(),
                 self.gh_api_client.clone(),
                 &self.package_url,
             )
-            .await
+            .await?;
+
+            if !is_found {
+                return Ok(false);
+            }
+
+            if self
+                .signature_verifier
+                .set(download_signature_task.flattened_join().await?)
+                .is_err()
+            {
+                panic!("<QuickInstall as Fetcher>::find is run twice");
+            }
+
+            Ok(true)
         })
     }
 
@@ -160,33 +209,8 @@ by rust officially."#,
     }
 
     async fn fetch_and_extract(&self, dst: &Path) -> Result<ExtractedFiles, FetchError> {
-        let verifier = if self.signature_policy == SignaturePolicy::Ignore {
-            SignatureVerifier::Noop
-        } else {
-            debug!(url=%self.signature_url, "Downloading signature");
-            match Download::new(self.client.clone(), self.signature_url.clone())
-                .into_bytes()
-                .await
-            {
-                Ok(signature) => {
-                    trace!(?signature, "got signature contents");
-                    let config = PkgSigning {
-                        algorithm: SigningAlgorithm::Minisign,
-                        pubkey: QUICKINSTALL_SIGN_KEY,
-                        file: None,
-                    };
-                    SignatureVerifier::new(&config, &signature)?
-                }
-                Err(err) => {
-                    if self.signature_policy == SignaturePolicy::Require {
-                        error!("Failed to download signature: {err}");
-                        return Err(FetchError::MissingSignature);
-                    }
-
-                    debug!("Failed to download signature, skipping verification: {err}");
-                    SignatureVerifier::Noop
-                }
-            }
+        let Some(verifier) = self.signature_verifier.get() else {
+            panic!("<QuickInstall as Fetcher>::find has not been called yet!")
         };
 
         debug!(url=%self.package_url, "Downloading package");
diff --git a/crates/binstalk/src/ops/resolve.rs b/crates/binstalk/src/ops/resolve.rs
index 805bfb4a..b8e02374 100644
--- a/crates/binstalk/src/ops/resolve.rs
+++ b/crates/binstalk/src/ops/resolve.rs
@@ -97,7 +97,7 @@ async fn resolve_inner(
         _ => None,
     };
 
-    let mut handles: Vec<(Arc<dyn Fetcher>, _)> = Vec::with_capacity(
+    let mut handles: Vec<Arc<dyn Fetcher>> = Vec::with_capacity(
         desired_targets.len() * resolvers.len()
             + if binary_name.is_some() {
                 desired_targets.len()
@@ -139,8 +139,7 @@ async fn resolve_inner(
                             target_data,
                             opts.signature_policy,
                         );
-                        filter_fetcher_by_name_predicate(fetcher.fetcher_name())
-                            .then_some((fetcher.clone(), AutoAbortJoinHandle::new(fetcher.find())))
+                        filter_fetcher_by_name_predicate(fetcher.fetcher_name()).then_some(fetcher)
                     }),
             )
         };
@@ -165,9 +164,12 @@ async fn resolve_inner(
         );
     }
 
-    for (fetcher, handle) in handles {
+    for fetcher in handles {
         fetcher.clone().report_to_upstream();
-        match handle.flattened_join().await {
+        match AutoAbortJoinHandle::new(fetcher.clone().find())
+            .flattened_join()
+            .await
+        {
             Ok(true) => {
                 // Generate temporary binary path
                 let bin_path = opts.temp_dir.join(format!(