diff --git a/Cargo.lock b/Cargo.lock index 306b54da..df3815f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -290,7 +290,6 @@ dependencies = [ "tokio", "tokio-tar", "tokio-util", - "tower", "tracing", "trust-dns-resolver", "url", @@ -3547,29 +3546,6 @@ dependencies = [ "winnow", ] -[[package]] -name = "tower" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" -dependencies = [ - "futures-core", - "futures-util", - "pin-project", - "pin-project-lite", - "tokio", - "tokio-util", - "tower-layer", - "tower-service", - "tracing", -] - -[[package]] -name = "tower-layer" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" - [[package]] name = "tower-service" version = "0.3.2" @@ -3583,7 +3559,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", - "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/crates/bin/src/args.rs b/crates/bin/src/args.rs index b3189a94..b2e4f60d 100644 --- a/crates/bin/src/args.rs +++ b/crates/bin/src/args.rs @@ -2,7 +2,7 @@ use std::{ env, ffi::OsString, fmt, - num::{NonZeroU64, ParseIntError}, + num::{NonZeroU16, NonZeroU64, ParseIntError}, path::PathBuf, str::FromStr, }; @@ -348,7 +348,7 @@ impl From for remote::TLSVersion { #[derive(Copy, Clone, Debug)] pub struct RateLimit { - pub duration: NonZeroU64, + pub duration: NonZeroU16, pub request_count: NonZeroU64, } @@ -379,7 +379,7 @@ impl FromStr for RateLimit { impl Default for RateLimit { fn default() -> Self { Self { - duration: NonZeroU64::new(10).unwrap(), + duration: NonZeroU16::new(10).unwrap(), request_count: NonZeroU64::new(1).unwrap(), } } diff --git a/crates/bin/src/entry.rs b/crates/bin/src/entry.rs index a51f16b3..1005b390 100644 --- a/crates/bin/src/entry.rs +++ b/crates/bin/src/entry.rs @@ -4,7 +4,6 @@ use std::{ future::Future, path::{Path, PathBuf}, sync::Arc, - time::Duration, }; use binstalk::{ @@ -100,7 +99,7 @@ pub fn install_crates( let client = Client::new( concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")), args.min_tls_version.map(|v| v.into()), - Duration::from_millis(rate_limit.duration.get()), + rate_limit.duration, rate_limit.request_count, read_root_certs( args.root_certificates, diff --git a/crates/binstalk-downloader/Cargo.toml b/crates/binstalk-downloader/Cargo.toml index 9fe0372d..abf561fd 100644 --- a/crates/binstalk-downloader/Cargo.toml +++ b/crates/binstalk-downloader/Cargo.toml @@ -37,7 +37,6 @@ thiserror = "1.0.40" tokio = { version = "1.28.2", features = ["macros", "rt-multi-thread", "sync", "time", "fs"], default-features = false } tokio-tar = "0.3.0" tokio-util = { version = "0.7.8", features = ["io"] } -tower = { version = "0.4.13", features = ["limit", "util"] } tracing = "0.1.37" # trust-dns-resolver must be kept in sync with the version reqwest uses trust-dns-resolver = { version = "0.22.0", optional = true, default-features = false, features = ["dnssec-ring"] } diff --git a/crates/binstalk-downloader/src/download.rs b/crates/binstalk-downloader/src/download.rs index e70095b9..30388808 100644 --- a/crates/binstalk-downloader/src/download.rs +++ b/crates/binstalk-downloader/src/download.rs @@ -223,6 +223,7 @@ mod test { use std::{ collections::{HashMap, HashSet}, ffi::OsStr, + num::NonZeroU16, }; use tempfile::tempdir; @@ -231,7 +232,7 @@ mod test { let client = crate::remote::Client::new( concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")), None, - std::time::Duration::from_millis(10), + NonZeroU16::new(10).unwrap(), 1.try_into().unwrap(), [], ) diff --git a/crates/binstalk-downloader/src/gh_api_client.rs b/crates/binstalk-downloader/src/gh_api_client.rs index 10bedf6f..1439b2d5 100644 --- a/crates/binstalk-downloader/src/gh_api_client.rs +++ b/crates/binstalk-downloader/src/gh_api_client.rs @@ -21,7 +21,7 @@ mod request; pub use request::{GhApiContextError, GhApiError, GhGraphQLErrors}; /// default retry duration if x-ratelimit-reset is not found in response header -const DEFAULT_RETRY_DURATION: Duration = Duration::from_secs(5 * 60); +const DEFAULT_RETRY_DURATION: Duration = Duration::from_secs(10 * 60); fn percent_encode_http_url_path(path: &str) -> PercentEncode<'_> { /// https://url.spec.whatwg.org/#fragment-percent-encode-set @@ -289,7 +289,7 @@ pub enum HasReleaseArtifact { mod test { use super::*; use compact_str::{CompactString, ToCompactString}; - use std::env; + use std::{env, num::NonZeroU16}; mod cargo_binstall_v0_20_1 { use super::{CompactString, GhRelease}; @@ -383,7 +383,7 @@ mod test { let client = remote::Client::new( concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")), None, - Duration::from_millis(10), + NonZeroU16::new(10).unwrap(), 1.try_into().unwrap(), [], ) diff --git a/crates/binstalk-downloader/src/remote.rs b/crates/binstalk-downloader/src/remote.rs index abda9d66..7d9eceb7 100644 --- a/crates/binstalk-downloader/src/remote.rs +++ b/crates/binstalk-downloader/src/remote.rs @@ -1,5 +1,5 @@ use std::{ - num::{NonZeroU64, NonZeroU8}, + num::{NonZeroU16, NonZeroU64, NonZeroU8}, ops::ControlFlow, sync::Arc, time::{Duration, SystemTime}, @@ -13,7 +13,6 @@ use reqwest::{ Request, }; use thiserror::Error as ThisError; -use tower::{limit::rate::RateLimit, Service, ServiceBuilder, ServiceExt}; use tracing::{debug, info}; pub use reqwest::{header, Error as ReqwestError, Method, StatusCode}; @@ -73,7 +72,7 @@ impl HttpError { #[derive(Debug)] struct Inner { client: reqwest::Client, - service: DelayRequest>, + service: DelayRequest, } #[derive(Clone, Debug)] @@ -81,7 +80,9 @@ pub struct Client(Arc); #[cfg_attr(not(feature = "__tls"), allow(unused_variables, unused_mut))] impl Client { - /// * `per` - must not be 0. + /// * `per_millis` - The duration (in millisecond) for which at most + /// `num_request` can be sent, itcould be increased if rate-limit + /// happens. /// * `num_request` - maximum number of requests to be processed for /// each `per` duration. /// @@ -89,14 +90,14 @@ impl Client { pub fn new( user_agent: impl AsRef, min_tls: Option, - per: Duration, + per_millis: NonZeroU16, num_request: NonZeroU64, certificates: impl IntoIterator, ) -> Result { fn inner( user_agent: &str, min_tls: Option, - per: Duration, + per_millis: NonZeroU16, num_request: NonZeroU64, certificates: &mut dyn Iterator, ) -> Result { @@ -123,9 +124,9 @@ impl Client { Ok(Client(Arc::new(Inner { client: client.clone(), service: DelayRequest::new( - ServiceBuilder::new() - .rate_limit(num_request.get(), per) - .service(client), + num_request, + Duration::from_millis(per_millis.get() as u64), + client, ), }))) } @@ -133,7 +134,7 @@ impl Client { inner( user_agent.as_ref(), min_tls, - per, + per_millis, num_request, &mut certificates.into_iter(), ) @@ -159,9 +160,7 @@ impl Client { url: &Url, ) -> Result>, ReqwestError> { - let future = (&self.0.service).ready().await?.call(request); - - let response = match future.await { + let response = match self.0.service.call(request).await { Err(err) if err.is_timeout() || err.is_connect() => { let duration = RETRY_DURATION_FOR_TIMEOUT; diff --git a/crates/binstalk-downloader/src/remote/delay_request.rs b/crates/binstalk-downloader/src/remote/delay_request.rs index 8e20682b..3feb804d 100644 --- a/crates/binstalk-downloader/src/remote/delay_request.rs +++ b/crates/binstalk-downloader/src/remote/delay_request.rs @@ -1,19 +1,14 @@ use std::{ - collections::HashMap, - future::Future, - iter::Peekable, - pin::Pin, + collections::HashMap, future::Future, iter::Peekable, num::NonZeroU64, ops::ControlFlow, sync::Mutex, - task::{Context, Poll}, }; use compact_str::{CompactString, ToCompactString}; use reqwest::{Request, Url}; -use tokio::{ - sync::Mutex as AsyncMutex, - time::{sleep_until, Duration, Instant}, -}; -use tower::{Service, ServiceExt}; +use tokio::time::{sleep_until, Duration, Instant}; +use tracing::debug; + +pub(super) type RequestResult = Result; trait IterExt: Iterator { fn dedup(self) -> Dedup @@ -47,15 +42,107 @@ where } #[derive(Debug)] -pub(super) struct DelayRequest { - inner: AsyncMutex, +struct Inner { + client: reqwest::Client, + num_request: NonZeroU64, + per: Duration, + until: Instant, + state: State, +} + +#[derive(Debug)] +enum State { + Limited, + Ready { rem: NonZeroU64 }, +} + +impl Inner { + fn new(num_request: NonZeroU64, per: Duration, client: reqwest::Client) -> Self { + Inner { + client, + per, + num_request, + until: Instant::now() + per, + state: State::Ready { rem: num_request }, + } + } + + fn inc_rate_limit(&mut self) { + if let Some(num_request) = NonZeroU64::new(self.num_request.get() / 2) { + // If self.num_request.get() > 1, then cut it by half + self.num_request = num_request; + if let State::Ready { rem, .. } = &mut self.state { + *rem = num_request.min(*rem) + } + } + + let per = self.per; + if per < Duration::from_millis(700) { + self.per = per.mul_f32(1.2); + self.until += self.per - per; + } + } + + fn ready(&mut self) -> Readiness { + match self.state { + State::Ready { .. } => Readiness::Ready, + State::Limited => { + if self.until.elapsed().is_zero() { + Readiness::Limited(self.until) + } else { + // rate limit can be reset now and is ready + self.until = Instant::now() + self.per; + self.state = State::Ready { + rem: self.num_request, + }; + + Readiness::Ready + } + } + } + } + + fn call(&mut self, req: Request) -> impl Future { + match &mut self.state { + State::Ready { rem } => { + let now = Instant::now(); + + // If the period has elapsed, reset it. + if now >= self.until { + self.until = now + self.per; + *rem = self.num_request; + } + + if let Some(new_rem) = NonZeroU64::new(rem.get() - 1) { + *rem = new_rem; + } else { + // The service is disabled until further notice + self.state = State::Limited; + } + + // Call the inner future + self.client.execute(req) + } + State::Limited => panic!("service not ready; poll_ready must be called first"), + } + } +} + +enum Readiness { + Limited(Instant), + Ready, +} + +#[derive(Debug)] +pub(super) struct DelayRequest { + inner: Mutex, hosts_to_delay: Mutex>, } -impl DelayRequest { - pub(super) fn new(inner: S) -> Self { +impl DelayRequest { + pub(super) fn new(num_request: NonZeroU64, per: Duration, client: reqwest::Client) -> Self { Self { - inner: AsyncMutex::new(inner), + inner: Mutex::new(Inner::new(num_request, per, client)), hosts_to_delay: Default::default(), } } @@ -78,61 +165,81 @@ impl DelayRequest { }); } - fn wait_until_available(&self, url: &Url) -> impl Future + Send + 'static { + fn get_delay_until(&self, host: &str) -> Option { let mut hosts_to_delay = self.hosts_to_delay.lock().unwrap(); - let deadline = url - .host_str() - .and_then(|host| hosts_to_delay.get(host).map(|deadline| (*deadline, host))) - .and_then(|(deadline, host)| { - if deadline.elapsed().is_zero() { - Some(deadline) - } else { - // We have already gone past the deadline, - // so we should remove it instead. - hosts_to_delay.remove(host); - None - } - }); - - async move { - if let Some(deadline) = deadline { - sleep_until(deadline).await; + hosts_to_delay.get(host).copied().and_then(|until| { + if until.elapsed().is_zero() { + Some(until) + } else { + // We have already gone past the deadline, + // so we should remove it instead. + hosts_to_delay.remove(host); + None } - } - } -} - -impl<'this, S> Service for &'this DelayRequest -where - S: Service + Send, - S::Future: Send, -{ - type Response = S::Response; - type Error = S::Error; - // TODO: Replace this with `type_alias_impl_trait` once it stablises - // https://github.com/rust-lang/rust/issues/63063 - type Future = Pin> + Send + 'this>>; - - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Request) -> Self::Future { - let this = *self; - - Box::pin(async move { - this.wait_until_available(req.url()).await; - - // Reduce critical section: - // - Construct the request before locking - // - Once it is ready, call it and obtain - // the future, then release the lock before - // polling the future, which performs network I/O that could - // take really long. - let future = this.inner.lock().await.ready().await?.call(req); - - future.await }) } + + // Define a new function so that the guard will be dropped ASAP and not + // included in the future. + fn call_inner( + &self, + counter: &mut u32, + req: &mut Option, + ) -> ControlFlow, Instant> { + // Wait until we are ready to send next requests + // (client-side rate-limit throttler). + let mut guard = self.inner.lock().unwrap(); + + if let Readiness::Limited(until) = guard.ready() { + ControlFlow::Continue(until) + } else if let Some(until) = req + .as_ref() + .unwrap() + .url() + .host_str() + .and_then(|host| self.get_delay_until(host)) + { + // If the host rate-limit us, then wait until then + // and try again (server-side rate-limit throttler). + + // Try increasing client-side rate-limit throttler to prevent + // rate-limit in the future. + guard.inc_rate_limit(); + + let additional_delay = + Duration::from_millis(200) + Duration::from_millis(100) * 20.min(*counter); + + *counter += 1; + + debug!("server-side rate limit exceeded; sleeping."); + ControlFlow::Continue(until + additional_delay) + } else { + ControlFlow::Break(guard.call(req.take().unwrap())) + } + } + + pub(super) async fn call(&self, req: Request) -> RequestResult { + // Put all variables in a block so that will be dropped before polling + // the future returned by reqwest. + { + let mut counter = 0; + // Use Option here so that we don't have to move entire `Request` + // twice when calling `self.call_inner` while retain the ability to + // take its value without boxing. + // + // This will be taken when `ControlFlow::Break` is then it will + // break the loop, so it will never call `self.call_inner` with + // a `None`. + let mut req = Some(req); + + loop { + match self.call_inner(&mut counter, &mut req) { + ControlFlow::Continue(until) => sleep_until(until).await, + ControlFlow::Break(future) => break future, + } + } + } + .await + } } diff --git a/crates/binstalk/src/drivers/registry.rs b/crates/binstalk/src/drivers/registry.rs index 2e137e17..7938d98a 100644 --- a/crates/binstalk/src/drivers/registry.rs +++ b/crates/binstalk/src/drivers/registry.rs @@ -164,7 +164,7 @@ impl FromStr for Registry { #[cfg(test)] mod test { - use std::time::Duration; + use std::num::NonZeroU16; use toml_edit::ser::to_string; @@ -176,7 +176,7 @@ mod test { Client::new( concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")), None, - Duration::from_millis(10), + NonZeroU16::new(10).unwrap(), 1.try_into().unwrap(), [], ) diff --git a/crates/binstalk/src/fetchers/quickinstall.rs b/crates/binstalk/src/fetchers/quickinstall.rs index 273fd720..157ae679 100644 --- a/crates/binstalk/src/fetchers/quickinstall.rs +++ b/crates/binstalk/src/fetchers/quickinstall.rs @@ -208,7 +208,7 @@ impl QuickInstall { #[cfg(test)] mod test { use super::{get_quickinstall_supported_targets, Client, CompactString}; - use std::time::Duration; + use std::num::NonZeroU16; /// Mark this as an async fn so that you won't accidentally use it in /// sync context. @@ -216,7 +216,7 @@ mod test { Client::new( concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")), None, - Duration::from_millis(10), + NonZeroU16::new(10).unwrap(), 1.try_into().unwrap(), [], ) diff --git a/crates/binstalk/src/helpers/remote.rs b/crates/binstalk/src/helpers/remote.rs index e794c744..24635b89 100644 --- a/crates/binstalk/src/helpers/remote.rs +++ b/crates/binstalk/src/helpers/remote.rs @@ -2,6 +2,10 @@ pub use binstalk_downloader::remote::*; pub use url::ParseError as UrlParseError; use binstalk_downloader::gh_api_client::{GhApiClient, GhReleaseArtifact, HasReleaseArtifact}; +use std::sync::{ + atomic::{AtomicBool, Ordering::Relaxed}, + Once, +}; use tracing::{debug, warn}; use crate::errors::BinstallError; @@ -13,24 +17,36 @@ pub async fn does_url_exist( gh_api_client: GhApiClient, url: &Url, ) -> Result { + static GH_API_CLIENT_FAILED: AtomicBool = AtomicBool::new(false); + static WARN_RATE_LIMIT_ONCE: Once = Once::new(); + static WARN_UNAUTHORIZED_ONCE: Once = Once::new(); + debug!("Checking for package at: '{url}'"); - if let Some(artifact) = GhReleaseArtifact::try_extract_from_url(url) { - debug!("Using GitHub API to check for existence of artifact, which will also cache the API response"); + if !GH_API_CLIENT_FAILED.load(Relaxed) { + if let Some(artifact) = GhReleaseArtifact::try_extract_from_url(url) { + debug!("Using GitHub API to check for existence of artifact, which will also cache the API response"); - // The future returned has the same size as a pointer - match gh_api_client.has_release_artifact(artifact).await? { - HasReleaseArtifact::Yes => return Ok(true), - HasReleaseArtifact::No | HasReleaseArtifact::NoSuchRelease => return Ok(false), + // The future returned has the same size as a pointer + match gh_api_client.has_release_artifact(artifact).await? { + HasReleaseArtifact::Yes => return Ok(true), + HasReleaseArtifact::No | HasReleaseArtifact::NoSuchRelease => return Ok(false), - HasReleaseArtifact::RateLimit { retry_after } => { - warn!("Your GitHub API token (if any) has reached its rate limit and cannot be used again until {retry_after:?}, so we will fallback to HEAD/GET on the url."); - warn!("If you did not supply a github token, consider doing so: GitHub limits unauthorized users to 60 requests per hour per origin IP address."); - } - HasReleaseArtifact::Unauthorized => { - warn!("GitHub API somehow requires a token for the API access, so we will fallback to HEAD/GET on the url."); - warn!("Please consider supplying a token to cargo-binstall to speedup resolution."); + HasReleaseArtifact::RateLimit { retry_after } => { + WARN_RATE_LIMIT_ONCE.call_once(|| { + warn!("Your GitHub API token (if any) has reached its rate limit and cannot be used again until {retry_after:?}, so we will fallback to HEAD/GET on the url."); + warn!("If you did not supply a github token, consider doing so: GitHub limits unauthorized users to 60 requests per hour per origin IP address."); + }); + } + HasReleaseArtifact::Unauthorized => { + WARN_UNAUTHORIZED_ONCE.call_once(|| { + warn!("GitHub API somehow requires a token for the API access, so we will fallback to HEAD/GET on the url."); + warn!("Please consider supplying a token to cargo-binstall to speedup resolution."); + }); + } } + + GH_API_CLIENT_FAILED.store(true, Relaxed); } }