Mirror of https://github.com/cargo-bins/cargo-binstall.git (synced 2025-04-20 20:48:43 +00:00)
Run artifact discovery sequentially instead of in parallel (#1796)
* Perform artifact discovery sequentially: run the different `fetcher.find()` calls one after another
* FuturesResolver: fall back to the other futures if one of them errors
* Fix typo
* Apply cargo fmt
* Parallelise `<QuickInstall as Fetcher>::find`: check for the signature in parallel with the package
* Download the signature in `<QuickInstall as Fetcher>::find`, so that the signature download can be done in parallel
* Bump MSRV for binstalk-fetchers to 1.70
* Update crates/binstalk-fetchers/src/futures_resolver.rs
  Co-authored-by: Félix Saparelli <felix@passcod.name>
  Signed-off-by: Jiahao XU <30436523+NobodyXu@users.noreply.github.com>
* cargo fmt
  Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>

---------

Signed-off-by: Jiahao XU <30436523+NobodyXu@users.noreply.github.com>
Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
Co-authored-by: Félix Saparelli <felix@passcod.name>
parent ebdca1126e
commit ac7bac651d

7 changed files with 128 additions and 54 deletions
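
At a high level, the change to crates/binstalk/src/ops/resolve.rs (the last diff below) stops spawning every fetcher's `find()` up front and instead awaits them one at a time, stopping at the first hit. A minimal, self-contained sketch of that control flow, using hypothetical fetcher types and a plain `String` error instead of the crate's real ones (assumes the tokio and async-trait crates):

use std::sync::Arc;

#[async_trait::async_trait]
trait Fetcher: Send + Sync {
    fn name(&self) -> &'static str;
    // Returns Ok(true) if this source has a usable artifact.
    async fn find(&self) -> Result<bool, String>;
}

struct QuickInstall;
struct GhRelease;

#[async_trait::async_trait]
impl Fetcher for QuickInstall {
    fn name(&self) -> &'static str {
        "quickinstall"
    }
    async fn find(&self) -> Result<bool, String> {
        Ok(false) // pretend nothing was published there
    }
}

#[async_trait::async_trait]
impl Fetcher for GhRelease {
    fn name(&self) -> &'static str {
        "gh-release"
    }
    async fn find(&self) -> Result<bool, String> {
        Ok(true) // pretend a release asset exists
    }
}

// Sequential discovery: the next fetcher is only queried if the previous
// one errored or found nothing, instead of all of them racing in parallel.
async fn resolve(fetchers: Vec<Arc<dyn Fetcher>>) -> Option<Arc<dyn Fetcher>> {
    for fetcher in fetchers {
        match fetcher.find().await {
            Ok(true) => return Some(fetcher),
            Ok(false) => {}
            Err(err) => eprintln!("{}: discovery failed: {err}", fetcher.name()),
        }
    }
    None
}

#[tokio::main]
async fn main() {
    let fetchers: Vec<Arc<dyn Fetcher>> = vec![Arc::new(QuickInstall), Arc::new(GhRelease)];
    let winner = resolve(fetchers).await;
    println!("winner: {:?}", winner.map(|f| f.name()));
}

The point of the loop is that later fetchers are never queried, and nothing is downloaded for them, once an earlier one succeeds.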

crates/binstalk-fetchers/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2021"
 description = "The binstall fetchers"
 repository = "https://github.com/cargo-bins/cargo-binstall"
 documentation = "https://docs.rs/binstalk-fetchers"
-rust-version = "1.65.0"
+rust-version = "1.70.0"
 authors = ["Jiahao XU <Jiahao_XU@outlook.com>"]
 license = "GPL-3.0-only"
 

crates/binstalk-fetchers/src/common.rs
@@ -1,6 +1,11 @@
-use std::sync::{
-    atomic::{AtomicBool, Ordering::Relaxed},
-    Once,
+#![allow(unused)]
+
+use std::{
+    future::Future,
+    sync::{
+        atomic::{AtomicBool, Ordering::Relaxed},
+        Once,
+    },
 };
 
 pub(super) use binstalk_downloader::{
@@ -76,3 +81,33 @@ pub(super) async fn does_url_exist(
 
     Ok(Box::pin(client.remote_gettable(url.clone())).await?)
 }
+
+#[derive(Debug)]
+pub(super) struct AutoAbortJoinHandle<T>(JoinHandle<T>);
+
+impl<T> AutoAbortJoinHandle<T>
+where
+    T: Send + 'static,
+{
+    pub(super) fn spawn<F>(future: F) -> Self
+    where
+        F: Future<Output = T> + Send + 'static,
+    {
+        Self(tokio::spawn(future))
+    }
+}
+
+impl<T> Drop for AutoAbortJoinHandle<T> {
+    fn drop(&mut self) {
+        self.0.abort();
+    }
+}
+
+impl<T, E> AutoAbortJoinHandle<Result<T, E>>
+where
+    E: Into<FetchError>,
+{
+    pub(super) async fn flattened_join(mut self) -> Result<T, FetchError> {
+        (&mut self.0).await?.map_err(Into::into)
+    }
+}
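
The `AutoAbortJoinHandle` added above is what makes it safe to spawn a background download and then bail out of discovery early: dropping the handle aborts the task. A small stand-alone sketch of that abort-on-drop behaviour (assumes tokio; `AutoAbort` here is a stand-in, not the crate's type):

use std::time::Duration;

// Stand-in for the AutoAbortJoinHandle above: abort the task on drop.
struct AutoAbort<T>(tokio::task::JoinHandle<T>);

impl<T> Drop for AutoAbort<T> {
    fn drop(&mut self) {
        self.0.abort();
    }
}

#[tokio::main]
async fn main() {
    let handle = AutoAbort(tokio::spawn(async {
        // Simulate a long-running download.
        tokio::time::sleep(Duration::from_secs(3600)).await;
        "signature bytes"
    }));

    // Dropping the wrapper aborts the spawned task right away,
    // instead of leaking it on the runtime.
    drop(handle);

    // Give the runtime a moment to process the cancellation.
    tokio::time::sleep(Duration::from_millis(10)).await;
}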

crates/binstalk-fetchers/src/futures_resolver.rs
@@ -1,5 +1,7 @@
-use std::{future::Future, pin::Pin};
+use std::{fmt::Debug, future::Future, pin::Pin};
 
 use tokio::sync::mpsc;
+use tracing::warn;
 
 /// Given multiple futures with output = `Result<Option<T>, E>`,
 /// returns the the first one that returns either `Err(_)` or
@@ -17,7 +19,7 @@ impl<T, E> Default for FuturesResolver<T, E> {
     }
 }
 
-impl<T: Send + 'static, E: Send + 'static> FuturesResolver<T, E> {
+impl<T: Send + 'static, E: Send + Debug + 'static> FuturesResolver<T, E> {
     /// Insert new future into this resolver, they will start running
     /// right away.
     pub fn push<Fut>(&self, fut: Fut)
@@ -67,10 +69,18 @@ impl<T: Send + 'static, E: Send + 'static> FuturesResolver<T, E> {
     }
 
     /// Return the resolution.
-    pub fn resolve(self) -> impl Future<Output = Result<Option<T>, E>> {
+    pub fn resolve(self) -> impl Future<Output = Option<T>> {
         let mut rx = self.rx;
         drop(self.tx);
 
-        async move { rx.recv().await.transpose() }
+        async move {
+            loop {
+                match rx.recv().await {
+                    Some(Ok(ret)) => return Some(ret),
+                    Some(Err(err)) => warn!(?err, "Fail to resolve the future"),
+                    None => return None,
+                }
+            }
+        }
     }
 }
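
With the new `resolve()`, an error from one candidate future is logged and the remaining candidates still get a chance; the caller only sees an `Option<T>`. A self-contained sketch of this channel-based fallback, with `String` errors and `eprintln!` standing in for `tracing::warn!` (assumes tokio):

use std::future::Future;

use tokio::sync::mpsc;

struct Resolver<T> {
    tx: mpsc::Sender<Result<T, String>>,
    rx: mpsc::Receiver<Result<T, String>>,
}

impl<T: Send + 'static> Resolver<T> {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel(10);
        Self { tx, rx }
    }

    // Futures start running as soon as they are pushed.
    fn push<Fut>(&self, fut: Fut)
    where
        Fut: Future<Output = Result<Option<T>, String>> + Send + 'static,
    {
        let tx = self.tx.clone();
        tokio::spawn(async move {
            match fut.await {
                Ok(Some(value)) => {
                    let _ = tx.send(Ok(value)).await;
                }
                Ok(None) => {} // nothing found on this branch
                Err(err) => {
                    let _ = tx.send(Err(err)).await;
                }
            }
        });
    }

    // The first Ok wins; errors are logged and skipped; None is returned
    // once every branch has finished without producing a value.
    async fn resolve(self) -> Option<T> {
        let mut rx = self.rx;
        drop(self.tx);
        loop {
            match rx.recv().await {
                Some(Ok(value)) => return Some(value),
                Some(Err(err)) => eprintln!("candidate failed, trying others: {err}"),
                None => return None,
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let resolver = Resolver::new();
    resolver.push(async { Err::<Option<&str>, _>("404 from mirror".to_string()) });
    resolver.push(async { Ok(Some("https://example.com/pkg.tgz")) });
    println!("{:?}", resolver.resolve().await);
}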

crates/binstalk-fetchers/src/gh_crate_meta.rs
@@ -278,7 +278,7 @@ impl super::Fetcher for GhCrateMeta {
                 }
             }
 
-            if let Some(resolved) = resolver.resolve().await? {
+            if let Some(resolved) = resolver.resolve().await {
                 debug!(?resolved, "Winning URL found!");
                 self.resolution
                     .set(resolved)

crates/binstalk-fetchers/src/lib.rs
@@ -6,7 +6,7 @@ use binstalk_downloader::{download::DownloadError, remote::Error as RemoteError}
 use binstalk_git_repo_api::gh_api_client::{GhApiError, GhRepo};
 use binstalk_types::cargo_toml_binstall::SigningAlgorithm;
 use thiserror::Error as ThisError;
-use tokio::{sync::OnceCell, time::sleep};
+use tokio::{sync::OnceCell, task::JoinError, time::sleep};
 pub use url::ParseError as UrlParseError;
 
 mod gh_crate_meta;
@@ -70,6 +70,9 @@ pub enum FetchError {
 
     #[error("Failed to verify signature")]
     InvalidSignature,
+
+    #[error("Failed to wait for task: {0}")]
+    TaskJoinError(#[from] JoinError),
 }
 
 impl From<RemoteError> for FetchError {

crates/binstalk-fetchers/src/quickinstall.rs
@@ -1,4 +1,8 @@
-use std::{borrow::Cow, path::Path, sync::Arc};
+use std::{
+    borrow::Cow,
+    path::Path,
+    sync::{Arc, OnceLock},
+};
 
 use binstalk_downloader::remote::Method;
 use binstalk_types::cargo_toml_binstall::{PkgFmt, PkgMeta, PkgSigning};
@@ -61,6 +65,8 @@ pub struct QuickInstall {
     signature_policy: SignaturePolicy,
 
     target_data: Arc<TargetDataErased>,
+
+    signature_verifier: OnceLock<SignatureVerifier>,
 }
 
 impl QuickInstall {
@@ -75,6 +81,41 @@ impl QuickInstall {
             .await
             .copied()
     }
+
+    fn download_signature(
+        self: Arc<Self>,
+    ) -> AutoAbortJoinHandle<Result<SignatureVerifier, FetchError>> {
+        AutoAbortJoinHandle::spawn(async move {
+            if self.signature_policy == SignaturePolicy::Ignore {
+                Ok(SignatureVerifier::Noop)
+            } else {
+                debug!(url=%self.signature_url, "Downloading signature");
+                match Download::new(self.client.clone(), self.signature_url.clone())
+                    .into_bytes()
+                    .await
+                {
+                    Ok(signature) => {
+                        trace!(?signature, "got signature contents");
+                        let config = PkgSigning {
+                            algorithm: SigningAlgorithm::Minisign,
+                            pubkey: QUICKINSTALL_SIGN_KEY,
+                            file: None,
+                        };
+                        SignatureVerifier::new(&config, &signature)
+                    }
+                    Err(err) => {
+                        if self.signature_policy == SignaturePolicy::Require {
+                            error!("Failed to download signature: {err}");
+                            Err(FetchError::MissingSignature)
+                        } else {
+                            debug!("Failed to download signature, skipping verification: {err}");
+                            Ok(SignatureVerifier::Noop)
+                        }
+                    }
+                }
+            }
+        })
+    }
 }
 
 #[async_trait::async_trait]
@@ -109,6 +150,8 @@ impl super::Fetcher for QuickInstall {
             signature_policy,
 
             target_data,
+
+            signature_verifier: OnceLock::new(),
         })
     }
 
@@ -118,22 +161,28 @@ impl super::Fetcher for QuickInstall {
                 return Ok(false);
             }
 
-            if self.signature_policy == SignaturePolicy::Require {
-                does_url_exist(
-                    self.client.clone(),
-                    self.gh_api_client.clone(),
-                    &self.signature_url,
-                )
-                .await
-                .map_err(|_| FetchError::MissingSignature)?;
-            }
+            let download_signature_task = self.clone().download_signature();
 
-            does_url_exist(
+            let is_found = does_url_exist(
                 self.client.clone(),
                 self.gh_api_client.clone(),
                 &self.package_url,
             )
-            .await
+            .await?;
+
+            if !is_found {
+                return Ok(false);
+            }
+
+            if self
+                .signature_verifier
+                .set(download_signature_task.flattened_join().await?)
+                .is_err()
+            {
+                panic!("<QuickInstall as Fetcher>::find is run twice");
+            }
+
+            Ok(true)
         })
     }
 
@@ -160,33 +209,8 @@ by rust officially."#,
     }
 
     async fn fetch_and_extract(&self, dst: &Path) -> Result<ExtractedFiles, FetchError> {
-        let verifier = if self.signature_policy == SignaturePolicy::Ignore {
-            SignatureVerifier::Noop
-        } else {
-            debug!(url=%self.signature_url, "Downloading signature");
-            match Download::new(self.client.clone(), self.signature_url.clone())
-                .into_bytes()
-                .await
-            {
-                Ok(signature) => {
-                    trace!(?signature, "got signature contents");
-                    let config = PkgSigning {
-                        algorithm: SigningAlgorithm::Minisign,
-                        pubkey: QUICKINSTALL_SIGN_KEY,
-                        file: None,
-                    };
-                    SignatureVerifier::new(&config, &signature)?
-                }
-                Err(err) => {
-                    if self.signature_policy == SignaturePolicy::Require {
-                        error!("Failed to download signature: {err}");
-                        return Err(FetchError::MissingSignature);
-                    }
-
-                    debug!("Failed to download signature, skipping verification: {err}");
-                    SignatureVerifier::Noop
-                }
-            }
-        };
+        let Some(verifier) = self.signature_verifier.get() else {
+            panic!("<QuickInstall as Fetcher>::find has not been called yet!")
+        };
 
         debug!(url=%self.package_url, "Downloading package");
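
In the quickinstall.rs hunks above, `find()` now spawns the signature download first, then checks that the package URL exists, and only afterwards joins the signature task, caching the resulting verifier in a `OnceLock` for `fetch_and_extract` to reuse (hence the rust-version bump to 1.70.0, where `std::sync::OnceLock` was stabilised). A stripped-down sketch of that overlap, using hypothetical `package_exists`/`download_signature` stand-ins (assumes tokio):

use std::time::Duration;

async fn package_exists() -> Result<bool, String> {
    tokio::time::sleep(Duration::from_millis(50)).await;
    Ok(true)
}

async fn download_signature() -> Result<Vec<u8>, String> {
    tokio::time::sleep(Duration::from_millis(50)).await;
    Ok(b"minisign signature".to_vec())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // Kick off the signature download in the background.
    let signature_task = tokio::spawn(download_signature());

    // Run the package-existence check in the foreground; the two overlap in time.
    if !package_exists().await? {
        // Dropping a plain JoinHandle would leave the task running;
        // the crate wraps it in an abort-on-drop handle for this reason.
        signature_task.abort();
        return Ok(());
    }

    // Only now wait for the signature result.
    let signature = signature_task
        .await
        .map_err(|e| e.to_string())??;
    println!("verified against {} signature bytes", signature.len());
    Ok(())
}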

crates/binstalk/src/ops/resolve.rs
@@ -97,7 +97,7 @@ async fn resolve_inner(
         _ => None,
     };
 
-    let mut handles: Vec<(Arc<dyn Fetcher>, _)> = Vec::with_capacity(
+    let mut handles: Vec<Arc<dyn Fetcher>> = Vec::with_capacity(
         desired_targets.len() * resolvers.len()
             + if binary_name.is_some() {
                 desired_targets.len()
@@ -139,8 +139,7 @@ async fn resolve_inner(
                 target_data,
                 opts.signature_policy,
             );
-            filter_fetcher_by_name_predicate(fetcher.fetcher_name())
-                .then_some((fetcher.clone(), AutoAbortJoinHandle::new(fetcher.find())))
+            filter_fetcher_by_name_predicate(fetcher.fetcher_name()).then_some(fetcher)
         }),
     )
 };
@@ -165,9 +164,12 @@ async fn resolve_inner(
         );
     }
 
-    for (fetcher, handle) in handles {
+    for fetcher in handles {
         fetcher.clone().report_to_upstream();
-        match handle.flattened_join().await {
+        match AutoAbortJoinHandle::new(fetcher.clone().find())
+            .flattened_join()
+            .await
+        {
             Ok(true) => {
                 // Generate temporary binary path
                 let bin_path = opts.temp_dir.join(format!(