cargo-binstall/crates/binstalk-downloader/src/remote.rs
Jiahao XU 50b6e62164
Minor refactor and optimization (#543)
* Avoid potential panicking in `args::parse` by using `Vec::get` instead of indexing
* Refactor: Simplify `opts::{resolve, install}` API
   Many parameters can be shared and put into `opts::Options` instead, which
   also avoids a few `Arc<Path>`s.
* Optimize `get_install_path`: Avoid cloning `install_path`
* Optimize `LazyJobserverClient`: Un`Arc` & remove `Clone` impl
   to avoid additional boxing
* Optimize `find_version`: Avoid cloning `semver::Version`
* Optimize `GhCrateMeta::launch_baseline_find_tasks`
   return `impl Iterator<Item = impl Future<Output = ...>>`
   instead of `impl Iterator<Item = AutoAbortJoinHandle<...>>`
   to avoid unnecessary spawning.
   
   Each spawned task has to be boxed and then polled by the tokio runtime;
   it may also be moved between worker threads.
   
   While they increase parallelism, spawning these futures does not justify
   the cost because:
    - Each `Future` only calls `remote_exists`.
    - Each `remote_exists` call sends a request to the same domain, which is
      likely served over the same shared http2 connection.
      Since the connection is shared anyway, spawning does not speed anything
      up but merely adds communication overhead.
    - On top of that, there is the cost of spawning on the tokio runtime.
   A minimal sketch of the resulting shape is included after this list.
* Optimize `install_crates`: Destructure `Args` before any `.await` point
   to reduce the size of the future
* Refactor `logging`: Replace param `arg` with `log_level` & `json_output`
   to avoid dep on `Args`
* Add dep strum & strum_macros to crates/bin
* Derive `strum_macros::EnumCount` for `Strategy`
* Optimize strategies parsing in `install_crates`
* Fix panic in `install_crates` when `Compile` is not the last strategy specified
* Optimize: Take `Vec<Self>` instead of slice in `CrateName::dedup`
* Refactor: Extract new fn `compute_resolvers`
* Refactor: Extract new fn `compute_paths_and_load_manifests`
* Refactor: Extract new fn `filter_out_installed_crates`
* Reorder `install_crates`: Only run target detection if args are valid
   and there are some crates to be installed.
* Optimize `filter_out_installed_crates`: Avoid allocation
   by returning an `Iterator`
* Fix user_agent of `remote::Client`: Let user specify it
* Refactor: Replace `UIThread` with `ui::confirm`
   which is much simpler.
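
A minimal sketch of the `launch_baseline_find_tasks` change referenced above
(names and signatures are illustrative, not the actual `GhCrateMeta` API):
returning plain futures lets the caller await the checks on the current task
instead of spawning each one.

    fn launch_checks(
        client: Client,
        urls: Vec<Url>,
    ) -> impl Iterator<Item = impl std::future::Future<Output = Result<bool, Error>>> {
        // Return unspawned futures; each one only performs a single
        // `remote_exists` probe over the shared connection.
        urls.into_iter().map(move |url| {
            let client = client.clone();
            async move { client.remote_exists(url, Method::GET).await }
        })
    }

The caller can then drive these futures concurrently (e.g. with
`futures_util::future::join_all`) without the boxing and scheduling cost of
`tokio::spawn`.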

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
2022-11-19 20:00:27 +13:00


use std::{
    num::NonZeroU64,
    sync::Arc,
    time::{Duration, SystemTime},
};

use bytes::Bytes;
use futures_util::stream::{Stream, StreamExt};
use httpdate::parse_http_date;
use reqwest::{
    header::{HeaderMap, RETRY_AFTER},
    Request, Response, StatusCode,
};
use thiserror::Error as ThisError;
use tokio::{sync::Mutex, time::sleep};
use tower::{limit::rate::RateLimit, Service, ServiceBuilder, ServiceExt};
use tracing::{debug, info};

pub use reqwest::{tls, Error as ReqwestError, Method};
pub use url::Url;

const MAX_RETRY_DURATION: Duration = Duration::from_secs(120);
const MAX_RETRY_COUNT: u8 = 3;

#[derive(Debug, ThisError)]
pub enum Error {
    #[error(transparent)]
    Reqwest(#[from] reqwest::Error),

    #[error(transparent)]
    Http(Box<HttpError>),
}

#[derive(Debug, ThisError)]
#[error("could not {method} {url}: {err}")]
pub struct HttpError {
    method: reqwest::Method,
    url: url::Url,
    #[source]
    err: reqwest::Error,
}

#[derive(Clone, Debug)]
pub struct Client {
    client: reqwest::Client,
    rate_limit: Arc<Mutex<RateLimit<reqwest::Client>>>,
}

impl Client {
    /// * `per` - must not be 0.
    /// * `num_request` - maximum number of requests to be processed for
    ///   each `per` duration.
    pub fn new(
        user_agent: impl AsRef<str>,
        min_tls: Option<tls::Version>,
        per: Duration,
        num_request: NonZeroU64,
    ) -> Result<Self, Error> {
        // Non-generic inner function: the outer fn is generic over
        // `impl AsRef<str>`, so doing the real work here avoids duplicating
        // the body for every caller's string type.
        fn inner(
            user_agent: &str,
            min_tls: Option<tls::Version>,
            per: Duration,
            num_request: NonZeroU64,
        ) -> Result<Client, Error> {
            let mut builder = reqwest::ClientBuilder::new()
                .user_agent(user_agent)
                .https_only(true)
                .min_tls_version(tls::Version::TLS_1_2)
                .tcp_nodelay(false);

            if let Some(ver) = min_tls {
                builder = builder.min_tls_version(ver);
            }

            let client = builder.build()?;

            Ok(Client {
                client: client.clone(),
                rate_limit: Arc::new(Mutex::new(
                    ServiceBuilder::new()
                        .rate_limit(num_request.get(), per)
                        .service(client),
                )),
            })
        }

        inner(user_agent.as_ref(), min_tls, per, num_request)
    }

    /// Return the inner reqwest client.
    pub fn get_inner(&self) -> &reqwest::Client {
        &self.client
    }

    async fn send_request_inner(
        &self,
        method: &Method,
        url: &Url,
    ) -> Result<Response, ReqwestError> {
        let mut count = 0;

        loop {
            let request = Request::new(method.clone(), url.clone());

            // Reduce the critical section:
            //  - Construct the request before locking.
            //  - Once the rate_limit is ready, call it and obtain the future,
            //    then release the lock before polling the future, which
            //    performs network I/O that could take a long time.
            let future = self.rate_limit.lock().await.ready().await?.call(request);

            let response = future.await?;
            let status = response.status();

            match (status, parse_header_retry_after(response.headers())) {
                (
                    // 503 or 429
                    StatusCode::SERVICE_UNAVAILABLE | StatusCode::TOO_MANY_REQUESTS,
                    Some(duration),
                ) if duration <= MAX_RETRY_DURATION && count < MAX_RETRY_COUNT => {
                    info!("Received status code {status}, will wait for {duration:#?} and retry");
                    sleep(duration).await
                }
                _ => break Ok(response),
            }

            count += 1;
        }
    }

    async fn send_request(
        &self,
        method: Method,
        url: Url,
        error_for_status: bool,
    ) -> Result<Response, Error> {
        self.send_request_inner(&method, &url)
            .await
            .and_then(|response| {
                if error_for_status {
                    response.error_for_status()
                } else {
                    Ok(response)
                }
            })
            .map_err(|err| Error::Http(Box::new(HttpError { method, url, err })))
    }

    /// Check if remote exists using `method`.
    pub async fn remote_exists(&self, url: Url, method: Method) -> Result<bool, Error> {
        Ok(self
            .send_request(method, url, false)
            .await?
            .status()
            .is_success())
    }

    /// Attempt to get final redirected url.
    pub async fn get_redirected_final_url(&self, url: Url) -> Result<Url, Error> {
        Ok(self
            .send_request(Method::HEAD, url, true)
            .await?
            .url()
            .clone())
    }

    /// Create `GET` request to `url` and return a stream of the response data.
    /// On status code other than 200, it will return an error.
    pub async fn get_stream(
        &self,
        url: Url,
    ) -> Result<impl Stream<Item = Result<Bytes, Error>>, Error> {
        debug!("Downloading from: '{url}'");

        self.send_request(Method::GET, url, true)
            .await
            .map(|response| response.bytes_stream().map(|res| res.map_err(Error::from)))
    }
}
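
// A hypothetical usage sketch (not part of the upstream module): construct a
// `Client` with a caller-supplied user agent and a rate limit of one request
// per 10ms, probe a URL, then stream a download. The user-agent string and
// URL below are placeholders.
#[allow(dead_code)]
async fn usage_sketch() -> Result<(), Error> {
    let client = Client::new(
        "my-tool/0.1.0",             // user_agent supplied by the caller
        None,                        // keep the default minimum TLS (1.2)
        Duration::from_millis(10),   // `per`
        NonZeroU64::new(1).unwrap(), // `num_request` allowed per `per`
    )?;

    let url = Url::parse("https://example.com/releases/app.tar.gz").unwrap();

    // HEAD request to check that the asset exists without downloading it.
    if client.remote_exists(url.clone(), Method::HEAD).await? {
        // Stream the response body chunk by chunk.
        let mut stream = Box::pin(client.get_stream(url).await?);
        let mut total = 0usize;
        while let Some(chunk) = stream.next().await {
            total += chunk?.len();
        }
        debug!("downloaded {total} bytes");
    }

    Ok(())
}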

/// Parse the `Retry-After` header, accepting either the delta-seconds form
/// (e.g. `Retry-After: 120`) or the HTTP-date form, and return it as a
/// `Duration` measured from now.
fn parse_header_retry_after(headers: &HeaderMap) -> Option<Duration> {
    let header = headers
        .get_all(RETRY_AFTER)
        .into_iter()
        .last()?
        .to_str()
        .ok()?;

    match header.parse::<u64>() {
        Ok(dur) => Some(Duration::from_secs(dur)),
        Err(_) => {
            let system_time = parse_http_date(header).ok()?;

            let retry_after_unix_timestamp =
                system_time.duration_since(SystemTime::UNIX_EPOCH).ok()?;
            let curr_time_unix_timestamp = SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("SystemTime before UNIX EPOCH!");

            // retry_after_unix_timestamp - curr_time_unix_timestamp
            // If it underflows, return Duration::ZERO.
            Some(retry_after_unix_timestamp.saturating_sub(curr_time_unix_timestamp))
        }
    }
}
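
// Illustrative tests for `parse_header_retry_after` (hypothetical, not part of
// the upstream module): they exercise the delta-seconds form of `Retry-After`
// and an unparsable value.
#[cfg(test)]
mod retry_after_tests {
    use super::*;
    use reqwest::header::HeaderValue;

    #[test]
    fn parses_delta_seconds() {
        let mut headers = HeaderMap::new();
        headers.insert(RETRY_AFTER, HeaderValue::from_static("120"));
        assert_eq!(
            parse_header_retry_after(&headers),
            Some(Duration::from_secs(120))
        );
    }

    #[test]
    fn rejects_unparsable_values() {
        let mut headers = HeaderMap::new();
        // Neither delta-seconds nor a valid HTTP-date, so no retry duration.
        headers.insert(RETRY_AFTER, HeaderValue::from_static("not-a-date"));
        assert_eq!(parse_header_retry_after(&headers), None);
    }
}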