Disable tcp_nodelay for reqwest::Client and add rate limiting for https requests (#458)

This commit is contained in:
Jiahao XU 2022-10-07 15:51:34 +11:00 committed by GitHub
parent 8398ec2d4b
commit 76bc030f90
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 243 additions and 99 deletions

46
Cargo.lock generated
View file

@ -143,6 +143,7 @@ dependencies = [
"tinytemplate",
"tokio",
"toml_edit",
"tower",
"trust-dns-resolver",
"url",
"xz2",
@ -261,7 +262,6 @@ dependencies = [
"log",
"miette",
"mimalloc",
"reqwest",
"semver",
"simplelog",
"tempfile",
@ -1373,6 +1373,26 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "pin-project"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.9"
@ -2095,6 +2115,29 @@ dependencies = [
"serde",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"pin-project-lite",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-layer"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62"
[[package]]
name = "tower-service"
version = "0.3.2"
@ -2108,6 +2151,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307"
dependencies = [
"cfg-if",
"log",
"pin-project-lite",
"tracing-core",
]

View file

@ -29,7 +29,6 @@ dirs = "4.0.0"
log = "0.4.17"
miette = "5.3.0"
mimalloc = { version = "0.1.29", default-features = false, optional = true }
reqwest = { version = "0.11.12", default-features = false }
semver = "1.0.14"
simplelog = "0.12.0"
tempfile = "3.3.0"

View file

@ -1,13 +1,19 @@
use std::{ffi::OsString, path::PathBuf};
use std::{
ffi::OsString,
fmt,
num::{NonZeroU64, ParseIntError},
path::PathBuf,
str::FromStr,
};
use binstalk::{
errors::BinstallError,
helpers::remote::tls::Version,
manifests::cargo_toml_binstall::PkgFmt,
ops::resolve::{CrateName, VersionReqExt},
};
use clap::{Parser, ValueEnum};
use log::LevelFilter;
use reqwest::tls::Version;
use semver::VersionReq;
#[derive(Debug, Parser)]
@ -108,6 +114,21 @@ pub struct Args {
#[clap(help_heading = "Overrides", long)]
pub pkg_url: Option<String>,
/// Override the rate limit duration.
///
/// By default, cargo-binstall allows one request per 5 ms.
///
/// Example:
///
/// - `6`: Set the duration to 6ms, allows one request per 6 ms.
///
/// - `6/2`: Set the duration to 6ms and request_count to 2,
/// allows 2 requests per 6ms.
///
/// Both duration and request count must not be 0.
#[clap(help_heading = "Overrides", long, default_value_t = RateLimit::default())]
pub rate_limit: RateLimit,
/// Disable symlinking / versioned updates.
///
/// By default, Binstall will install a binary named `<name>-<version>` in the install path, and
@ -218,6 +239,45 @@ impl From<TLSVersion> for Version {
}
}
#[derive(Copy, Clone, Debug)]
pub struct RateLimit {
pub duration: NonZeroU64,
pub request_count: NonZeroU64,
}
impl fmt::Display for RateLimit {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/{}", self.duration, self.request_count)
}
}
impl FromStr for RateLimit {
type Err = ParseIntError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(if let Some((first, second)) = s.split_once('/') {
Self {
duration: first.parse()?,
request_count: second.parse()?,
}
} else {
Self {
duration: s.parse()?,
..Default::default()
}
})
}
}
impl Default for RateLimit {
fn default() -> Self {
Self {
duration: NonZeroU64::new(5).unwrap(),
request_count: NonZeroU64::new(1).unwrap(),
}
}
}
pub fn parse() -> Result<Args, BinstallError> {
// Filter extraneous arg when invoked by cargo
// `cargo run -- --help` gives ["target/debug/cargo-binstall", "--help"]

View file

@ -3,10 +3,7 @@ use std::{fs, path::Path, sync::Arc, time::Duration};
use binstalk::{
errors::BinstallError,
get_desired_targets,
helpers::{
jobserver_client::LazyJobserverClient, remote::create_reqwest_client,
tasks::AutoAbortJoinHandle,
},
helpers::{jobserver_client::LazyJobserverClient, remote::Client, tasks::AutoAbortJoinHandle},
manifests::{
binstall_crates_v1::Records, cargo_crates_v1::CratesToml, cargo_toml_binstall::PkgOverride,
},
@ -31,12 +28,20 @@ pub async fn install_crates(mut args: Args, jobserver_client: LazyJobserverClien
// Launch target detection
let desired_targets = get_desired_targets(args.targets.take());
let rate_limit = args.rate_limit;
// Initialize reqwest client
let client = create_reqwest_client(args.min_tls_version.map(|v| v.into()))?;
let client = Client::new(
args.min_tls_version.map(|v| v.into()),
Duration::from_millis(rate_limit.duration.get()),
rate_limit.request_count,
)?;
// Build crates.io api client
let crates_io_api_client =
crates_io_api::AsyncClient::with_http_client(client.clone(), Duration::from_millis(100));
let crates_io_api_client = crates_io_api::AsyncClient::with_http_client(
client.get_inner().clone(),
Duration::from_millis(100),
);
// Initialize UI thread
let mut uithread = UIThread::new(!args.no_confirm);

View file

@ -47,6 +47,7 @@ thiserror = "1.0.37"
tinytemplate = "1.2.1"
tokio = { version = "1.21.2", features = ["macros", "rt", "process", "sync", "signal"], default-features = false }
toml_edit = { version = "0.14.4", features = ["easy"] }
tower = { version = "0.4.13", features = ["limit", "util"] }
trust-dns-resolver = { version = "0.21.2", optional = true, default-features = false, features = ["dnssec-ring"] }
url = { version = "2.3.1", features = ["serde"] }
xz2 = "0.1.7"

View file

@ -3,13 +3,14 @@ use std::path::PathBuf;
use cargo_toml::Manifest;
use crates_io_api::AsyncClient;
use log::debug;
use reqwest::Client;
use semver::VersionReq;
use url::Url;
use crate::{
errors::BinstallError,
helpers::download::Download,
helpers::{
download::Download,
remote::{Client, Url},
},
manifests::cargo_toml_binstall::{Meta, TarBasedFmt},
};

View file

@ -4,10 +4,10 @@ use compact_str::CompactString;
pub use gh_crate_meta::*;
pub use log::debug;
pub use quickinstall::*;
use reqwest::Client;
use crate::{
errors::BinstallError,
helpers::remote::Client,
manifests::cargo_toml_binstall::{PkgFmt, PkgMeta},
};

View file

@ -4,7 +4,6 @@ use compact_str::{CompactString, ToCompactString};
use futures_util::stream::{FuturesUnordered, StreamExt};
use log::{debug, warn};
use once_cell::sync::OnceCell;
use reqwest::{Client, Method};
use serde::Serialize;
use strum::IntoEnumIterator;
use tinytemplate::TinyTemplate;
@ -14,7 +13,7 @@ use crate::{
errors::BinstallError,
helpers::{
download::Download,
remote::{get_redirected_final_url, remote_exists},
remote::{Client, Method},
tasks::AutoAbortJoinHandle,
},
manifests::cargo_toml_binstall::{PkgFmt, PkgMeta},
@ -59,11 +58,9 @@ impl GhCrateMeta {
AutoAbortJoinHandle::spawn(async move {
debug!("Checking for package at: '{url}'");
Ok(
(remote_exists(client.clone(), url.clone(), Method::HEAD).await?
|| remote_exists(client, url.clone(), Method::GET).await?)
.then_some((url, pkg_fmt)),
)
Ok((client.remote_exists(url.clone(), Method::HEAD).await?
|| client.remote_exists(url.clone(), Method::GET).await?)
.then_some((url, pkg_fmt)))
})
})
}
@ -81,7 +78,11 @@ impl super::Fetcher for GhCrateMeta {
async fn find(&self) -> Result<bool, BinstallError> {
let repo = if let Some(repo) = self.data.repo.as_deref() {
Some(get_redirected_final_url(&self.client, Url::parse(repo)?).await?)
Some(
self.client
.get_redirected_final_url(Url::parse(repo)?)
.await?,
)
} else {
None
};

View file

@ -2,14 +2,15 @@ use std::{path::Path, sync::Arc};
use compact_str::CompactString;
use log::debug;
use reqwest::Client;
use reqwest::Method;
use tokio::task::JoinHandle;
use url::Url;
use crate::{
errors::BinstallError,
helpers::{download::Download, remote::remote_exists},
helpers::{
download::Download,
remote::{Client, Method},
},
manifests::cargo_toml_binstall::{PkgFmt, PkgMeta},
};
@ -43,7 +44,9 @@ impl super::Fetcher for QuickInstall {
let url = self.package_url();
self.report();
debug!("Checking for package at: '{url}'");
remote_exists(self.client.clone(), Url::parse(&url)?, Method::HEAD).await
self.client
.remote_exists(Url::parse(&url)?, Method::HEAD)
.await
}
async fn fetch_and_extract(&self, dst: &Path) -> Result<(), BinstallError> {
@ -112,15 +115,7 @@ impl QuickInstall {
let url = Url::parse(&stats_url)?;
debug!("Sending installation report to quickinstall ({url})");
client
.request(Method::HEAD, url.clone())
.send()
.await
.map_err(|err| BinstallError::Http {
method: Method::HEAD,
url,
err,
})?;
client.remote_exists(url, Method::HEAD).await?;
Ok(())
})

View file

@ -2,11 +2,10 @@ use std::{fmt::Debug, marker::PhantomData, path::Path};
use digest::{Digest, FixedOutput, HashMarker, Output, OutputSizeUser, Update};
use log::debug;
use reqwest::{Client, Url};
use crate::{
errors::BinstallError,
helpers::remote::create_request,
helpers::remote::{Client, Url},
manifests::cargo_toml_binstall::{PkgFmt, PkgFmtDecomposed, TarBasedFmt},
};
@ -48,7 +47,7 @@ impl Download {
fmt: TarBasedFmt,
visitor: V,
) -> Result<V::Target, BinstallError> {
let stream = create_request(self.client, self.url).await?;
let stream = self.client.create_request(self.url).await?;
debug!("Downloading and extracting then in-memory processing");
@ -65,7 +64,7 @@ impl Download {
fmt: PkgFmt,
path: impl AsRef<Path>,
) -> Result<(), BinstallError> {
let stream = create_request(self.client, self.url).await?;
let stream = self.client.create_request(self.url).await?;
let path = path.as_ref();
debug!("Downloading and extracting to: '{}'", path.display());

View file

@ -1,69 +1,109 @@
use std::env;
use std::{env, num::NonZeroU64, sync::Arc, time::Duration};
use bytes::Bytes;
use futures_util::stream::Stream;
use log::debug;
use reqwest::{tls, Client, ClientBuilder, Method, Response};
use url::Url;
use reqwest::{Request, Response};
use tokio::sync::Mutex;
use tower::{limit::rate::RateLimit, Service, ServiceBuilder, ServiceExt};
use crate::errors::BinstallError;
pub fn create_reqwest_client(min_tls: Option<tls::Version>) -> Result<Client, BinstallError> {
const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
pub use reqwest::{tls, Method};
pub use url::Url;
let mut builder = ClientBuilder::new()
.user_agent(USER_AGENT)
.https_only(true)
.min_tls_version(tls::Version::TLS_1_2);
#[derive(Clone, Debug)]
pub struct Client {
client: reqwest::Client,
rate_limit: Arc<Mutex<RateLimit<reqwest::Client>>>,
}
if let Some(ver) = min_tls {
builder = builder.min_tls_version(ver);
impl Client {
/// * `per` - must not be 0.
pub fn new(
min_tls: Option<tls::Version>,
per: Duration,
num_request: NonZeroU64,
) -> Result<Self, BinstallError> {
const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
let mut builder = reqwest::ClientBuilder::new()
.user_agent(USER_AGENT)
.https_only(true)
.min_tls_version(tls::Version::TLS_1_2)
.tcp_nodelay(false);
if let Some(ver) = min_tls {
builder = builder.min_tls_version(ver);
}
let client = builder.build()?;
Ok(Self {
client: client.clone(),
rate_limit: Arc::new(Mutex::new(
ServiceBuilder::new()
.rate_limit(num_request.get(), per)
.service(client),
)),
})
}
Ok(builder.build()?)
}
pub async fn remote_exists(
client: Client,
url: Url,
method: Method,
) -> Result<bool, BinstallError> {
let req = client
.request(method.clone(), url.clone())
.send()
.await
.map_err(|err| BinstallError::Http { method, url, err })?;
Ok(req.status().is_success())
}
pub async fn get_redirected_final_url(client: &Client, url: Url) -> Result<Url, BinstallError> {
let method = Method::HEAD;
let req = client
.request(method.clone(), url.clone())
.send()
.await
.and_then(Response::error_for_status)
.map_err(|err| BinstallError::Http { method, url, err })?;
Ok(req.url().clone())
}
pub(crate) async fn create_request(
client: Client,
url: Url,
) -> Result<impl Stream<Item = reqwest::Result<Bytes>>, BinstallError> {
debug!("Downloading from: '{url}'");
client
.get(url.clone())
.send()
.await
.and_then(|r| r.error_for_status())
.map_err(|err| BinstallError::Http {
method: Method::GET,
url,
err,
})
.map(Response::bytes_stream)
pub fn get_inner(&self) -> &reqwest::Client {
&self.client
}
async fn send_request(
&self,
method: Method,
url: Url,
error_for_status: bool,
) -> Result<Response, BinstallError> {
let request = Request::new(method.clone(), url.clone());
// Reduce critical section:
// - Construct the request before locking
// - Once the rate_limit is ready, call it and obtain
// the future, then release the lock before
// polling the future.
let future = self.rate_limit.lock().await.ready().await?.call(request);
future
.await
.and_then(|response| {
if error_for_status {
response.error_for_status()
} else {
Ok(response)
}
})
.map_err(|err| BinstallError::Http { method, url, err })
}
pub async fn remote_exists(&self, url: Url, method: Method) -> Result<bool, BinstallError> {
Ok(self
.send_request(method, url, false)
.await?
.status()
.is_success())
}
pub async fn get_redirected_final_url(&self, url: Url) -> Result<Url, BinstallError> {
Ok(self
.send_request(Method::HEAD, url, true)
.await?
.url()
.clone())
}
pub(crate) async fn create_request(
&self,
url: Url,
) -> Result<impl Stream<Item = reqwest::Result<Bytes>>, BinstallError> {
debug!("Downloading from: '{url}'");
self.send_request(Method::GET, url, true)
.await
.map(Response::bytes_stream)
}
}

View file

@ -10,7 +10,6 @@ use cargo_toml::{Manifest, Package, Product};
use compact_str::{CompactString, ToCompactString};
use itertools::Itertools;
use log::{debug, info, warn};
use reqwest::Client;
use semver::{Version, VersionReq};
use tokio::task::block_in_place;
@ -20,7 +19,7 @@ use crate::{
drivers::fetch_crate_cratesio,
errors::BinstallError,
fetchers::{Data, Fetcher, GhCrateMeta, QuickInstall},
helpers::tasks::AutoAbortJoinHandle,
helpers::{remote::Client, tasks::AutoAbortJoinHandle},
manifests::cargo_toml_binstall::{Meta, PkgMeta},
};