mirror of
https://github.com/cargo-bins/cargo-binstall.git
synced 2025-04-24 22:30:03 +00:00
Fix "Too many "Too Many Requests" log" (#761)
Fixed #747 - Add dep compact_str v0.6.1 to binstalk-downloader - Impl new type `DelayRequest` - Handle 503/429 with wait duration > `MAX_RETRY_DURATION` by simply taking the min - Fix `Client::send_request_inner`: Ensure 503/429 get propagated to other requests even if the current requests reach its maximum retry and decides to simply return an error. Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
parent
e510511487
commit
f2fc37eea5
4 changed files with 134 additions and 12 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -211,6 +211,7 @@ dependencies = [
|
||||||
"binstall-tar",
|
"binstall-tar",
|
||||||
"bytes",
|
"bytes",
|
||||||
"bzip2",
|
"bzip2",
|
||||||
|
"compact_str",
|
||||||
"digest",
|
"digest",
|
||||||
"flate2",
|
"flate2",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
|
|
|
@ -16,6 +16,7 @@ async_zip = { version = "0.0.9", features = ["deflate", "bzip2", "lzma", "zstd",
|
||||||
binstalk-types = { version = "0.2.0", path = "../binstalk-types" }
|
binstalk-types = { version = "0.2.0", path = "../binstalk-types" }
|
||||||
bytes = "1.4.0"
|
bytes = "1.4.0"
|
||||||
bzip2 = "0.4.4"
|
bzip2 = "0.4.4"
|
||||||
|
compact_str = "0.6.1"
|
||||||
digest = "0.10.6"
|
digest = "0.10.6"
|
||||||
flate2 = { version = "1.0.25", default-features = false }
|
flate2 = { version = "1.0.25", default-features = false }
|
||||||
futures-util = { version = "0.3.26", default-features = false, features = ["std"] }
|
futures-util = { version = "0.3.26", default-features = false, features = ["std"] }
|
||||||
|
|
|
@ -12,13 +12,16 @@ use reqwest::{
|
||||||
Request, Response, StatusCode,
|
Request, Response, StatusCode,
|
||||||
};
|
};
|
||||||
use thiserror::Error as ThisError;
|
use thiserror::Error as ThisError;
|
||||||
use tokio::{sync::Mutex, time::sleep};
|
use tokio::time::Instant;
|
||||||
use tower::{limit::rate::RateLimit, Service, ServiceBuilder, ServiceExt};
|
use tower::{limit::rate::RateLimit, Service, ServiceBuilder, ServiceExt};
|
||||||
use tracing::{debug, info};
|
use tracing::{debug, info};
|
||||||
|
|
||||||
pub use reqwest::{tls, Error as ReqwestError, Method};
|
pub use reqwest::{tls, Error as ReqwestError, Method};
|
||||||
pub use url::Url;
|
pub use url::Url;
|
||||||
|
|
||||||
|
mod delay_request;
|
||||||
|
use delay_request::DelayRequest;
|
||||||
|
|
||||||
const MAX_RETRY_DURATION: Duration = Duration::from_secs(120);
|
const MAX_RETRY_DURATION: Duration = Duration::from_secs(120);
|
||||||
const MAX_RETRY_COUNT: u8 = 3;
|
const MAX_RETRY_COUNT: u8 = 3;
|
||||||
const DEFAULT_MIN_TLS: tls::Version = tls::Version::TLS_1_2;
|
const DEFAULT_MIN_TLS: tls::Version = tls::Version::TLS_1_2;
|
||||||
|
@ -44,7 +47,7 @@ pub struct HttpError {
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct Inner {
|
struct Inner {
|
||||||
client: reqwest::Client,
|
client: reqwest::Client,
|
||||||
rate_limit: Mutex<RateLimit<reqwest::Client>>,
|
service: DelayRequest<RateLimit<reqwest::Client>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
@ -81,7 +84,7 @@ impl Client {
|
||||||
|
|
||||||
Ok(Client(Arc::new(Inner {
|
Ok(Client(Arc::new(Inner {
|
||||||
client: client.clone(),
|
client: client.clone(),
|
||||||
rate_limit: Mutex::new(
|
service: DelayRequest::new(
|
||||||
ServiceBuilder::new()
|
ServiceBuilder::new()
|
||||||
.rate_limit(num_request.get(), per)
|
.rate_limit(num_request.get(), per)
|
||||||
.service(client),
|
.service(client),
|
||||||
|
@ -107,13 +110,7 @@ impl Client {
|
||||||
loop {
|
loop {
|
||||||
let request = Request::new(method.clone(), url.clone());
|
let request = Request::new(method.clone(), url.clone());
|
||||||
|
|
||||||
// Reduce critical section:
|
let future = (&self.0.service).ready().await?.call(request);
|
||||||
// - Construct the request before locking
|
|
||||||
// - Once the rate_limit is ready, call it and obtain
|
|
||||||
// the future, then release the lock before
|
|
||||||
// polling the future, which performs network I/O that could
|
|
||||||
// take really long.
|
|
||||||
let future = self.0.rate_limit.lock().await.ready().await?.call(request);
|
|
||||||
|
|
||||||
let response = future.await?;
|
let response = future.await?;
|
||||||
|
|
||||||
|
@ -124,9 +121,20 @@ impl Client {
|
||||||
// 503 429
|
// 503 429
|
||||||
StatusCode::SERVICE_UNAVAILABLE | StatusCode::TOO_MANY_REQUESTS,
|
StatusCode::SERVICE_UNAVAILABLE | StatusCode::TOO_MANY_REQUESTS,
|
||||||
Some(duration),
|
Some(duration),
|
||||||
) if duration <= MAX_RETRY_DURATION && count < MAX_RETRY_COUNT => {
|
) => {
|
||||||
|
let duration = duration.min(MAX_RETRY_DURATION);
|
||||||
|
|
||||||
info!("Receiver status code {status}, will wait for {duration:#?} and retry");
|
info!("Receiver status code {status}, will wait for {duration:#?} and retry");
|
||||||
sleep(duration).await
|
|
||||||
|
let deadline = Instant::now() + duration;
|
||||||
|
|
||||||
|
self.0
|
||||||
|
.service
|
||||||
|
.add_urls_to_delay(dedup([url, response.url()]), deadline);
|
||||||
|
|
||||||
|
if count >= MAX_RETRY_COUNT {
|
||||||
|
break Ok(response);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
_ => break Ok(response),
|
_ => break Ok(response),
|
||||||
}
|
}
|
||||||
|
@ -211,3 +219,11 @@ fn parse_header_retry_after(headers: &HeaderMap) -> Option<Duration> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn dedup(urls: [&Url; 2]) -> impl Iterator<Item = &Url> {
|
||||||
|
if urls[0] == urls[1] {
|
||||||
|
Some(urls[0]).into_iter().chain(None)
|
||||||
|
} else {
|
||||||
|
Some(urls[0]).into_iter().chain(Some(urls[1]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
104
crates/binstalk-downloader/src/remote/delay_request.rs
Normal file
104
crates/binstalk-downloader/src/remote/delay_request.rs
Normal file
|
@ -0,0 +1,104 @@
|
||||||
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
future::Future,
|
||||||
|
pin::Pin,
|
||||||
|
sync::Mutex,
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use compact_str::{CompactString, ToCompactString};
|
||||||
|
use reqwest::{Request, Url};
|
||||||
|
use tokio::{
|
||||||
|
sync::Mutex as AsyncMutex,
|
||||||
|
time::{sleep_until, Instant},
|
||||||
|
};
|
||||||
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(super) struct DelayRequest<S> {
|
||||||
|
inner: AsyncMutex<S>,
|
||||||
|
hosts_to_delay: Mutex<HashMap<CompactString, Instant>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> DelayRequest<S> {
|
||||||
|
pub(super) fn new(inner: S) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: AsyncMutex::new(inner),
|
||||||
|
hosts_to_delay: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn add_urls_to_delay<'a, Urls>(&self, urls: Urls, deadline: Instant)
|
||||||
|
where
|
||||||
|
Urls: IntoIterator<Item = &'a Url>,
|
||||||
|
{
|
||||||
|
let mut hosts_to_delay = self.hosts_to_delay.lock().unwrap();
|
||||||
|
|
||||||
|
urls.into_iter().filter_map(Url::host_str).for_each(|host| {
|
||||||
|
hosts_to_delay
|
||||||
|
.entry(host.to_compact_string())
|
||||||
|
.and_modify(|old_dl| {
|
||||||
|
*old_dl = deadline.max(*old_dl);
|
||||||
|
})
|
||||||
|
.or_insert(deadline);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn wait_until_available(&self, url: &Url) -> impl Future<Output = ()> + Send + 'static {
|
||||||
|
let mut hosts_to_delay = self.hosts_to_delay.lock().unwrap();
|
||||||
|
|
||||||
|
let sleep = url
|
||||||
|
.host_str()
|
||||||
|
.and_then(|host| hosts_to_delay.get(host).map(|deadline| (*deadline, host)))
|
||||||
|
.and_then(|(deadline, host)| {
|
||||||
|
if deadline.elapsed().is_zero() {
|
||||||
|
Some(sleep_until(deadline))
|
||||||
|
} else {
|
||||||
|
// We have already gone past the deadline,
|
||||||
|
// so we should remove it instead.
|
||||||
|
hosts_to_delay.remove(host);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
async move {
|
||||||
|
if let Some(sleep) = sleep {
|
||||||
|
sleep.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'this, S> Service<Request> for &'this DelayRequest<S>
|
||||||
|
where
|
||||||
|
S: Service<Request> + Send,
|
||||||
|
S::Future: Send,
|
||||||
|
{
|
||||||
|
type Response = S::Response;
|
||||||
|
type Error = S::Error;
|
||||||
|
// TODO: Replace this with `type_alias_impl_trait` once it stablises
|
||||||
|
// https://github.com/rust-lang/rust/issues/63063
|
||||||
|
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'this>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: Request) -> Self::Future {
|
||||||
|
let this = *self;
|
||||||
|
|
||||||
|
Box::pin(async move {
|
||||||
|
this.wait_until_available(req.url()).await;
|
||||||
|
|
||||||
|
// Reduce critical section:
|
||||||
|
// - Construct the request before locking
|
||||||
|
// - Once it is ready, call it and obtain
|
||||||
|
// the future, then release the lock before
|
||||||
|
// polling the future, which performs network I/O that could
|
||||||
|
// take really long.
|
||||||
|
let future = this.inner.lock().await.ready().await?.call(req);
|
||||||
|
|
||||||
|
future.await
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue