Fix rate limit checking in GhApiClient (#1725)

* Fix rate limit checking in `GhApiClient`

 - Mv logic into `binstalk_downloader`
 - Check for `RETRY_AFTER` and `x-ratelimit-remaining` on any status code

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>

* Fix clippy lint

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>

---------

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2024-06-11 22:37:48 +10:00 committed by GitHub
parent 3aae883467
commit 238e0f6318
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 40 additions and 43 deletions

View file

@ -9,7 +9,7 @@ use bytes::Bytes;
use futures_util::Stream; use futures_util::Stream;
use httpdate::parse_http_date; use httpdate::parse_http_date;
use reqwest::{ use reqwest::{
header::{HeaderMap, RETRY_AFTER}, header::{HeaderMap, HeaderValue, RETRY_AFTER},
Request, Request,
}; };
use thiserror::Error as ThisError; use thiserror::Error as ThisError;
@ -172,6 +172,8 @@ impl Client {
url: &Url, url: &Url,
) -> Result<ControlFlow<reqwest::Response, Result<reqwest::Response, ReqwestError>>, ReqwestError> ) -> Result<ControlFlow<reqwest::Response, Result<reqwest::Response, ReqwestError>>, ReqwestError>
{ {
static HEADER_VALUE_0: HeaderValue = HeaderValue::from_static("0");
let response = match self.0.service.call(request).await { let response = match self.0.service.call(request).await {
Err(err) if err.is_timeout() || err.is_connect() => { Err(err) if err.is_timeout() || err.is_connect() => {
let duration = RETRY_DURATION_FOR_TIMEOUT; let duration = RETRY_DURATION_FOR_TIMEOUT;
@ -197,22 +199,37 @@ impl Client {
Ok(ControlFlow::Continue(Ok(response))) Ok(ControlFlow::Continue(Ok(response)))
}; };
match status { let headers = response.headers();
// Delay further request on rate limit
StatusCode::SERVICE_UNAVAILABLE | StatusCode::TOO_MANY_REQUESTS => {
let duration = parse_header_retry_after(response.headers())
.unwrap_or(DEFAULT_RETRY_DURATION_FOR_RATE_LIMIT)
.min(MAX_RETRY_DURATION);
add_delay_and_continue(response, duration) // Some server (looking at you, github GraphQL API) may returns a rate limit
// even when OK is returned or on other status code (e.g. 453 forbidden).
if let Some(duration) = parse_header_retry_after(headers) {
add_delay_and_continue(response, duration.min(MAX_RETRY_DURATION))
} else if headers.get("x-ratelimit-remaining") == Some(&HEADER_VALUE_0) {
let duration = headers
.get("x-ratelimit-reset")
.and_then(|value| {
let secs = value.to_str().ok()?.parse().ok()?;
Some(Duration::from_secs(secs))
})
.unwrap_or(DEFAULT_RETRY_DURATION_FOR_RATE_LIMIT)
.min(MAX_RETRY_DURATION);
add_delay_and_continue(response, duration)
} else {
match status {
// Delay further request on rate limit
StatusCode::SERVICE_UNAVAILABLE | StatusCode::TOO_MANY_REQUESTS => {
add_delay_and_continue(response, DEFAULT_RETRY_DURATION_FOR_RATE_LIMIT)
}
// Delay further request on timeout
StatusCode::REQUEST_TIMEOUT | StatusCode::GATEWAY_TIMEOUT => {
add_delay_and_continue(response, RETRY_DURATION_FOR_TIMEOUT)
}
_ => Ok(ControlFlow::Break(response)),
} }
// Delay further request on timeout
StatusCode::REQUEST_TIMEOUT | StatusCode::GATEWAY_TIMEOUT => {
add_delay_and_continue(response, RETRY_DURATION_FOR_TIMEOUT)
}
_ => Ok(ControlFlow::Break(response)),
} }
} }

View file

@ -299,14 +299,13 @@ impl GhApiClient {
.send(false) .send(false)
.await?; .await?;
match check_http_status_and_header(&response) { match check_http_status_and_header(response) {
Err(GhApiError::Unauthorized) => { Err(GhApiError::Unauthorized) => {
self.0.is_auth_token_valid.store(false, Relaxed); self.0.is_auth_token_valid.store(false, Relaxed);
Err(GhApiError::Unauthorized)
} }
res => res?, res => res.map(Download::from_response),
} }
Ok(Download::from_response(response))
} }
} }

View file

@ -1,4 +1,4 @@
use std::{fmt::Debug, future::Future, sync::OnceLock, time::Duration}; use std::{fmt::Debug, future::Future, sync::OnceLock};
use binstalk_downloader::remote::{self, Response, Url}; use binstalk_downloader::remote::{self, Response, Url};
use compact_str::CompactString; use compact_str::CompactString;
@ -18,28 +18,12 @@ pub(super) fn percent_decode_http_url_path(input: &str) -> CompactString {
} }
} }
pub(super) fn check_http_status_and_header(response: &Response) -> Result<(), GhApiError> { pub(super) fn check_http_status_and_header(response: Response) -> Result<Response, GhApiError> {
let headers = response.headers();
match response.status() { match response.status() {
remote::StatusCode::FORBIDDEN
if headers
.get("x-ratelimit-remaining")
.map(|val| val == "0")
.unwrap_or(false) =>
{
Err(GhApiError::RateLimit {
retry_after: headers.get("x-ratelimit-reset").and_then(|value| {
let secs = value.to_str().ok()?.parse().ok()?;
Some(Duration::from_secs(secs))
}),
})
}
remote::StatusCode::UNAUTHORIZED => Err(GhApiError::Unauthorized), remote::StatusCode::UNAUTHORIZED => Err(GhApiError::Unauthorized),
remote::StatusCode::NOT_FOUND => Err(GhApiError::NotFound), remote::StatusCode::NOT_FOUND => Err(GhApiError::NotFound),
_ => Ok(()), _ => Ok(response.error_for_status()?),
} }
} }
@ -73,9 +57,7 @@ where
.send(false); .send(false);
async move { async move {
let response = future.await?; let response = check_http_status_and_header(future.await?)?;
check_http_status_and_header(&response)?;
Ok(response.json().await?) Ok(response.json().await?)
} }
@ -127,8 +109,7 @@ where
}); });
async move { async move {
let response = res?.await?; let response = check_http_status_and_header(res?.await?)?;
check_http_status_and_header(&response)?;
let mut response: GraphQLResponse<T> = response.json().await?; let mut response: GraphQLResponse<T> = response.json().await?;