From f2fc37eea5bc219fb53416d755dd92fb26dc59ab Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 4 Feb 2023 11:14:53 +1100 Subject: [PATCH] Fix "Too many "Too Many Requests" log" (#761) Fixed #747 - Add dep compact_str v0.6.1 to binstalk-downloader - Impl new type `DelayRequest` - Handle 503/429 with wait duration > `MAX_RETRY_DURATION` by simply taking the min - Fix `Client::send_request_inner`: Ensure 503/429 get propagated to other requests even if the current requests reach its maximum retry and decides to simply return an error. Signed-off-by: Jiahao XU --- Cargo.lock | 1 + crates/binstalk-downloader/Cargo.toml | 1 + crates/binstalk-downloader/src/remote.rs | 40 +++++-- .../src/remote/delay_request.rs | 104 ++++++++++++++++++ 4 files changed, 134 insertions(+), 12 deletions(-) create mode 100644 crates/binstalk-downloader/src/remote/delay_request.rs diff --git a/Cargo.lock b/Cargo.lock index b09d5784..7251166c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -211,6 +211,7 @@ dependencies = [ "binstall-tar", "bytes", "bzip2", + "compact_str", "digest", "flate2", "futures-util", diff --git a/crates/binstalk-downloader/Cargo.toml b/crates/binstalk-downloader/Cargo.toml index 41f79917..c7ff4790 100644 --- a/crates/binstalk-downloader/Cargo.toml +++ b/crates/binstalk-downloader/Cargo.toml @@ -16,6 +16,7 @@ async_zip = { version = "0.0.9", features = ["deflate", "bzip2", "lzma", "zstd", binstalk-types = { version = "0.2.0", path = "../binstalk-types" } bytes = "1.4.0" bzip2 = "0.4.4" +compact_str = "0.6.1" digest = "0.10.6" flate2 = { version = "1.0.25", default-features = false } futures-util = { version = "0.3.26", default-features = false, features = ["std"] } diff --git a/crates/binstalk-downloader/src/remote.rs b/crates/binstalk-downloader/src/remote.rs index 5632c19f..53f69170 100644 --- a/crates/binstalk-downloader/src/remote.rs +++ b/crates/binstalk-downloader/src/remote.rs @@ -12,13 +12,16 @@ use reqwest::{ Request, Response, StatusCode, }; use thiserror::Error 
as ThisError; -use tokio::{sync::Mutex, time::sleep}; +use tokio::time::Instant; use tower::{limit::rate::RateLimit, Service, ServiceBuilder, ServiceExt}; use tracing::{debug, info}; pub use reqwest::{tls, Error as ReqwestError, Method}; pub use url::Url; +mod delay_request; +use delay_request::DelayRequest; + const MAX_RETRY_DURATION: Duration = Duration::from_secs(120); const MAX_RETRY_COUNT: u8 = 3; const DEFAULT_MIN_TLS: tls::Version = tls::Version::TLS_1_2; @@ -44,7 +47,7 @@ pub struct HttpError { #[derive(Debug)] struct Inner { client: reqwest::Client, - rate_limit: Mutex<RateLimit<reqwest::Client>>, + service: DelayRequest<RateLimit<reqwest::Client>>, } #[derive(Clone, Debug)] @@ -81,7 +84,7 @@ impl Client { Ok(Client(Arc::new(Inner { client: client.clone(), - rate_limit: Mutex::new( + service: DelayRequest::new( ServiceBuilder::new() .rate_limit(num_request.get(), per) .service(client), @@ -107,13 +110,7 @@ impl Client { loop { let request = Request::new(method.clone(), url.clone()); - // Reduce critical section: - // - Construct the request before locking - // - Once the rate_limit is ready, call it and obtain - // the future, then release the lock before - // polling the future, which performs network I/O that could - // take really long. 
- let future = self.0.rate_limit.lock().await.ready().await?.call(request); + let future = (&self.0.service).ready().await?.call(request); let response = future.await?; @@ -124,9 +121,20 @@ impl Client { // 503 429 StatusCode::SERVICE_UNAVAILABLE | StatusCode::TOO_MANY_REQUESTS, Some(duration), - ) if duration <= MAX_RETRY_DURATION && count < MAX_RETRY_COUNT => { + ) => { + let duration = duration.min(MAX_RETRY_DURATION); + info!("Receiver status code {status}, will wait for {duration:#?} and retry"); - sleep(duration).await + + let deadline = Instant::now() + duration; + + self.0 + .service + .add_urls_to_delay(dedup([url, response.url()]), deadline); + + if count >= MAX_RETRY_COUNT { + break Ok(response); + } } _ => break Ok(response), } @@ -211,3 +219,11 @@ fn parse_header_retry_after(headers: &HeaderMap) -> Option<Duration> { } } } + +fn dedup(urls: [&Url; 2]) -> impl Iterator<Item = &Url> { + if urls[0] == urls[1] { + Some(urls[0]).into_iter().chain(None) + } else { + Some(urls[0]).into_iter().chain(Some(urls[1])) + } +} diff --git a/crates/binstalk-downloader/src/remote/delay_request.rs b/crates/binstalk-downloader/src/remote/delay_request.rs new file mode 100644 index 00000000..64af08a9 --- /dev/null +++ b/crates/binstalk-downloader/src/remote/delay_request.rs @@ -0,0 +1,104 @@ +use std::{ + collections::HashMap, + future::Future, + pin::Pin, + sync::Mutex, + task::{Context, Poll}, +}; + +use compact_str::{CompactString, ToCompactString}; +use reqwest::{Request, Url}; +use tokio::{ + sync::Mutex as AsyncMutex, + time::{sleep_until, Instant}, +}; +use tower::{Service, ServiceExt}; + +#[derive(Debug)] +pub(super) struct DelayRequest<S> { + inner: AsyncMutex<S>, + hosts_to_delay: Mutex<HashMap<CompactString, Instant>>, +} + +impl<S> DelayRequest<S> { + pub(super) fn new(inner: S) -> Self { + Self { + inner: AsyncMutex::new(inner), + hosts_to_delay: Default::default(), + } + } + + pub(super) fn add_urls_to_delay<'a, Urls>(&self, urls: Urls, deadline: Instant) + where + Urls: IntoIterator<Item = &'a Url>, + { + let mut hosts_to_delay = 
self.hosts_to_delay.lock().unwrap(); + + urls.into_iter().filter_map(Url::host_str).for_each(|host| { + hosts_to_delay + .entry(host.to_compact_string()) + .and_modify(|old_dl| { + *old_dl = deadline.max(*old_dl); + }) + .or_insert(deadline); + }); + } + + fn wait_until_available(&self, url: &Url) -> impl Future<Output = ()> + Send + 'static { + let mut hosts_to_delay = self.hosts_to_delay.lock().unwrap(); + + let sleep = url + .host_str() + .and_then(|host| hosts_to_delay.get(host).map(|deadline| (*deadline, host))) + .and_then(|(deadline, host)| { + if deadline.elapsed().is_zero() { + Some(sleep_until(deadline)) + } else { + // We have already gone past the deadline, + // so we should remove it instead. + hosts_to_delay.remove(host); + None + } + }); + + async move { + if let Some(sleep) = sleep { + sleep.await; + } + } + } +} + +impl<'this, S> Service<Request> for &'this DelayRequest<S> +where + S: Service<Request> + Send, + S::Future: Send, +{ + type Response = S::Response; + type Error = S::Error; + // TODO: Replace this with `type_alias_impl_trait` once it stablises + // https://github.com/rust-lang/rust/issues/63063 + type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'this>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request) -> Self::Future { + let this = *self; + + Box::pin(async move { + this.wait_until_available(req.url()).await; + + // Reduce critical section: + // - Construct the request before locking + // - Once it is ready, call it and obtain + // the future, then release the lock before + // polling the future, which performs network I/O that could + // take really long. + let future = this.inner.lock().await.ready().await?.call(req); + + future.await + }) + } +}