Impl retry for 503 and 429 status code in response (#473)

* Add new dep httpdate v1.0.2
* Enable feature time of dep tokio in binstalk
* Impl retry for `helpers::remote::Client`

Fixed #472 

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2022-10-10 15:54:29 +11:00 committed by GitHub
parent 4e1ac6ee0d
commit e605a99113
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 85 additions and 15 deletions

1
Cargo.lock generated
View file

@ -124,6 +124,7 @@ dependencies = [
"futures-util", "futures-util",
"generic-array", "generic-array",
"home", "home",
"httpdate",
"itertools", "itertools",
"jobslot", "jobslot",
"log", "log",

View file

@ -23,6 +23,7 @@ fs-lock = { version = "0.1.0", path = "../fs-lock" }
futures-util = { version = "0.3.23", default-features = false, features = ["std"] } futures-util = { version = "0.3.23", default-features = false, features = ["std"] }
generic-array = "0.14.6" generic-array = "0.14.6"
home = "0.5.3" home = "0.5.3"
httpdate = "1.0.2"
itertools = "0.10.5" itertools = "0.10.5"
jobslot = { version = "0.2.6", features = ["tokio"] } jobslot = { version = "0.2.6", features = ["tokio"] }
log = { version = "0.4.17", features = ["std"] } log = { version = "0.4.17", features = ["std"] }
@ -45,7 +46,7 @@ tar = { package = "binstall-tar", version = "0.4.39" }
tempfile = "3.3.0" tempfile = "3.3.0"
thiserror = "1.0.37" thiserror = "1.0.37"
tinytemplate = "1.2.1" tinytemplate = "1.2.1"
tokio = { version = "1.21.2", features = ["macros", "rt", "process", "sync", "signal"], default-features = false } tokio = { version = "1.21.2", features = ["macros", "rt", "process", "sync", "signal", "time"], default-features = false }
toml_edit = { version = "0.14.4", features = ["easy"] } toml_edit = { version = "0.14.4", features = ["easy"] }
tower = { version = "0.4.13", features = ["limit", "util"] } tower = { version = "0.4.13", features = ["limit", "util"] }
trust-dns-resolver = { version = "0.21.2", optional = true, default-features = false, features = ["dnssec-ring"] } trust-dns-resolver = { version = "0.21.2", optional = true, default-features = false, features = ["dnssec-ring"] }

View file

@ -1,10 +1,19 @@
use std::{env, num::NonZeroU64, sync::Arc, time::Duration}; use std::{
env,
num::NonZeroU64,
sync::Arc,
time::{Duration, SystemTime},
};
use bytes::Bytes; use bytes::Bytes;
use futures_util::stream::Stream; use futures_util::stream::Stream;
use log::debug; use httpdate::parse_http_date;
use reqwest::{Request, Response}; use log::{debug, info};
use tokio::sync::Mutex; use reqwest::{
header::{HeaderMap, RETRY_AFTER},
Request, Response, StatusCode,
};
use tokio::{sync::Mutex, time::sleep};
use tower::{limit::rate::RateLimit, Service, ServiceBuilder, ServiceExt}; use tower::{limit::rate::RateLimit, Service, ServiceBuilder, ServiceExt};
use crate::errors::BinstallError; use crate::errors::BinstallError;
@ -12,6 +21,9 @@ use crate::errors::BinstallError;
pub use reqwest::{tls, Method}; pub use reqwest::{tls, Method};
pub use url::Url; pub use url::Url;
const MAX_RETRY_DURATION: Duration = Duration::from_secs(120);
const MAX_RETRY_COUNT: u8 = 3;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Client { pub struct Client {
client: reqwest::Client, client: reqwest::Client,
@ -53,22 +65,51 @@ impl Client {
&self.client &self.client
} }
async fn send_request( async fn send_request_inner(
&self, &self,
method: Method, method: &Method,
url: Url, url: &Url,
error_for_status: bool, ) -> Result<Response, reqwest::Error> {
) -> Result<Response, BinstallError> { let mut count = 0;
loop {
let request = Request::new(method.clone(), url.clone()); let request = Request::new(method.clone(), url.clone());
// Reduce critical section: // Reduce critical section:
// - Construct the request before locking // - Construct the request before locking
// - Once the rate_limit is ready, call it and obtain // - Once the rate_limit is ready, call it and obtain
// the future, then release the lock before // the future, then release the lock before
// polling the future. // polling the future, which performs network I/O that could
// take really long.
let future = self.rate_limit.lock().await.ready().await?.call(request); let future = self.rate_limit.lock().await.ready().await?.call(request);
future let response = future.await?;
let status = response.status();
match (status, parse_header_retry_after(response.headers())) {
(
// 503 429
StatusCode::SERVICE_UNAVAILABLE | StatusCode::TOO_MANY_REQUESTS,
Some(duration),
) if duration <= MAX_RETRY_DURATION && count < MAX_RETRY_COUNT => {
info!("Receiver status code {status}, will wait for {duration:#?} and retry");
sleep(duration).await
}
_ => break Ok(response),
}
count += 1;
}
}
async fn send_request(
&self,
method: Method,
url: Url,
error_for_status: bool,
) -> Result<Response, BinstallError> {
self.send_request_inner(&method, &url)
.await .await
.and_then(|response| { .and_then(|response| {
if error_for_status { if error_for_status {
@ -107,3 +148,30 @@ impl Client {
.map(Response::bytes_stream) .map(Response::bytes_stream)
} }
} }
fn parse_header_retry_after(headers: &HeaderMap) -> Option<Duration> {
let header = headers
.get_all(RETRY_AFTER)
.into_iter()
.last()?
.to_str()
.ok()?;
match header.parse::<u64>() {
Ok(dur) => Some(Duration::from_secs(dur)),
Err(_) => {
let system_time = parse_http_date(header).ok()?;
let retry_after_unix_timestamp =
system_time.duration_since(SystemTime::UNIX_EPOCH).ok()?;
let curr_time_unix_timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("SystemTime before UNIX EPOCH!");
// retry_after_unix_timestamp - curr_time_unix_timestamp
// If underflows, returns Duration::ZERO.
Some(retry_after_unix_timestamp.saturating_sub(curr_time_unix_timestamp))
}
}
}