diff --git a/Cargo.lock b/Cargo.lock index ab07d4b3..81b211e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -124,6 +124,7 @@ dependencies = [ "futures-util", "generic-array", "home", + "httpdate", "itertools", "jobslot", "log", diff --git a/crates/binstalk/Cargo.toml b/crates/binstalk/Cargo.toml index 6ef1cf5b..0b53625b 100644 --- a/crates/binstalk/Cargo.toml +++ b/crates/binstalk/Cargo.toml @@ -23,6 +23,7 @@ fs-lock = { version = "0.1.0", path = "../fs-lock" } futures-util = { version = "0.3.23", default-features = false, features = ["std"] } generic-array = "0.14.6" home = "0.5.3" +httpdate = "1.0.2" itertools = "0.10.5" jobslot = { version = "0.2.6", features = ["tokio"] } log = { version = "0.4.17", features = ["std"] } @@ -45,7 +46,7 @@ tar = { package = "binstall-tar", version = "0.4.39" } tempfile = "3.3.0" thiserror = "1.0.37" tinytemplate = "1.2.1" -tokio = { version = "1.21.2", features = ["macros", "rt", "process", "sync", "signal"], default-features = false } +tokio = { version = "1.21.2", features = ["macros", "rt", "process", "sync", "signal", "time"], default-features = false } toml_edit = { version = "0.14.4", features = ["easy"] } tower = { version = "0.4.13", features = ["limit", "util"] } trust-dns-resolver = { version = "0.21.2", optional = true, default-features = false, features = ["dnssec-ring"] } diff --git a/crates/binstalk/src/helpers/remote.rs b/crates/binstalk/src/helpers/remote.rs index d009939d..f1ce9f6a 100644 --- a/crates/binstalk/src/helpers/remote.rs +++ b/crates/binstalk/src/helpers/remote.rs @@ -1,10 +1,19 @@ -use std::{env, num::NonZeroU64, sync::Arc, time::Duration}; +use std::{ + env, + num::NonZeroU64, + sync::Arc, + time::{Duration, SystemTime}, +}; use bytes::Bytes; use futures_util::stream::Stream; -use log::debug; -use reqwest::{Request, Response}; -use tokio::sync::Mutex; +use httpdate::parse_http_date; +use log::{debug, info}; +use reqwest::{ + header::{HeaderMap, RETRY_AFTER}, + Request, Response, StatusCode, +}; +use tokio::{sync::Mutex, time::sleep}; use tower::{limit::rate::RateLimit, Service, ServiceBuilder, ServiceExt}; use crate::errors::BinstallError; @@ -12,6 +21,9 @@ use crate::errors::BinstallError; pub use reqwest::{tls, Method}; pub use url::Url; +const MAX_RETRY_DURATION: Duration = Duration::from_secs(120); +const MAX_RETRY_COUNT: u8 = 3; + #[derive(Clone, Debug)] pub struct Client { client: reqwest::Client, @@ -53,22 +65,51 @@ impl Client { &self.client } + async fn send_request_inner( + &self, + method: &Method, + url: &Url, + ) -> Result { + let mut count = 0; + + loop { + let request = Request::new(method.clone(), url.clone()); + + // Reduce critical section: + // - Construct the request before locking + // - Once the rate_limit is ready, call it and obtain + // the future, then release the lock before + // polling the future, which performs network I/O that could + // take really long. + let future = self.rate_limit.lock().await.ready().await?.call(request); + + let response = future.await?; + + let status = response.status(); + + match (status, parse_header_retry_after(response.headers())) { + ( + // 503 429 + StatusCode::SERVICE_UNAVAILABLE | StatusCode::TOO_MANY_REQUESTS, + Some(duration), + ) if duration <= MAX_RETRY_DURATION && count < MAX_RETRY_COUNT => { + info!("Receiver status code {status}, will wait for {duration:#?} and retry"); + sleep(duration).await + } + _ => break Ok(response), + } + + count += 1; + } + } + async fn send_request( &self, method: Method, url: Url, error_for_status: bool, ) -> Result { - let request = Request::new(method.clone(), url.clone()); - - // Reduce critical section: - // - Construct the request before locking - // - Once the rate_limit is ready, call it and obtain - // the future, then release the lock before - // polling the future. - let future = self.rate_limit.lock().await.ready().await?.call(request); - - future + self.send_request_inner(&method, &url) .await .and_then(|response| { if error_for_status { @@ -107,3 +148,30 @@ impl Client { .map(Response::bytes_stream) } } + +fn parse_header_retry_after(headers: &HeaderMap) -> Option { + let header = headers + .get_all(RETRY_AFTER) + .into_iter() + .last()? + .to_str() + .ok()?; + + match header.parse::() { + Ok(dur) => Some(Duration::from_secs(dur)), + Err(_) => { + let system_time = parse_http_date(header).ok()?; + + let retry_after_unix_timestamp = + system_time.duration_since(SystemTime::UNIX_EPOCH).ok()?; + + let curr_time_unix_timestamp = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("SystemTime before UNIX EPOCH!"); + + // retry_after_unix_timestamp - curr_time_unix_timestamp + // If underflows, returns Duration::ZERO. + Some(retry_after_unix_timestamp.saturating_sub(curr_time_unix_timestamp)) + } + } +}