diff --git a/Cargo.lock b/Cargo.lock index 16033ed1..104f0edf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -143,6 +143,7 @@ dependencies = [ "tinytemplate", "tokio", "toml_edit", + "tower", "trust-dns-resolver", "url", "xz2", @@ -261,7 +262,6 @@ dependencies = [ "log", "miette", "mimalloc", - "reqwest", "semver", "simplelog", "tempfile", @@ -1373,6 +1373,26 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" +[[package]] +name = "pin-project" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.9" @@ -2095,6 +2115,29 @@ dependencies = [ "serde", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" + [[package]] name = "tower-service" version = "0.3.2" @@ -2108,6 +2151,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-core", ] diff --git a/crates/bin/Cargo.toml b/crates/bin/Cargo.toml index b1d84346..8e05e08d 100644 --- a/crates/bin/Cargo.toml +++ b/crates/bin/Cargo.toml @@ -29,7 +29,6 @@ dirs = "4.0.0" log = "0.4.17" miette = "5.3.0" mimalloc = { version = "0.1.29", default-features = false, optional = true } -reqwest = { version = "0.11.12", default-features = false } semver = "1.0.14" simplelog = "0.12.0" tempfile = "3.3.0" diff --git a/crates/bin/src/args.rs b/crates/bin/src/args.rs index 898c8069..0f2af0f7 100644 --- a/crates/bin/src/args.rs +++ b/crates/bin/src/args.rs @@ -1,13 +1,19 @@ -use std::{ffi::OsString, path::PathBuf}; +use std::{ + ffi::OsString, + fmt, + num::{NonZeroU64, ParseIntError}, + path::PathBuf, + str::FromStr, +}; use binstalk::{ errors::BinstallError, + helpers::remote::tls::Version, manifests::cargo_toml_binstall::PkgFmt, ops::resolve::{CrateName, VersionReqExt}, }; use clap::{Parser, ValueEnum}; use log::LevelFilter; -use reqwest::tls::Version; use semver::VersionReq; #[derive(Debug, Parser)] @@ -108,6 +114,21 @@ pub struct Args { #[clap(help_heading = "Overrides", long)] pub pkg_url: Option, + /// Override the rate limit duration. + /// + /// By default, cargo-binstall allows one request per 5 ms. + /// + /// Example: + /// + /// - `6`: Set the duration to 6ms, allows one request per 6 ms. + /// + /// - `6/2`: Set the duration to 6ms and request_count to 2, + /// allows 2 requests per 6ms. + /// + /// Both duration and request count must not be 0. + #[clap(help_heading = "Overrides", long, default_value_t = RateLimit::default())] + pub rate_limit: RateLimit, + /// Disable symlinking / versioned updates. /// /// By default, Binstall will install a binary named `-` in the install path, and @@ -218,6 +239,45 @@ impl From for Version { } } +#[derive(Copy, Clone, Debug)] +pub struct RateLimit { + pub duration: NonZeroU64, + pub request_count: NonZeroU64, +} + +impl fmt::Display for RateLimit { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}/{}", self.duration, self.request_count) + } +} + +impl FromStr for RateLimit { + type Err = ParseIntError; + + fn from_str(s: &str) -> Result { + Ok(if let Some((first, second)) = s.split_once('/') { + Self { + duration: first.parse()?, + request_count: second.parse()?, + } + } else { + Self { + duration: s.parse()?, + ..Default::default() + } + }) + } +} + +impl Default for RateLimit { + fn default() -> Self { + Self { + duration: NonZeroU64::new(5).unwrap(), + request_count: NonZeroU64::new(1).unwrap(), + } + } +} + pub fn parse() -> Result { // Filter extraneous arg when invoked by cargo // `cargo run -- --help` gives ["target/debug/cargo-binstall", "--help"] diff --git a/crates/bin/src/entry.rs b/crates/bin/src/entry.rs index 10fcee1f..b30fddee 100644 --- a/crates/bin/src/entry.rs +++ b/crates/bin/src/entry.rs @@ -3,10 +3,7 @@ use std::{fs, path::Path, sync::Arc, time::Duration}; use binstalk::{ errors::BinstallError, get_desired_targets, - helpers::{ - jobserver_client::LazyJobserverClient, remote::create_reqwest_client, - tasks::AutoAbortJoinHandle, - }, + helpers::{jobserver_client::LazyJobserverClient, remote::Client, tasks::AutoAbortJoinHandle}, manifests::{ binstall_crates_v1::Records, cargo_crates_v1::CratesToml, cargo_toml_binstall::PkgOverride, }, @@ -31,12 +28,20 @@ pub async fn install_crates(mut args: Args, jobserver_client: LazyJobserverClien // Launch target detection let desired_targets = get_desired_targets(args.targets.take()); + let rate_limit = args.rate_limit; + // Initialize reqwest client - let client = create_reqwest_client(args.min_tls_version.map(|v| v.into()))?; + let client = Client::new( + args.min_tls_version.map(|v| v.into()), + Duration::from_millis(rate_limit.duration.get()), + rate_limit.request_count, + )?; // Build crates.io api client - let crates_io_api_client = - crates_io_api::AsyncClient::with_http_client(client.clone(), Duration::from_millis(100)); + let crates_io_api_client = crates_io_api::AsyncClient::with_http_client( + client.get_inner().clone(), + Duration::from_millis(100), + ); // Initialize UI thread let mut uithread = UIThread::new(!args.no_confirm); diff --git a/crates/binstalk/Cargo.toml b/crates/binstalk/Cargo.toml index 858a93ed..c710c2ab 100644 --- a/crates/binstalk/Cargo.toml +++ b/crates/binstalk/Cargo.toml @@ -47,6 +47,7 @@ thiserror = "1.0.37" tinytemplate = "1.2.1" tokio = { version = "1.21.2", features = ["macros", "rt", "process", "sync", "signal"], default-features = false } toml_edit = { version = "0.14.4", features = ["easy"] } +tower = { version = "0.4.13", features = ["limit", "util"] } trust-dns-resolver = { version = "0.21.2", optional = true, default-features = false, features = ["dnssec-ring"] } url = { version = "2.3.1", features = ["serde"] } xz2 = "0.1.7" diff --git a/crates/binstalk/src/drivers/crates_io.rs b/crates/binstalk/src/drivers/crates_io.rs index 21b1da26..aa7f0abe 100644 --- a/crates/binstalk/src/drivers/crates_io.rs +++ b/crates/binstalk/src/drivers/crates_io.rs @@ -3,13 +3,14 @@ use std::path::PathBuf; use cargo_toml::Manifest; use crates_io_api::AsyncClient; use log::debug; -use reqwest::Client; use semver::VersionReq; -use url::Url; use crate::{ errors::BinstallError, - helpers::download::Download, + helpers::{ + download::Download, + remote::{Client, Url}, + }, manifests::cargo_toml_binstall::{Meta, TarBasedFmt}, }; diff --git a/crates/binstalk/src/fetchers.rs b/crates/binstalk/src/fetchers.rs index 650de1d8..603c1b3d 100644 --- a/crates/binstalk/src/fetchers.rs +++ b/crates/binstalk/src/fetchers.rs @@ -4,10 +4,10 @@ use compact_str::CompactString; pub use gh_crate_meta::*; pub use log::debug; pub use quickinstall::*; -use reqwest::Client; use crate::{ errors::BinstallError, + helpers::remote::Client, manifests::cargo_toml_binstall::{PkgFmt, PkgMeta}, }; diff --git a/crates/binstalk/src/fetchers/gh_crate_meta.rs b/crates/binstalk/src/fetchers/gh_crate_meta.rs index 8676e74e..3d33db51 100644 --- a/crates/binstalk/src/fetchers/gh_crate_meta.rs +++ b/crates/binstalk/src/fetchers/gh_crate_meta.rs @@ -4,7 +4,6 @@ use compact_str::{CompactString, ToCompactString}; use futures_util::stream::{FuturesUnordered, StreamExt}; use log::{debug, warn}; use once_cell::sync::OnceCell; -use reqwest::{Client, Method}; use serde::Serialize; use strum::IntoEnumIterator; use tinytemplate::TinyTemplate; @@ -14,7 +13,7 @@ use crate::{ errors::BinstallError, helpers::{ download::Download, - remote::{get_redirected_final_url, remote_exists}, + remote::{Client, Method}, tasks::AutoAbortJoinHandle, }, manifests::cargo_toml_binstall::{PkgFmt, PkgMeta}, @@ -59,11 +58,9 @@ impl GhCrateMeta { AutoAbortJoinHandle::spawn(async move { debug!("Checking for package at: '{url}'"); - Ok( - (remote_exists(client.clone(), url.clone(), Method::HEAD).await? - || remote_exists(client, url.clone(), Method::GET).await?) - .then_some((url, pkg_fmt)), - ) + Ok((client.remote_exists(url.clone(), Method::HEAD).await? + || client.remote_exists(url.clone(), Method::GET).await?) + .then_some((url, pkg_fmt))) }) }) } @@ -81,7 +78,11 @@ impl super::Fetcher for GhCrateMeta { async fn find(&self) -> Result { let repo = if let Some(repo) = self.data.repo.as_deref() { - Some(get_redirected_final_url(&self.client, Url::parse(repo)?).await?) + Some( + self.client + .get_redirected_final_url(Url::parse(repo)?) + .await?, + ) } else { None }; diff --git a/crates/binstalk/src/fetchers/quickinstall.rs b/crates/binstalk/src/fetchers/quickinstall.rs index 6613d308..0886b200 100644 --- a/crates/binstalk/src/fetchers/quickinstall.rs +++ b/crates/binstalk/src/fetchers/quickinstall.rs @@ -2,14 +2,15 @@ use std::{path::Path, sync::Arc}; use compact_str::CompactString; use log::debug; -use reqwest::Client; -use reqwest::Method; use tokio::task::JoinHandle; use url::Url; use crate::{ errors::BinstallError, - helpers::{download::Download, remote::remote_exists}, + helpers::{ + download::Download, + remote::{Client, Method}, + }, manifests::cargo_toml_binstall::{PkgFmt, PkgMeta}, }; @@ -43,7 +44,9 @@ impl super::Fetcher for QuickInstall { let url = self.package_url(); self.report(); debug!("Checking for package at: '{url}'"); - remote_exists(self.client.clone(), Url::parse(&url)?, Method::HEAD).await + self.client + .remote_exists(Url::parse(&url)?, Method::HEAD) + .await } async fn fetch_and_extract(&self, dst: &Path) -> Result<(), BinstallError> { @@ -112,15 +115,7 @@ impl QuickInstall { let url = Url::parse(&stats_url)?; debug!("Sending installation report to quickinstall ({url})"); - client - .request(Method::HEAD, url.clone()) - .send() - .await - .map_err(|err| BinstallError::Http { - method: Method::HEAD, - url, - err, - })?; + client.remote_exists(url, Method::HEAD).await?; Ok(()) }) diff --git a/crates/binstalk/src/helpers/download.rs b/crates/binstalk/src/helpers/download.rs index 4918e176..cb606000 100644 --- a/crates/binstalk/src/helpers/download.rs +++ b/crates/binstalk/src/helpers/download.rs @@ -2,11 +2,10 @@ use std::{fmt::Debug, marker::PhantomData, path::Path}; use digest::{Digest, FixedOutput, HashMarker, Output, OutputSizeUser, Update}; use log::debug; -use reqwest::{Client, Url}; use crate::{ errors::BinstallError, - helpers::remote::create_request, + helpers::remote::{Client, Url}, manifests::cargo_toml_binstall::{PkgFmt, PkgFmtDecomposed, TarBasedFmt}, }; @@ -48,7 +47,7 @@ impl Download { fmt: TarBasedFmt, visitor: V, ) -> Result { - let stream = create_request(self.client, self.url).await?; + let stream = self.client.create_request(self.url).await?; debug!("Downloading and extracting then in-memory processing"); @@ -65,7 +64,7 @@ impl Download { fmt: PkgFmt, path: impl AsRef, ) -> Result<(), BinstallError> { - let stream = create_request(self.client, self.url).await?; + let stream = self.client.create_request(self.url).await?; let path = path.as_ref(); debug!("Downloading and extracting to: '{}'", path.display()); diff --git a/crates/binstalk/src/helpers/remote.rs b/crates/binstalk/src/helpers/remote.rs index 95492cc0..d009939d 100644 --- a/crates/binstalk/src/helpers/remote.rs +++ b/crates/binstalk/src/helpers/remote.rs @@ -1,69 +1,109 @@ -use std::env; +use std::{env, num::NonZeroU64, sync::Arc, time::Duration}; use bytes::Bytes; use futures_util::stream::Stream; use log::debug; -use reqwest::{tls, Client, ClientBuilder, Method, Response}; -use url::Url; +use reqwest::{Request, Response}; +use tokio::sync::Mutex; +use tower::{limit::rate::RateLimit, Service, ServiceBuilder, ServiceExt}; use crate::errors::BinstallError; -pub fn create_reqwest_client(min_tls: Option) -> Result { - const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); +pub use reqwest::{tls, Method}; +pub use url::Url; - let mut builder = ClientBuilder::new() - .user_agent(USER_AGENT) - .https_only(true) - .min_tls_version(tls::Version::TLS_1_2); +#[derive(Clone, Debug)] +pub struct Client { + client: reqwest::Client, + rate_limit: Arc>>, +} - if let Some(ver) = min_tls { - builder = builder.min_tls_version(ver); +impl Client { + /// * `per` - must not be 0. + pub fn new( + min_tls: Option, + per: Duration, + num_request: NonZeroU64, + ) -> Result { + const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); + + let mut builder = reqwest::ClientBuilder::new() + .user_agent(USER_AGENT) + .https_only(true) + .min_tls_version(tls::Version::TLS_1_2) + .tcp_nodelay(false); + + if let Some(ver) = min_tls { + builder = builder.min_tls_version(ver); + } + + let client = builder.build()?; + + Ok(Self { + client: client.clone(), + rate_limit: Arc::new(Mutex::new( + ServiceBuilder::new() + .rate_limit(num_request.get(), per) + .service(client), + )), + }) } - Ok(builder.build()?) -} - -pub async fn remote_exists( - client: Client, - url: Url, - method: Method, -) -> Result { - let req = client - .request(method.clone(), url.clone()) - .send() - .await - .map_err(|err| BinstallError::Http { method, url, err })?; - Ok(req.status().is_success()) -} - -pub async fn get_redirected_final_url(client: &Client, url: Url) -> Result { - let method = Method::HEAD; - - let req = client - .request(method.clone(), url.clone()) - .send() - .await - .and_then(Response::error_for_status) - .map_err(|err| BinstallError::Http { method, url, err })?; - - Ok(req.url().clone()) -} - -pub(crate) async fn create_request( - client: Client, - url: Url, -) -> Result>, BinstallError> { - debug!("Downloading from: '{url}'"); - - client - .get(url.clone()) - .send() - .await - .and_then(|r| r.error_for_status()) - .map_err(|err| BinstallError::Http { - method: Method::GET, - url, - err, - }) - .map(Response::bytes_stream) + pub fn get_inner(&self) -> &reqwest::Client { + &self.client + } + + async fn send_request( + &self, + method: Method, + url: Url, + error_for_status: bool, + ) -> Result { + let request = Request::new(method.clone(), url.clone()); + + // Reduce critical section: + // - Construct the request before locking + // - Once the rate_limit is ready, call it and obtain + // the future, then release the lock before + // polling the future. + let future = self.rate_limit.lock().await.ready().await?.call(request); + + future + .await + .and_then(|response| { + if error_for_status { + response.error_for_status() + } else { + Ok(response) + } + }) + .map_err(|err| BinstallError::Http { method, url, err }) + } + + pub async fn remote_exists(&self, url: Url, method: Method) -> Result { + Ok(self + .send_request(method, url, false) + .await? + .status() + .is_success()) + } + + pub async fn get_redirected_final_url(&self, url: Url) -> Result { + Ok(self + .send_request(Method::HEAD, url, true) + .await? + .url() + .clone()) + } + + pub(crate) async fn create_request( + &self, + url: Url, + ) -> Result>, BinstallError> { + debug!("Downloading from: '{url}'"); + + self.send_request(Method::GET, url, true) + .await + .map(Response::bytes_stream) + } } diff --git a/crates/binstalk/src/ops/resolve.rs b/crates/binstalk/src/ops/resolve.rs index c56dfc87..59fa9ad8 100644 --- a/crates/binstalk/src/ops/resolve.rs +++ b/crates/binstalk/src/ops/resolve.rs @@ -10,7 +10,6 @@ use cargo_toml::{Manifest, Package, Product}; use compact_str::{CompactString, ToCompactString}; use itertools::Itertools; use log::{debug, info, warn}; -use reqwest::Client; use semver::{Version, VersionReq}; use tokio::task::block_in_place; @@ -20,7 +19,7 @@ use crate::{ drivers::fetch_crate_cratesio, errors::BinstallError, fetchers::{Data, Fetcher, GhCrateMeta, QuickInstall}, - helpers::tasks::AutoAbortJoinHandle, + helpers::{remote::Client, tasks::AutoAbortJoinHandle}, manifests::cargo_toml_binstall::{Meta, PkgMeta}, };