Impl GhApiClient and use it in cargo-binstall to speed up the resolution process (#832)
Fixed #776

- Add new feature gh-api-client to binstalk-downloader
- Impl new types `binstalk_downloader::remote::{RequestBuilder, Response}`
- Impl `binstalk_downloader::gh_api_client::GhApiClient`, exposed if `cfg(feature = "gh-api-client")`, and add e2e and unit tests for it
- Use `binstalk_downloader::gh_api_client::GhApiClient` to speed up `cargo-binstall`
- Add new option `--github-token` to supply the token for the GitHub RESTful API, or read it from the env variable `GITHUB_TOKEN` if not present

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
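For orientation, here is a minimal sketch of how the pieces added in this commit fit together, using the types introduced below (`GhApiClient`, `GhReleaseArtifact`, `HasReleaseArtifact`); the helper function itself is illustrative and not part of the diff:

```rust
use binstalk_downloader::{
    gh_api_client::{GhApiClient, GhReleaseArtifact, HasReleaseArtifact},
    remote,
};

// Hypothetical helper: given a download url and a GhApiClient (built from a
// remote::Client plus the optional --github-token/GITHUB_TOKEN token), ask
// the GitHub REST API whether the release artifact exists.
async fn artifact_exists(client: &GhApiClient, url: &remote::Url) -> bool {
    // Only github.com release-download urls can be checked via the API.
    let Some(artifact) = GhReleaseArtifact::try_extract_from_url(url) else {
        return false;
    };

    match client.has_release_artifact(artifact).await {
        Ok(HasReleaseArtifact::Yes) => true,
        Ok(HasReleaseArtifact::No | HasReleaseArtifact::NoSuchRelease) => false,
        // On 401 or rate limiting, the caller should fall back to
        // HEAD/GET-ing the url directly (see the HasReleaseArtifact docs).
        Ok(HasReleaseArtifact::Unauthorized | HasReleaseArtifact::RateLimit { .. }) => false,
        Err(_) => false,
    }
}
```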
parent 263c836757
commit 599bcaf333
26 changed files with 960 additions and 192 deletions
@@ -23,6 +23,8 @@ futures-lite = { version = "1.12.0", default-features = false }
generic-array = "0.14.6"
httpdate = "1.0.2"
reqwest = { version = "0.11.14", features = ["stream", "gzip", "brotli", "deflate"], default-features = false }
serde = { version = "1.0.152", features = ["derive"], optional = true }
serde_json = { version = "1.0.93", optional = true }
# Use a fork here since we need PAX support, but the upstream
# does not have the PR merged yet.
#

@@ -71,3 +73,9 @@ trust-dns = ["trust-dns-resolver", "reqwest/trust-dns"]
zstd-thin = ["zstd/thin"]

cross-lang-fat-lto = ["zstd/fat-lto"]

gh-api-client = ["serde", "serde_json"]

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
@@ -18,13 +18,10 @@ use async_tar_visitor::extract_tar_based_stream_and_visit;
pub use async_tar_visitor::{TarEntriesVisitor, TarEntry, TarEntryType};

mod extracter;
mod stream_readable;

mod zip_extraction;
pub use zip_extraction::ZipError;

mod utils;

#[derive(Debug, ThisError)]
pub enum DownloadError {
    #[error("Failed to extract zipfile: {0}")]
@@ -7,18 +7,15 @@ use std::{
};

use async_zip::read::stream::ZipFileReader;
use bytes::{Bytes, BytesMut};
use futures_lite::{
    future::try_zip as try_join,
    stream::{Stream, StreamExt},
};
use futures_lite::stream::Stream;
use tokio::sync::mpsc;
use tokio_util::io::StreamReader;
use tracing::debug;

use super::{
    extracter::*, stream_readable::StreamReadable, utils::asyncify,
    zip_extraction::extract_zip_entry, DownloadError, TarBasedFmt, ZipError,
    extracter::*, zip_extraction::extract_zip_entry, DownloadError, TarBasedFmt, ZipError,
};
use crate::utils::{extract_with_blocking_task, StreamReadable};

pub async fn extract_bin<S>(stream: S, path: &Path) -> Result<(), DownloadError>
where
@@ -77,71 +74,22 @@ where
    .await
}

async fn extract_with_blocking_decoder<S, F>(
fn extract_with_blocking_decoder<S, F>(
    stream: S,
    path: &Path,
    f: F,
) -> Result<(), DownloadError>
) -> impl Future<Output = Result<(), DownloadError>>
where
    S: Stream<Item = Result<Bytes, DownloadError>> + Send + Sync + Unpin + 'static,
    F: FnOnce(mpsc::Receiver<Bytes>, &Path) -> io::Result<()> + Send + Sync + 'static,
{
    async fn inner<S, Fut>(
        mut stream: S,
        task: Fut,
        tx: mpsc::Sender<Bytes>,
    ) -> Result<(), DownloadError>
    where
        // We do not use a trait object for S since there will only be one
        // S used with this function.
        S: Stream<Item = Result<Bytes, DownloadError>> + Send + Sync + Unpin + 'static,
        // asyncify would always return the same future, so no need to
        // use a trait object here.
        Fut: Future<Output = io::Result<()>> + Send + Sync,
    {
        try_join(
            async move {
                while let Some(bytes) = stream.next().await.transpose()? {
                    if bytes.is_empty() {
                        continue;
                    }

                    if tx.send(bytes).await.is_err() {
                        // The extraction task has returned, which could mean that:
                        // - Extraction failed with an error
                        // - Extraction succeeded without needing the rest of the data
                        //
                        // It's hard to tell the difference here, so we assume
                        // the first scenario occurs.
                        //
                        // Even if the second scenario occurs, it won't affect the
                        // extraction process anyway, so we can just ignore it.
                        return Ok(());
                    }
                }

                Ok(())
            },
            task,
        )
        .await?;

        Ok(())
    }

    // Use channel size = 5 to minimize the waiting time in the extraction task
    let (tx, rx) = mpsc::channel(5);

    let path = path.to_owned();

    let task = asyncify(move || {
    extract_with_blocking_task(stream, move |rx| {
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?;
        }

        f(rx, &path)
    });
    })

        inner(stream, task, tx).await
    })
}
@@ -1,65 +0,0 @@
use std::io::{self, BufRead, Read};

use bytes::{Buf, Bytes};
use tokio::sync::mpsc;

/// This wraps an AsyncIterator as a `Read`able.
/// It must be used in non-async context only,
/// meaning you have to use it with
/// `tokio::task::{block_in_place, spawn_blocking}` or
/// `std::thread::spawn`.
pub struct StreamReadable {
    rx: mpsc::Receiver<Bytes>,
    bytes: Bytes,
}

impl StreamReadable {
    pub(super) fn new(rx: mpsc::Receiver<Bytes>) -> Self {
        Self {
            rx,
            bytes: Bytes::new(),
        }
    }
}

impl Read for StreamReadable {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }

        if self.fill_buf()?.is_empty() {
            return Ok(0);
        }

        let bytes = &mut self.bytes;

        // copy_to_slice requires the bytes to have enough remaining bytes
        // to fill buf.
        let n = buf.len().min(bytes.remaining());

        // <Bytes as Buf>::copy_to_slice copies and consumes the bytes
        bytes.copy_to_slice(&mut buf[..n]);

        Ok(n)
    }
}

impl BufRead for StreamReadable {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        let bytes = &mut self.bytes;

        if !bytes.has_remaining() {
            if let Some(new_bytes) = self.rx.blocking_recv() {
                // new_bytes is guaranteed to be non-empty.
                *bytes = new_bytes;
            }
        }

        Ok(&*bytes)
    }

    fn consume(&mut self, amt: usize) {
        self.bytes.advance(amt);
    }
}
@@ -1,22 +0,0 @@
use std::{future::Future, io};

use tokio::task;

/// Copied from tokio https://docs.rs/tokio/latest/src/tokio/fs/mod.rs.html#132
pub(super) fn asyncify<F, T>(f: F) -> impl Future<Output = io::Result<T>> + Send + Sync + 'static
where
    F: FnOnce() -> io::Result<T> + Send + 'static,
    T: Send + 'static,
{
    async fn inner<T: Send + 'static>(handle: task::JoinHandle<io::Result<T>>) -> io::Result<T> {
        match handle.await {
            Ok(res) => res,
            Err(err) => Err(io::Error::new(
                io::ErrorKind::Other,
                format!("background task failed: {err}"),
            )),
        }
    }

    inner(task::spawn_blocking(f))
}
@@ -12,7 +12,8 @@ use tokio::{
    sync::mpsc,
};

use super::{utils::asyncify, DownloadError};
use super::DownloadError;
use crate::utils::asyncify;

#[derive(Debug, ThisError)]
enum ZipErrorInner {
398 crates/binstalk-downloader/src/gh_api_client.rs Normal file
@@ -0,0 +1,398 @@
use std::{
    collections::HashMap,
    ops::Deref,
    sync::{Arc, Mutex, RwLock},
    time::{Duration, Instant},
};

use compact_str::{CompactString, ToCompactString};
use tokio::sync::OnceCell;

use crate::remote;

mod request;
pub use request::{GhApiError, JsonError};

/// Default retry duration if x-ratelimit-reset is not found in the response header
const DEFAULT_RETRY_DURATION: Duration = Duration::from_secs(3);

/// The keys required to identify a GitHub release.
#[derive(Clone, Eq, PartialEq, Hash, Debug)]
pub struct GhRelease {
    pub owner: CompactString,
    pub repo: CompactString,
    pub tag: CompactString,
}

/// The GitHub release and one of its artifacts.
#[derive(Clone, Eq, PartialEq, Hash, Debug)]
pub struct GhReleaseArtifact {
    pub release: GhRelease,
    pub artifact_name: CompactString,
}

impl GhReleaseArtifact {
    /// Create [`GhReleaseArtifact`] from url.
    pub fn try_extract_from_url(url: &remote::Url) -> Option<Self> {
        if url.domain() != Some("github.com") {
            return None;
        }

        let mut path_segments = url.path_segments()?;

        let owner = path_segments.next()?;
        let repo = path_segments.next()?;

        if (path_segments.next()?, path_segments.next()?) != ("releases", "download") {
            return None;
        }

        let tag = path_segments.next()?;
        let artifact_name = path_segments.next()?;

        (path_segments.next().is_none() && url.fragment().is_none() && url.query().is_none()).then(
            || Self {
                release: GhRelease {
                    owner: owner.to_compact_string(),
                    repo: repo.to_compact_string(),
                    tag: tag.to_compact_string(),
                },
                artifact_name: artifact_name.to_compact_string(),
            },
        )
    }
}

#[derive(Debug)]
struct Map<K, V>(RwLock<HashMap<K, Arc<V>>>);

impl<K, V> Default for Map<K, V> {
    fn default() -> Self {
        Self(Default::default())
    }
}

impl<K, V> Map<K, V>
where
    K: Eq + std::hash::Hash,
    V: Default,
{
    fn get(&self, k: K) -> Arc<V> {
        let optional_value = self.0.read().unwrap().deref().get(&k).cloned();
        optional_value.unwrap_or_else(|| Arc::clone(self.0.write().unwrap().entry(k).or_default()))
    }
}

#[derive(Debug)]
struct Inner {
    client: remote::Client,
    auth_token: Option<CompactString>,
    release_artifacts: Map<GhRelease, OnceCell<Option<request::Artifacts>>>,
    retry_after: Mutex<Option<Instant>>,
}

/// GitHub API client for querying whether a release artifact exists.
/// Can only handle github.com for now.
#[derive(Clone, Debug)]
pub struct GhApiClient(Arc<Inner>);

impl GhApiClient {
    pub fn new(client: remote::Client, auth_token: Option<CompactString>) -> Self {
        Self(Arc::new(Inner {
            client,
            auth_token,
            release_artifacts: Default::default(),
            retry_after: Default::default(),
        }))
    }

    /// The returned future is guaranteed to be pointer sized.
    pub async fn has_release_artifact(
        &self,
        GhReleaseArtifact {
            release,
            artifact_name,
        }: GhReleaseArtifact,
    ) -> Result<HasReleaseArtifact, GhApiError> {
        enum Failure {
            Error(GhApiError),
            RateLimit { retry_after: Instant },
            Unauthorized,
        }

        let once_cell = self.0.release_artifacts.get(release.clone());
        let res = once_cell
            .get_or_try_init(|| {
                Box::pin(async {
                    use request::FetchReleaseRet::*;

                    {
                        let mut guard = self.0.retry_after.lock().unwrap();

                        if let Some(retry_after) = *guard {
                            if retry_after.elapsed().is_zero() {
                                return Err(Failure::RateLimit { retry_after });
                            } else {
                                // Instant retry_after is already reached.
                                *guard = None;
                            }
                        };
                    }

                    match request::fetch_release_artifacts(
                        &self.0.client,
                        release,
                        self.0.auth_token.as_deref(),
                    )
                    .await
                    {
                        Ok(ReleaseNotFound) => Ok::<_, Failure>(None),
                        Ok(Artifacts(artifacts)) => Ok(Some(artifacts)),
                        Ok(ReachedRateLimit { retry_after }) => {
                            let retry_after = retry_after.unwrap_or(DEFAULT_RETRY_DURATION);

                            let now = Instant::now();
                            let retry_after = now
                                .checked_add(retry_after)
                                .unwrap_or_else(|| now + DEFAULT_RETRY_DURATION);

                            Err(Failure::RateLimit { retry_after })
                        }
                        Ok(Unauthorized) => Err(Failure::Unauthorized),
                        Err(err) => Err(Failure::Error(err)),
                    }
                })
            })
            .await;

        match res {
            Ok(Some(artifacts)) => {
                let has_artifact = artifacts.contains(&artifact_name);
                Ok(if has_artifact {
                    HasReleaseArtifact::Yes
                } else {
                    HasReleaseArtifact::No
                })
            }
            Ok(None) => Ok(HasReleaseArtifact::NoSuchRelease),
            Err(Failure::Unauthorized) => Ok(HasReleaseArtifact::Unauthorized),
            Err(Failure::RateLimit { retry_after }) => {
                *self.0.retry_after.lock().unwrap() = Some(retry_after);

                Ok(HasReleaseArtifact::RateLimit { retry_after })
            }
            Err(Failure::Error(err)) => Err(err),
        }
    }
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum HasReleaseArtifact {
    Yes,
    No,
    NoSuchRelease,
    /// GitHub returns 401 requiring a token.
    /// In this case, it makes sense to fall back to HEAD/GET.
    Unauthorized,

    /// The GitHub rate limit is applied per hour, so on reaching the rate
    /// limit, [`GhApiClient`] will return this variant and let the user decide
    /// what to do.
    ///
    /// Usually it is more sensible to fall back to directly HEAD/GET the
    /// artifact url than to wait until `retry_after`.
    ///
    /// If you encounter this frequently, then you should consider getting an
    /// authentication token (can be a personal access or OAuth access token),
    /// which should give you 5000 requests per hour per user.
    ///
    /// The rate limit for unauthorized users is 60 requests per hour per
    /// originating IP address, so it is very easy to be rate limited.
    RateLimit {
        retry_after: Instant,
    },
}

#[cfg(test)]
mod test {
    use super::*;

    mod cargo_binstall_v0_20_1 {
        use super::{CompactString, GhRelease};

        pub(super) const RELEASE: GhRelease = GhRelease {
            owner: CompactString::new_inline("cargo-bins"),
            repo: CompactString::new_inline("cargo-binstall"),
            tag: CompactString::new_inline("v0.20.1"),
        };

        pub(super) const ARTIFACTS: &[&str] = &[
            "cargo-binstall-aarch64-apple-darwin.full.zip",
            "cargo-binstall-aarch64-apple-darwin.zip",
            "cargo-binstall-aarch64-pc-windows-msvc.full.zip",
            "cargo-binstall-aarch64-pc-windows-msvc.zip",
            "cargo-binstall-aarch64-unknown-linux-gnu.full.tgz",
            "cargo-binstall-aarch64-unknown-linux-gnu.tgz",
            "cargo-binstall-aarch64-unknown-linux-musl.full.tgz",
            "cargo-binstall-aarch64-unknown-linux-musl.tgz",
            "cargo-binstall-armv7-unknown-linux-gnueabihf.full.tgz",
            "cargo-binstall-armv7-unknown-linux-gnueabihf.tgz",
            "cargo-binstall-armv7-unknown-linux-musleabihf.full.tgz",
            "cargo-binstall-armv7-unknown-linux-musleabihf.tgz",
            "cargo-binstall-universal-apple-darwin.full.zip",
            "cargo-binstall-universal-apple-darwin.zip",
            "cargo-binstall-x86_64-apple-darwin.full.zip",
            "cargo-binstall-x86_64-apple-darwin.zip",
            "cargo-binstall-x86_64-pc-windows-msvc.full.zip",
            "cargo-binstall-x86_64-pc-windows-msvc.zip",
            "cargo-binstall-x86_64-unknown-linux-gnu.full.tgz",
            "cargo-binstall-x86_64-unknown-linux-gnu.tgz",
            "cargo-binstall-x86_64-unknown-linux-musl.full.tgz",
            "cargo-binstall-x86_64-unknown-linux-musl.tgz",
        ];
    }

    fn try_extract_artifact_from_str(s: &str) -> Option<GhReleaseArtifact> {
        GhReleaseArtifact::try_extract_from_url(&url::Url::parse(s).unwrap())
    }

    fn assert_extract_gh_release_artifacts_failures(urls: &[&str]) {
        for url in urls {
            assert_eq!(try_extract_artifact_from_str(url), None);
        }
    }

    #[test]
    fn extract_gh_release_artifacts_failure() {
        use cargo_binstall_v0_20_1::*;

        let GhRelease { owner, repo, tag } = RELEASE;

        assert_extract_gh_release_artifacts_failures(&[
            "https://example.com",
            "https://github.com",
            &format!("https://github.com/{owner}"),
            &format!("https://github.com/{owner}/{repo}"),
            &format!("https://github.com/{owner}/{repo}/123e"),
            &format!("https://github.com/{owner}/{repo}/releases/21343"),
            &format!("https://github.com/{owner}/{repo}/releases/download"),
            &format!("https://github.com/{owner}/{repo}/releases/download/{tag}"),
            &format!("https://github.com/{owner}/{repo}/releases/download/{tag}/a/23"),
            &format!("https://github.com/{owner}/{repo}/releases/download/{tag}/a#a=12"),
            &format!("https://github.com/{owner}/{repo}/releases/download/{tag}/a?page=3"),
        ]);
    }

    #[test]
    fn extract_gh_release_artifacts_success() {
        use cargo_binstall_v0_20_1::*;

        let GhRelease { owner, repo, tag } = RELEASE;

        for artifact in ARTIFACTS {
            let GhReleaseArtifact {
                release,
                artifact_name,
            } = try_extract_artifact_from_str(&format!(
                "https://github.com/{owner}/{repo}/releases/download/{tag}/{artifact}"
            ))
            .unwrap();

            assert_eq!(release, RELEASE);
            assert_eq!(artifact_name, artifact);
        }
    }

    /// Mark this as an async fn so that you won't accidentally use it in
    /// sync context.
    async fn create_client() -> GhApiClient {
        GhApiClient::new(
            remote::Client::new(
                concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")),
                None,
                Duration::from_millis(10),
                1.try_into().unwrap(),
                [],
            )
            .unwrap(),
            None,
        )
    }

    #[tokio::test]
    async fn test_gh_api_client_cargo_binstall_v0_20_1() {
        let client = create_client().await;

        let release = cargo_binstall_v0_20_1::RELEASE;

        let artifacts = cargo_binstall_v0_20_1::ARTIFACTS
            .iter()
            .map(ToCompactString::to_compact_string);

        for artifact_name in artifacts {
            let ret = client
                .has_release_artifact(GhReleaseArtifact {
                    release: release.clone(),
                    artifact_name,
                })
                .await
                .unwrap();

            assert!(
                matches!(
                    ret,
                    HasReleaseArtifact::Yes | HasReleaseArtifact::RateLimit { .. }
                ),
                "ret = {:#?}",
                ret
            );
        }

        let ret = client
            .has_release_artifact(GhReleaseArtifact {
                release,
                artifact_name: "123z".to_compact_string(),
            })
            .await
            .unwrap();

        assert!(
            matches!(
                ret,
                HasReleaseArtifact::No | HasReleaseArtifact::RateLimit { .. }
            ),
            "ret = {:#?}",
            ret
        );
    }

    #[tokio::test]
    async fn test_gh_api_client_cargo_binstall_no_such_release() {
        let client = create_client().await;

        let release = GhRelease {
            owner: "cargo-bins".to_compact_string(),
            repo: "cargo-binstall".to_compact_string(),
            // We are currently at v0.20.1 and we would never release
            // anything older than v0.20.1
            tag: "v0.18.2".to_compact_string(),
        };

        let ret = client
            .has_release_artifact(GhReleaseArtifact {
                release,
                artifact_name: "1234".to_compact_string(),
            })
            .await
            .unwrap();

        assert!(
            matches!(
                ret,
                HasReleaseArtifact::NoSuchRelease | HasReleaseArtifact::RateLimit { .. }
            ),
            "ret = {:#?}",
            ret
        );
    }
}
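A note on the caching above: `release_artifacts` keys a `tokio::sync::OnceCell` per `GhRelease`, so concurrent `has_release_artifact` calls for artifacts of the same release share a single API request. A standalone sketch of that per-key coalescing pattern, with illustrative names that are not part of the crate API:

```rust
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
};

use tokio::sync::OnceCell;

// One OnceCell per key: N concurrent callers trigger only one fetch per key.
#[derive(Default)]
struct Cache(RwLock<HashMap<String, Arc<OnceCell<String>>>>);

impl Cache {
    fn cell(&self, key: &str) -> Arc<OnceCell<String>> {
        // Fast path: shared read lock.
        if let Some(cell) = self.0.read().unwrap().get(key) {
            return Arc::clone(cell);
        }
        // Slow path: take the write lock only on a miss; entry() makes the
        // read-then-write race between two misses harmless.
        Arc::clone(self.0.write().unwrap().entry(key.to_string()).or_default())
    }

    async fn get_or_fetch(&self, key: &str) -> String {
        self.cell(key)
            // The async initializer runs at most once per cell; other
            // callers await the same in-flight initialization.
            .get_or_init(|| async { format!("fetched {key}") })
            .await
            .clone()
    }
}
```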
135 crates/binstalk-downloader/src/gh_api_client/request.rs Normal file
@@ -0,0 +1,135 @@
use std::{
    borrow::Borrow,
    collections::HashSet,
    hash::{Hash, Hasher},
    io,
    time::Duration,
};

use compact_str::CompactString;
use serde::Deserialize;
use serde_json::from_slice as json_from_slice;
use thiserror::Error as ThisError;
use url::Url;

pub use serde_json::Error as JsonError;

use super::{remote, GhRelease};

#[derive(ThisError, Debug)]
pub enum GhApiError {
    #[error("IO Error: {0}")]
    Io(#[from] io::Error),

    #[error("Failed to parse json: {0}")]
    Json(#[from] JsonError),

    #[error("Remote Error: {0}")]
    Remote(#[from] remote::Error),

    #[error("Failed to parse url: {0}")]
    InvalidUrl(#[from] url::ParseError),
}

// Only include fields we do care about

#[derive(Eq, Deserialize, Debug)]
struct Artifact {
    name: CompactString,
}

// Manually implement PartialEq and Hash to ensure it will always produce the
// same hash as a str with the same content, and that the comparison will be
// the same as comparing a string.
impl PartialEq for Artifact {
    fn eq(&self, other: &Self) -> bool {
        self.name.eq(&other.name)
    }
}

impl Hash for Artifact {
    fn hash<H>(&self, state: &mut H)
    where
        H: Hasher,
    {
        let s: &str = self.name.as_str();
        s.hash(state)
    }
}

// Implement Borrow so that we can call
// `HashSet::contains::<str>`
impl Borrow<str> for Artifact {
    fn borrow(&self) -> &str {
        &self.name
    }
}

#[derive(Debug, Deserialize)]
pub(super) struct Artifacts {
    assets: HashSet<Artifact>,
}

impl Artifacts {
    pub(super) fn contains(&self, artifact_name: &str) -> bool {
        self.assets.contains(artifact_name)
    }
}

pub(super) enum FetchReleaseRet {
    ReachedRateLimit { retry_after: Option<Duration> },
    ReleaseNotFound,
    Artifacts(Artifacts),
    Unauthorized,
}

/// Returns 404 if not found
pub(super) async fn fetch_release_artifacts(
    client: &remote::Client,
    GhRelease { owner, repo, tag }: GhRelease,
    auth_token: Option<&str>,
) -> Result<FetchReleaseRet, GhApiError> {
    let mut request_builder = client
        .get(Url::parse(&format!(
            "https://api.github.com/repos/{owner}/{repo}/releases/tags/{tag}"
        ))?)
        .header("Accept", "application/vnd.github+json")
        .header("X-GitHub-Api-Version", "2022-11-28");

    if let Some(auth_token) = auth_token {
        request_builder = request_builder.bearer_auth(&auth_token);
    }

    let response = request_builder.send(false).await?;

    let status = response.status();
    let headers = response.headers();

    if status == remote::StatusCode::FORBIDDEN
        && headers
            .get("x-ratelimit-remaining")
            .map(|val| val == "0")
            .unwrap_or(false)
    {
        return Ok(FetchReleaseRet::ReachedRateLimit {
            retry_after: headers.get("x-ratelimit-reset").and_then(|value| {
                let secs = value.to_str().ok()?.parse().ok()?;
                Some(Duration::from_secs(secs))
            }),
        });
    }

    if status == remote::StatusCode::UNAUTHORIZED {
        return Ok(FetchReleaseRet::Unauthorized);
    }

    if status == remote::StatusCode::NOT_FOUND {
        return Ok(FetchReleaseRet::ReleaseNotFound);
    }

    let bytes = response.error_for_status()?.bytes().await?;

    Ok(FetchReleaseRet::Artifacts(json_from_slice(&bytes)?))
}
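Since `Artifacts` only declares `assets[].name`, serde silently drops every other field of the release payload. A sketch of that round-trip; the JSON here is an abridged, hand-written stand-in for a real GitHub response, not taken from the diff:

```rust
#[cfg(test)]
mod shape_test {
    use super::Artifacts;

    #[test]
    fn deserializes_only_asset_names() {
        // Hand-written stand-in payload; fields other than assets[].name
        // (e.g. tag_name, size) are ignored by #[derive(Deserialize)].
        let payload = br#"{
            "tag_name": "v0.20.1",
            "assets": [
                { "name": "cargo-binstall-x86_64-apple-darwin.zip", "size": 1 },
                { "name": "cargo-binstall-x86_64-unknown-linux-musl.tgz", "size": 2 }
            ]
        }"#;

        let artifacts: Artifacts = serde_json::from_slice(payload).unwrap();
        assert!(artifacts.contains("cargo-binstall-x86_64-apple-darwin.zip"));
        assert!(!artifacts.contains("no-such-artifact"));
    }
}
```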
@@ -1,2 +1,13 @@
#![cfg_attr(docsrs, feature(doc_auto_cfg))]

pub mod download;

/// GitHub API client.
/// Currently only supports github.com and does not support
/// GitHub Enterprise.
#[cfg(feature = "gh-api-client")]
pub mod gh_api_client;

pub mod remote;

mod utils;
@@ -6,17 +6,17 @@ use std::{
};

use bytes::Bytes;
use futures_lite::stream::{Stream, StreamExt};
use futures_lite::stream::Stream;
use httpdate::parse_http_date;
use reqwest::{
    header::{HeaderMap, RETRY_AFTER},
    Request, Response, StatusCode,
    Request,
};
use thiserror::Error as ThisError;
use tower::{limit::rate::RateLimit, Service, ServiceBuilder, ServiceExt};
use tracing::{debug, info};

pub use reqwest::{tls, Error as ReqwestError, Method};
pub use reqwest::{header, tls, Error as ReqwestError, Method, StatusCode};
pub use url::Url;

mod delay_request;

@@ -25,6 +25,9 @@ use delay_request::DelayRequest;
mod certificate;
pub use certificate::Certificate;

mod request_builder;
pub use request_builder::{RequestBuilder, Response};

const MAX_RETRY_DURATION: Duration = Duration::from_secs(120);
const MAX_RETRY_COUNT: u8 = 3;
const DEFAULT_RETRY_DURATION_FOR_RATE_LIMIT: Duration = Duration::from_millis(200);
@@ -129,11 +132,10 @@ impl Client {
    /// to retry.
    async fn do_send_request(
        &self,
        method: &Method,
        request: Request,
        url: &Url,
    ) -> Result<ControlFlow<Response, Result<Response, ReqwestError>>, ReqwestError> {
        let request = Request::new(method.clone(), url.clone());

    ) -> Result<ControlFlow<reqwest::Response, Result<reqwest::Response, ReqwestError>>, ReqwestError>
    {
        let future = (&self.0.service).ready().await?.call(request);

        let response = match future.await {
@@ -151,7 +153,7 @@ impl Client {

        let status = response.status();

        let add_delay_and_continue = |response: Response, duration| {
        let add_delay_and_continue = |response: reqwest::Response, duration| {
            info!("Received status code {status}, will wait for {duration:#?} and retry");

            self.0
@@ -180,11 +182,11 @@ impl Client {
        }
    }

    /// * `request` - `Request::try_clone` must always return `Some`.
    async fn send_request_inner(
        &self,
        method: &Method,
        url: &Url,
    ) -> Result<Response, ReqwestError> {
        request: &Request,
    ) -> Result<reqwest::Response, ReqwestError> {
        let mut count = 0;
        let max_retry_count = NonZeroU8::new(MAX_RETRY_COUNT).unwrap();

@@ -193,7 +195,10 @@ impl Client {
            // Increment the counter before checking for terminal condition.
            count += 1;

            match self.do_send_request(method, url).await? {
            match self
                .do_send_request(request.try_clone().unwrap(), request.url())
                .await?
            {
                ControlFlow::Break(response) => break Ok(response),
                ControlFlow::Continue(res) if count >= max_retry_count.get() => {
                    break res;
@@ -203,13 +208,13 @@ impl Client {
        }
    }

    /// * `request` - `Request::try_clone` must always return `Some`.
    async fn send_request(
        &self,
        method: Method,
        url: Url,
        request: Request,
        error_for_status: bool,
    ) -> Result<Response, Error> {
        self.send_request_inner(&method, &url)
    ) -> Result<reqwest::Response, Error> {
        self.send_request_inner(&request)
            .await
            .and_then(|response| {
                if error_for_status {
@@ -218,22 +223,29 @@ impl Client {
                Ok(response)
            }
        })
        .map_err(|err| Error::Http(Box::new(HttpError { method, url, err })))
        .map_err(|err| {
            Error::Http(Box::new(HttpError {
                method: request.method().clone(),
                url: request.url().clone(),
                err,
            }))
        })
    }

    async fn head_or_fallback_to_get(
        &self,
        url: Url,
        error_for_status: bool,
    ) -> Result<Response, Error> {
    ) -> Result<reqwest::Response, Error> {
        let res = self
            .send_request(Method::HEAD, url.clone(), error_for_status)
            .send_request(Request::new(Method::HEAD, url.clone()), error_for_status)
            .await;

        let retry_with_get = move || async move {
            // Retry using GET
            info!("HEAD on {url} is not allowed, falling back to GET");
            self.send_request(Method::GET, url, error_for_status).await
            self.send_request(Request::new(Method::GET, url), error_for_status)
                .await
        };

        let is_retryable = |status| {
@@ -282,19 +294,18 @@ impl Client {
    ) -> Result<impl Stream<Item = Result<Bytes, Error>>, Error> {
        debug!("Downloading from: '{url}'");

        let response = self.send_request(Method::GET, url.clone(), true).await?;
        Ok(self.get(url).send(true).await?.bytes_stream())
    }

        let url = Box::new(url);
    pub fn request(&self, method: Method, url: Url) -> RequestBuilder {
        RequestBuilder {
            client: self.clone(),
            inner: self.0.client.request(method, url),
        }
    }

        Ok(response.bytes_stream().map(move |res| {
            res.map_err(|err| {
                Error::Http(Box::new(HttpError {
                    method: Method::GET,
                    url: Url::clone(&*url),
                    err,
                }))
            })
        }))
    pub fn get(&self, url: Url) -> RequestBuilder {
        self.request(Method::GET, url)
    }
}

99 crates/binstalk-downloader/src/remote/request_builder.rs Normal file
@@ -0,0 +1,99 @@
use std::fmt;

use bytes::Bytes;
use futures_lite::stream::{Stream, StreamExt};
use reqwest::Method;

use super::{header, Client, Error, HttpError, StatusCode, Url};

#[derive(Debug)]
pub struct RequestBuilder {
    pub(super) client: Client,
    pub(super) inner: reqwest::RequestBuilder,
}

impl RequestBuilder {
    pub fn bearer_auth(self, token: &dyn fmt::Display) -> RequestBuilder {
        Self {
            client: self.client,
            inner: self.inner.bearer_auth(token),
        }
    }

    pub fn header(self, key: &str, value: &str) -> RequestBuilder {
        Self {
            client: self.client,
            inner: self.inner.header(key, value),
        }
    }

    pub async fn send(self, error_for_status: bool) -> Result<Response, Error> {
        let request = self.inner.build()?;
        let method = request.method().clone();
        Ok(Response {
            inner: self.client.send_request(request, error_for_status).await?,
            method,
        })
    }
}

#[derive(Debug)]
pub struct Response {
    inner: reqwest::Response,
    method: Method,
}

impl Response {
    pub async fn bytes(self) -> Result<Bytes, Error> {
        self.inner.bytes().await.map_err(Error::from)
    }

    pub fn bytes_stream(self) -> impl Stream<Item = Result<Bytes, Error>> {
        let url = Box::new(self.inner.url().clone());
        let method = self.method;

        self.inner.bytes_stream().map(move |res| {
            res.map_err(|err| {
                Error::Http(Box::new(HttpError {
                    method: method.clone(),
                    url: Url::clone(&*url),
                    err,
                }))
            })
        })
    }

    pub fn status(&self) -> StatusCode {
        self.inner.status()
    }

    pub fn url(&self) -> &Url {
        self.inner.url()
    }

    pub fn method(&self) -> &Method {
        &self.method
    }

    pub fn error_for_status_ref(&self) -> Result<&Self, Error> {
        match self.inner.error_for_status_ref() {
            Ok(_) => Ok(self),
            Err(err) => Err(Error::Http(Box::new(HttpError {
                method: self.method().clone(),
                url: self.url().clone(),
                err,
            }))),
        }
    }

    pub fn error_for_status(self) -> Result<Self, Error> {
        match self.error_for_status_ref() {
            Ok(_) => Ok(self),
            Err(err) => Err(err),
        }
    }

    pub fn headers(&self) -> &header::HeaderMap {
        self.inner.headers()
    }
}
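Combined with the `Client::request`/`Client::get` constructors added to `remote/mod.rs`, this yields a small reqwest-like surface. A hedged usage sketch mirroring how `gh_api_client/request.rs` drives the builder; `fetch_json` and the endpoint url are illustrative, not part of the diff:

```rust
use binstalk_downloader::remote;

// Illustrative: issue a GET with a custom header; send(false) defers the
// non-2xx -> error conversion so the caller can inspect status() itself,
// then error_for_status() opts back in before buffering the body.
async fn fetch_json(client: &remote::Client) -> Result<bytes::Bytes, remote::Error> {
    let url = remote::Url::parse("https://api.github.com/rate_limit").unwrap();

    let response = client
        .get(url)
        .header("Accept", "application/vnd.github+json")
        .send(false)
        .await?;

    response.error_for_status()?.bytes().await
}
```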
175 crates/binstalk-downloader/src/utils.rs Normal file
@@ -0,0 +1,175 @@
use std::{
    future::Future,
    io::{self, BufRead, Read},
};

use bytes::{Buf, Bytes};
use futures_lite::{
    future::poll_once,
    stream::{Stream, StreamExt},
};
use tokio::{sync::mpsc, task};

pub(super) fn extract_with_blocking_task<E, StreamError, S, F, T>(
    stream: S,
    f: F,
) -> impl Future<Output = Result<T, E>>
where
    T: Send + 'static,
    E: From<io::Error>,
    E: From<StreamError>,
    S: Stream<Item = Result<Bytes, StreamError>> + Send + Sync + Unpin + 'static,
    F: FnOnce(mpsc::Receiver<Bytes>) -> io::Result<T> + Send + Sync + 'static,
{
    async fn inner<S, StreamError, Fut, T, E>(
        mut stream: S,
        task: Fut,
        tx: mpsc::Sender<Bytes>,
    ) -> Result<T, E>
    where
        E: From<io::Error>,
        E: From<StreamError>,
        // We do not use a trait object for S since there will only be one
        // S used with this function.
        S: Stream<Item = Result<Bytes, StreamError>> + Send + Sync + Unpin + 'static,
        // asyncify would always return the same future, so no need to
        // use a trait object here.
        Fut: Future<Output = io::Result<T>> + Send + Sync,
    {
        let read_fut = async move {
            while let Some(bytes) = stream.next().await.transpose()? {
                if bytes.is_empty() {
                    continue;
                }

                if tx.send(bytes).await.is_err() {
                    // The extraction task has returned, which could mean that:
                    // - Extraction failed with an error
                    // - Extraction succeeded without needing the rest of the data
                    //
                    // It's hard to tell the difference here, so we assume
                    // the first scenario occurs.
                    //
                    // Even if the second scenario occurs, it won't affect the
                    // extraction process anyway, so we can just ignore it.
                    return Ok(());
                }
            }

            Ok::<_, E>(())
        };
        tokio::pin!(read_fut);

        let task_fut = async move { task.await.map_err(E::from) };
        tokio::pin!(task_fut);

        tokio::select! {
            biased;

            res = &mut read_fut => {
                // The stream reaches eof, propagate error and wait for
                // the extraction task to be done.
                res?;

                task_fut.await
            },
            res = &mut task_fut => {
                // The task finishes before the read task, return early
                // after checking for errors in read_fut.
                if let Some(Err(err)) = poll_once(read_fut).await {
                    Err(err)
                } else {
                    res
                }
            }
        }
    }

    // Use channel size = 5 to minimize the waiting time in the extraction task
    let (tx, rx) = mpsc::channel(5);

    let task = asyncify(move || f(rx));

    inner(stream, task, tx)
}

/// Copied from tokio https://docs.rs/tokio/latest/src/tokio/fs/mod.rs.html#132
pub(super) fn asyncify<F, T>(f: F) -> impl Future<Output = io::Result<T>> + Send + Sync + 'static
where
    F: FnOnce() -> io::Result<T> + Send + 'static,
    T: Send + 'static,
{
    async fn inner<T: Send + 'static>(handle: task::JoinHandle<io::Result<T>>) -> io::Result<T> {
        match handle.await {
            Ok(res) => res,
            Err(err) => Err(io::Error::new(
                io::ErrorKind::Other,
                format!("background task failed: {err}"),
            )),
        }
    }

    inner(task::spawn_blocking(f))
}

/// This wraps an AsyncIterator as a `Read`able.
/// It must be used in non-async context only,
/// meaning you have to use it with
/// `tokio::task::{block_in_place, spawn_blocking}` or
/// `std::thread::spawn`.
pub(super) struct StreamReadable {
    rx: mpsc::Receiver<Bytes>,
    bytes: Bytes,
}

impl StreamReadable {
    pub(super) fn new(rx: mpsc::Receiver<Bytes>) -> Self {
        Self {
            rx,
            bytes: Bytes::new(),
        }
    }
}

impl Read for StreamReadable {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }

        if self.fill_buf()?.is_empty() {
            return Ok(0);
        }

        let bytes = &mut self.bytes;

        // copy_to_slice requires the bytes to have enough remaining bytes
        // to fill buf.
        let n = buf.len().min(bytes.remaining());

        // <Bytes as Buf>::copy_to_slice copies and consumes the bytes
        bytes.copy_to_slice(&mut buf[..n]);

        Ok(n)
    }
}

impl BufRead for StreamReadable {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        let bytes = &mut self.bytes;

        if !bytes.has_remaining() {
            if let Some(new_bytes) = self.rx.blocking_recv() {
                // new_bytes is guaranteed to be non-empty.
                *bytes = new_bytes;
            }
        }

        Ok(&*bytes)
    }

    fn consume(&mut self, amt: usize) {
        self.bytes.advance(amt);
    }
}
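For orientation, a sketch of how these helpers compose; it mirrors what `download/extract.rs` now does with its blocking tar/zip decoders, but the byte-counting consumer is made up for the example and assumes it lives inside this crate (the items are `pub(super)`):

```rust
use std::io::{self, Read};

use bytes::Bytes;
use futures_lite::stream::Stream;

// Illustrative consumer: drain an async byte stream on a blocking thread.
// extract_with_blocking_task pumps the stream into an mpsc channel and runs
// the closure via asyncify/spawn_blocking; StreamReadable then exposes the
// receiving end as a plain sync `Read` for blocking decoders.
async fn count_bytes<S>(stream: S) -> Result<u64, io::Error>
where
    S: Stream<Item = Result<Bytes, io::Error>> + Send + Sync + Unpin + 'static,
{
    extract_with_blocking_task(stream, |rx| {
        let mut readable = StreamReadable::new(rx);
        let mut buf = [0u8; 8192];
        let mut total = 0u64;

        loop {
            let n = readable.read(&mut buf)?;
            if n == 0 {
                // Channel closed and buffered bytes drained: eof.
                return Ok(total);
            }
            total += n as u64;
        }
    })
    .await
}
```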