Merge pull request #168 from NobodyXu/feature/avoid-oom

This commit is contained in:
Félix Saparelli 2022-06-10 01:55:22 +12:00 committed by GitHub
commit 29b28a4f8f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 517 additions and 146 deletions

26
Cargo.lock generated
View file

@ -142,6 +142,7 @@ name = "cargo-binstall"
version = "0.9.1"
dependencies = [
"async-trait",
"bytes",
"cargo_metadata",
"cargo_toml",
"clap 3.1.18",
@ -149,10 +150,12 @@ dependencies = [
"dirs",
"env_logger",
"flate2",
"futures-util",
"guess_host_triple",
"log",
"miette",
"reqwest",
"scopeguard",
"semver",
"serde",
"simplelog",
@ -597,7 +600,7 @@ dependencies = [
"indexmap",
"slab",
"tokio",
"tokio-util",
"tokio-util 0.7.3",
"tracing",
]
@ -1112,6 +1115,7 @@ dependencies = [
"serde_urlencoded",
"tokio",
"tokio-rustls",
"tokio-util 0.6.10",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
@ -1174,6 +1178,12 @@ version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695"
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "sct"
version = "0.7.0"
@ -1516,6 +1526,20 @@ dependencies = [
"webpki",
]
[[package]]
name = "tokio-util"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.3"

View file

@ -20,15 +20,18 @@ pkg-fmt = "zip"
[dependencies]
async-trait = "0.1.56"
bytes = "1.1.0"
cargo_metadata = "0.14.2"
cargo_toml = "0.11.4"
clap = { version = "3.1.18", features = ["derive"] }
crates_io_api = { version = "0.8.0", default-features = false, features = ["rustls"] }
dirs = "4.0.0"
flate2 = { version = "1.0.24", features = ["zlib-ng"], default-features = false }
futures-util = { version = "0.3.21", default-features = false }
log = "0.4.14"
miette = { version = "4.7.1", features = ["fancy-no-backtrace"] }
reqwest = { version = "0.11.10", features = [ "rustls-tls" ], default-features = false }
reqwest = { version = "0.11.10", features = [ "rustls-tls", "stream" ], default-features = false }
scopeguard = "1.1.0"
semver = "1.0.7"
serde = { version = "1.0.136", features = [ "derive" ] }
simplelog = "0.12.0"
@ -38,7 +41,7 @@ tar = "0.4.38"
tempfile = "3.3.0"
thiserror = "1.0.31"
tinytemplate = "1.2.1"
tokio = { version = "1.19.1", features = [ "rt-multi-thread", "process" ], default-features = false }
tokio = { version = "1.19.1", features = [ "rt-multi-thread", "process", "sync" ], default-features = false }
url = "2.2.2"
xz2 = "0.1.6"

View file

@ -5,6 +5,7 @@ use std::time::Duration;
use crates_io_api::AsyncClient;
use log::debug;
use semver::{Version, VersionReq};
use url::Url;
use crate::{helpers::*, BinstallError, PkgFmt};
@ -50,7 +51,7 @@ fn find_version<'a, V: Iterator<Item = &'a String>>(
.ok_or(BinstallError::VersionMismatch { req: version_req })
}
/// Fetch a crate by name and version from crates.io
/// Fetch a crate Cargo.toml by name and version from crates.io
pub async fn fetch_crate_cratesio(
name: &str,
version_req: &str,
@ -98,16 +99,17 @@ pub async fn fetch_crate_cratesio(
// Download crate to temporary dir (crates.io or git?)
let crate_url = format!("https://crates.io/{}", version.dl_path);
let tgz_path = temp_dir.join(format!("{name}.tgz"));
debug!("Fetching crate from: {crate_url}");
debug!("Fetching crate from: {crate_url} and extracting Cargo.toml from it");
// Download crate
download(&crate_url, &tgz_path).await?;
download_and_extract(
Url::parse(&crate_url)?,
PkgFmt::Tgz,
&temp_dir,
Some([Path::new("Cargo.toml").into()]),
)
.await?;
// Decompress downloaded tgz
debug!("Decompressing crate archive");
extract(&tgz_path, PkgFmt::Tgz, &temp_dir)?;
let crate_path = temp_dir.join(format!("{name}-{version_name}"));
// Return crate directory

View file

@ -4,9 +4,8 @@ use std::sync::Arc;
pub use gh_crate_meta::*;
pub use log::debug;
pub use quickinstall::*;
use tokio::task::JoinHandle;
use crate::{BinstallError, PkgFmt, PkgMeta};
use crate::{AutoAbortJoinHandle, BinstallError, PkgFmt, PkgMeta};
mod gh_crate_meta;
mod quickinstall;
@ -18,8 +17,8 @@ pub trait Fetcher: Send + Sync {
where
Self: Sized;
/// Fetch a package
async fn fetch(&self, dst: &Path) -> Result<(), BinstallError>;
/// Fetch a package and extract
async fn fetch_and_extract(&self, dst: &Path) -> Result<(), BinstallError>;
/// Check if a package is available for download
async fn check(&self) -> Result<bool, BinstallError>;
@ -62,14 +61,16 @@ impl MultiFetcher {
.fetchers
.iter()
.cloned()
.map(|fetcher| (
.map(|fetcher| {
(
fetcher.clone(),
AutoAbortJoinHandle(tokio::spawn(async move { fetcher.check().await })),
))
AutoAbortJoinHandle::new(tokio::spawn(async move { fetcher.check().await })),
)
})
.collect();
for (fetcher, mut handle) in handles {
match (&mut handle.0).await {
for (fetcher, handle) in handles {
match handle.await {
Ok(Ok(true)) => return Some(fetcher),
Ok(Ok(false)) => (),
Ok(Err(err)) => {
@ -92,12 +93,3 @@ impl MultiFetcher {
None
}
}
#[derive(Debug)]
struct AutoAbortJoinHandle(JoinHandle<Result<bool, BinstallError>>);
impl Drop for AutoAbortJoinHandle {
fn drop(&mut self) {
self.0.abort();
}
}

View file

@ -7,7 +7,7 @@ use serde::Serialize;
use url::Url;
use super::Data;
use crate::{download, remote_exists, BinstallError, PkgFmt, Template};
use crate::{download_and_extract, remote_exists, BinstallError, PkgFmt, Template};
pub struct GhCrateMeta {
data: Data,
@ -40,10 +40,10 @@ impl super::Fetcher for GhCrateMeta {
remote_exists(url, Method::HEAD).await
}
async fn fetch(&self, dst: &Path) -> Result<(), BinstallError> {
async fn fetch_and_extract(&self, dst: &Path) -> Result<(), BinstallError> {
let url = self.url()?;
info!("Downloading package from: '{url}'");
download(url.as_str(), dst).await
download_and_extract::<_, 0>(url, self.pkg_fmt(), dst, None).await
}
fn pkg_fmt(&self) -> PkgFmt {

View file

@ -7,7 +7,7 @@ use tokio::task::JoinHandle;
use url::Url;
use super::Data;
use crate::{download, remote_exists, BinstallError, PkgFmt};
use crate::{download_and_extract, remote_exists, BinstallError, PkgFmt};
const BASE_URL: &str = "https://github.com/alsuren/cargo-quickinstall/releases/download";
const STATS_URL: &str = "https://warehouse-clerk-tmp.vercel.app/api/crate";
@ -37,10 +37,10 @@ impl super::Fetcher for QuickInstall {
remote_exists(Url::parse(&url)?, Method::HEAD).await
}
async fn fetch(&self, dst: &Path) -> Result<(), BinstallError> {
async fn fetch_and_extract(&self, dst: &Path) -> Result<(), BinstallError> {
let url = self.package_url();
info!("Downloading package from: '{url}'");
download(&url, &dst).await
download_and_extract::<_, 0>(Url::parse(&url)?, self.pkg_fmt(), dst, None).await
}
fn pkg_fmt(&self) -> PkgFmt {

View file

@ -1,23 +1,27 @@
use std::{
fs,
borrow::Cow,
io::{stderr, stdin, Write},
path::{Path, PathBuf},
};
use cargo_toml::Manifest;
use flate2::read::GzDecoder;
use log::{debug, info};
use reqwest::Method;
use serde::Serialize;
use tar::Archive;
use tinytemplate::TinyTemplate;
use url::Url;
use xz2::read::XzDecoder;
use zip::read::ZipArchive;
use zstd::stream::Decoder as ZstdDecoder;
use crate::{BinstallError, Meta, PkgFmt};
mod async_extracter;
pub use async_extracter::extract_archive_stream;
mod auto_abort_join_handle;
pub use auto_abort_join_handle::AutoAbortJoinHandle;
mod extracter;
mod readable_rx;
/// Load binstall metadata from the crate `Cargo.toml` at the provided path
pub fn load_manifest_path<P: AsRef<Path>>(
manifest_path: P,
@ -40,9 +44,17 @@ pub async fn remote_exists(url: Url, method: Method) -> Result<bool, BinstallErr
Ok(req.status().is_success())
}
/// Download a file from the provided URL to the provided path
pub async fn download<P: AsRef<Path>>(url: &str, path: P) -> Result<(), BinstallError> {
let url = Url::parse(url)?;
/// Download a file from the provided URL and extract it to the provided path
///
/// * `desired_outputs - If Some(_) and `fmt` is not `PkgFmt::Bin` or
/// `PkgFmt::Zip`, then it will filter the tar and only extract files
/// specified in it.
pub async fn download_and_extract<P: AsRef<Path>, const N: usize>(
url: Url,
fmt: PkgFmt,
path: P,
desired_outputs: Option<[Cow<'static, Path>; N]>,
) -> Result<(), BinstallError> {
debug!("Downloading from: '{url}'");
let resp = reqwest::get(url.clone())
@ -54,86 +66,12 @@ pub async fn download<P: AsRef<Path>>(url: &str, path: P) -> Result<(), Binstall
err,
})?;
let bytes = resp.bytes().await?;
let path = path.as_ref();
debug!("Download OK, writing to file: '{}'", path.display());
debug!("Downloading to file: '{}'", path.display());
fs::create_dir_all(path.parent().unwrap())?;
fs::write(&path, bytes)?;
extract_archive_stream(resp.bytes_stream(), path, fmt, desired_outputs).await?;
Ok(())
}
/// Extract files from the specified source onto the specified path
pub fn extract<S: AsRef<Path>, P: AsRef<Path>>(
source: S,
fmt: PkgFmt,
path: P,
) -> Result<(), BinstallError> {
let source = source.as_ref();
let path = path.as_ref();
match fmt {
PkgFmt::Tar => {
// Extract to install dir
debug!("Extracting from tar archive '{source:?}' to `{path:?}`");
let dat = fs::File::open(source)?;
let mut tar = Archive::new(dat);
tar.unpack(path)?;
}
PkgFmt::Tgz => {
// Extract to install dir
debug!("Decompressing from tgz archive '{source:?}' to `{path:?}`");
let dat = fs::File::open(source)?;
let tar = GzDecoder::new(dat);
let mut tgz = Archive::new(tar);
tgz.unpack(path)?;
}
PkgFmt::Txz => {
// Extract to install dir
debug!("Decompressing from txz archive '{source:?}' to `{path:?}`");
let dat = fs::File::open(source)?;
let tar = XzDecoder::new(dat);
let mut txz = Archive::new(tar);
txz.unpack(path)?;
}
PkgFmt::Tzstd => {
// Extract to install dir
debug!("Decompressing from tzstd archive '{source:?}' to `{path:?}`");
let dat = std::fs::File::open(source)?;
// The error can only come from raw::Decoder::with_dictionary
// as of zstd 0.10.2 and 0.11.2, which is specified
// as &[] by ZstdDecoder::new, thus ZstdDecoder::new
// should not return any error.
let tar = ZstdDecoder::new(dat)?;
let mut txz = Archive::new(tar);
txz.unpack(path)?;
}
PkgFmt::Zip => {
// Extract to install dir
debug!("Decompressing from zip archive '{source:?}' to `{path:?}`");
let dat = fs::File::open(source)?;
let mut zip = ZipArchive::new(dat)?;
zip.extract(path)?;
}
PkgFmt::Bin => {
debug!("Copying binary '{source:?}' to `{path:?}`");
// Copy to install dir
fs::copy(source, path)?;
}
};
debug!("Download OK, written to file: '{}'", path.display());
Ok(())
}

View file

@ -0,0 +1,232 @@
use std::borrow::Cow;
use std::fs;
use std::io::{self, Seek, Write};
use std::path::Path;
use bytes::Bytes;
use futures_util::stream::{Stream, StreamExt};
use scopeguard::{guard, Always, ScopeGuard};
use tempfile::tempfile;
use tokio::{sync::mpsc, task::spawn_blocking};
use super::{extracter::*, readable_rx::*, AutoAbortJoinHandle};
use crate::{BinstallError, PkgFmt};
pub(crate) enum Content {
/// Data to write to file
Data(Bytes),
/// Abort the writing and remove the file.
Abort,
}
#[derive(Debug)]
struct AsyncExtracterInner {
/// Use AutoAbortJoinHandle so that the task
/// will be cancelled on failure.
handle: AutoAbortJoinHandle<Result<(), BinstallError>>,
tx: mpsc::Sender<Content>,
}
impl AsyncExtracterInner {
/// * `desired_outputs - If Some(_), then it will filter the tar
/// and only extract files specified in it.
fn new<const N: usize>(
path: &Path,
fmt: PkgFmt,
desired_outputs: Option<[Cow<'static, Path>; N]>,
) -> Self {
let path = path.to_owned();
let (tx, rx) = mpsc::channel::<Content>(100);
let handle = AutoAbortJoinHandle::new(spawn_blocking(move || {
// close rx on error so that tx.send will return an error
let mut rx = guard(rx, |mut rx| {
rx.close();
});
fs::create_dir_all(path.parent().unwrap())?;
match fmt {
PkgFmt::Bin => {
let mut file = fs::File::create(&path)?;
// remove it unless the operation isn't aborted and no write
// fails.
let remove_guard = guard(&path, |path| {
fs::remove_file(path).ok();
});
Self::read_into_file(&mut file, &mut rx)?;
// Operation isn't aborted and all writes succeed,
// disarm the remove_guard.
ScopeGuard::into_inner(remove_guard);
}
PkgFmt::Zip => {
let mut file = tempfile()?;
Self::read_into_file(&mut file, &mut rx)?;
// rewind it so that we can pass it to unzip
file.rewind()?;
unzip(file, &path)?;
}
_ => extract_compressed_from_readable(
ReadableRx::new(&mut rx),
fmt,
&path,
desired_outputs.as_ref().map(|arr| &arr[..]),
)?,
}
Ok(())
}));
Self { handle, tx }
}
fn read_into_file(
file: &mut fs::File,
rx: &mut mpsc::Receiver<Content>,
) -> Result<(), BinstallError> {
while let Some(content) = rx.blocking_recv() {
match content {
Content::Data(bytes) => file.write_all(&*bytes)?,
Content::Abort => {
return Err(io::Error::new(io::ErrorKind::Other, "Aborted").into())
}
}
}
file.flush()?;
Ok(())
}
/// Upon error, this extracter shall not be reused.
/// Otherwise, `Self::done` would panic.
async fn feed(&mut self, bytes: Bytes) -> Result<(), BinstallError> {
if self.tx.send(Content::Data(bytes)).await.is_err() {
// task failed
Err(Self::wait(&mut self.handle).await.expect_err(
"Implementation bug: write task finished successfully before all writes are done",
))
} else {
Ok(())
}
}
async fn done(mut self) -> Result<(), BinstallError> {
// Drop tx as soon as possible so that the task would wrap up what it
// was doing and flush out all the pending data.
drop(self.tx);
Self::wait(&mut self.handle).await
}
async fn wait(
handle: &mut AutoAbortJoinHandle<Result<(), BinstallError>>,
) -> Result<(), BinstallError> {
match handle.await {
Ok(res) => res,
Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err).into()),
}
}
fn abort(self) {
let tx = self.tx;
// If Self::write fail, then the task is already tear down,
// tx closed and no need to abort.
if !tx.is_closed() {
// Use send here because blocking_send would panic if used
// in async threads.
tokio::spawn(async move {
tx.send(Content::Abort).await.ok();
});
}
}
}
/// AsyncExtracter will pass the `Bytes` you give to another thread via
/// a `mpsc` and decompress and unpack it if needed.
///
/// After all write is done, you must call `AsyncExtracter::done`,
/// otherwise the extracted content will be removed on drop.
///
/// # Advantages
///
/// `download_and_extract` has the following advantages over downloading
/// plus extracting in on the same thread:
///
/// - The code is pipelined instead of storing the downloaded file in memory
/// and extract it, except for `PkgFmt::Zip`, since `ZipArchiver::new`
/// requires `std::io::Seek`, so it fallbacks to writing the a file then
/// unzip it.
/// - The async part (downloading) and the extracting part runs in parallel
/// using `tokio::spawn_nonblocking`.
/// - Compressing/writing which takes a lot of CPU time will not block
/// the runtime anymore.
/// - For any PkgFmt except for `PkgFmt::Zip` and `PkgFmt::Bin` (basically
/// all `tar` based formats), it can extract only specified files.
/// This means that `super::drivers::fetch_crate_cratesio` no longer need to
/// extract the whole crate and write them to disk, it now only extract the
/// relevant part (`Cargo.toml`) out to disk and open it.
#[derive(Debug)]
struct AsyncExtracter(ScopeGuard<AsyncExtracterInner, fn(AsyncExtracterInner), Always>);
impl AsyncExtracter {
/// * `path` - If `fmt` is `PkgFmt::Bin`, then this is the filename
/// for the bin.
/// Otherwise, it is the directory where the extracted content will be put.
/// * `fmt` - The format of the archive to feed in.
/// * `desired_outputs - If Some(_), then it will filter the tar and
/// only extract files specified in it.
/// Note that this is a best-effort and it only works when `fmt`
/// is not `PkgFmt::Bin` or `PkgFmt::Zip`.
fn new<const N: usize>(
path: &Path,
fmt: PkgFmt,
desired_outputs: Option<[Cow<'static, Path>; N]>,
) -> Self {
let inner = AsyncExtracterInner::new(path, fmt, desired_outputs);
Self(guard(inner, AsyncExtracterInner::abort))
}
/// Upon error, this extracter shall not be reused.
/// Otherwise, `Self::done` would panic.
async fn feed(&mut self, bytes: Bytes) -> Result<(), BinstallError> {
self.0.feed(bytes).await
}
async fn done(self) -> Result<(), BinstallError> {
ScopeGuard::into_inner(self.0).done().await
}
}
/// * `output` - If `fmt` is `PkgFmt::Bin`, then this is the filename
/// for the bin.
/// Otherwise, it is the directory where the extracted content will be put.
/// * `fmt` - The format of the archive to feed in.
/// * `desired_outputs - If Some(_), then it will filter the tar and
/// only extract files specified in it.
/// Note that this is a best-effort and it only works when `fmt`
/// is not `PkgFmt::Bin` or `PkgFmt::Zip`.
pub async fn extract_archive_stream<E, const N: usize>(
mut stream: impl Stream<Item = Result<Bytes, E>> + Unpin,
output: &Path,
fmt: PkgFmt,
desired_outputs: Option<[Cow<'static, Path>; N]>,
) -> Result<(), BinstallError>
where
BinstallError: From<E>,
{
let mut extracter = AsyncExtracter::new(output, fmt, desired_outputs);
while let Some(res) = stream.next().await {
extracter.feed(res?).await?;
}
extracter.done().await
}

View file

@ -0,0 +1,45 @@
use std::{
future::Future,
ops::{Deref, DerefMut},
pin::Pin,
task::{Context, Poll},
};
use tokio::task::{JoinError, JoinHandle};
#[derive(Debug)]
pub struct AutoAbortJoinHandle<T>(JoinHandle<T>);
impl<T> AutoAbortJoinHandle<T> {
pub fn new(handle: JoinHandle<T>) -> Self {
Self(handle)
}
}
impl<T> Drop for AutoAbortJoinHandle<T> {
fn drop(&mut self) {
self.0.abort();
}
}
impl<T> Deref for AutoAbortJoinHandle<T> {
type Target = JoinHandle<T>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for AutoAbortJoinHandle<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T> Future for AutoAbortJoinHandle<T> {
type Output = Result<T, JoinError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut Pin::into_inner(self).0).poll(cx)
}
}

99
src/helpers/extracter.rs Normal file
View file

@ -0,0 +1,99 @@
use std::borrow::Cow;
use std::fs::File;
use std::io::Read;
use std::path::Path;
use flate2::read::GzDecoder;
use log::debug;
use tar::Archive;
use xz2::read::XzDecoder;
use zip::read::ZipArchive;
use zstd::stream::Decoder as ZstdDecoder;
use crate::{BinstallError, PkgFmt};
/// * `desired_outputs - If Some(_), then it will filter the tar
/// and only extract files specified in it.
fn untar(
dat: impl Read,
path: &Path,
desired_outputs: Option<&[Cow<'_, Path>]>,
) -> Result<(), BinstallError> {
let mut tar = Archive::new(dat);
if let Some(desired_outputs) = desired_outputs {
for res in tar.entries()? {
let mut entry = res?;
let entry_path = entry.path()?;
if desired_outputs.contains(&entry_path) {
let dst = path.join(entry_path);
entry.unpack(dst)?;
}
}
} else {
tar.unpack(path)?;
}
Ok(())
}
/// Extract files from the specified source onto the specified path.
///
/// * `fmt` - must not be `PkgFmt::Bin` or `PkgFmt::Zip`.
/// * `desired_outputs - If Some(_), then it will filter the tar
/// and only extract files specified in it.
pub(crate) fn extract_compressed_from_readable(
dat: impl Read,
fmt: PkgFmt,
path: &Path,
desired_outputs: Option<&[Cow<'_, Path>]>,
) -> Result<(), BinstallError> {
match fmt {
PkgFmt::Tar => {
// Extract to install dir
debug!("Extracting from tar archive to `{path:?}`");
untar(dat, path, desired_outputs)?
}
PkgFmt::Tgz => {
// Extract to install dir
debug!("Decompressing from tgz archive to `{path:?}`");
let tar = GzDecoder::new(dat);
untar(tar, path, desired_outputs)?;
}
PkgFmt::Txz => {
// Extract to install dir
debug!("Decompressing from txz archive to `{path:?}`");
let tar = XzDecoder::new(dat);
untar(tar, path, desired_outputs)?;
}
PkgFmt::Tzstd => {
// Extract to install dir
debug!("Decompressing from tzstd archive to `{path:?}`");
// The error can only come from raw::Decoder::with_dictionary
// as of zstd 0.10.2 and 0.11.2, which is specified
// as &[] by ZstdDecoder::new, thus ZstdDecoder::new
// should not return any error.
let tar = ZstdDecoder::new(dat)?;
untar(tar, path, desired_outputs)?;
}
PkgFmt::Zip => panic!("Unexpected PkgFmt::Zip!"),
PkgFmt::Bin => panic!("Unexpected PkgFmt::Bin!"),
};
Ok(())
}
pub(crate) fn unzip(dat: File, dst: &Path) -> Result<(), BinstallError> {
debug!("Decompressing from zip archive to `{dst:?}`");
let mut zip = ZipArchive::new(dat)?;
zip.extract(dst)?;
Ok(())
}

View file

@ -0,0 +1,49 @@
use std::cmp::min;
use std::io::{self, Read};
use bytes::{Buf, Bytes};
use tokio::sync::mpsc::Receiver;
use super::async_extracter::Content;
#[derive(Debug)]
pub(crate) struct ReadableRx<'a> {
rx: &'a mut Receiver<Content>,
bytes: Bytes,
}
impl<'a> ReadableRx<'a> {
pub(crate) fn new(rx: &'a mut Receiver<Content>) -> Self {
Self {
rx,
bytes: Bytes::new(),
}
}
}
impl Read for ReadableRx<'_> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}
let bytes = &mut self.bytes;
if !bytes.has_remaining() {
match self.rx.blocking_recv() {
Some(Content::Data(new_bytes)) => *bytes = new_bytes,
Some(Content::Abort) => {
return Err(io::Error::new(io::ErrorKind::Other, "Aborted"))
}
None => return Ok(0),
}
}
// copy_to_slice requires the bytes to have enough remaining bytes
// to fill buf.
let n = min(buf.len(), bytes.remaining());
bytes.copy_to_slice(&mut buf[..n]);
Ok(n)
}
}

View file

@ -295,12 +295,6 @@ async fn entry() -> Result<()> {
fetcher.source_name()
);
// Compute temporary directory for downloads
let pkg_path = temp_dir
.path()
.join(format!("pkg-{}.{}", opts.name, meta.pkg_fmt));
debug!("Using temporary download path: {}", pkg_path.display());
install_from_package(
binaries,
fetcher.as_ref(),
@ -308,7 +302,6 @@ async fn entry() -> Result<()> {
meta,
opts,
package,
pkg_path,
temp_dir,
)
.await
@ -337,7 +330,6 @@ async fn install_from_package(
mut meta: PkgMeta,
opts: Options,
package: Package<Meta>,
pkg_path: PathBuf,
temp_dir: TempDir,
) -> Result<()> {
// Prompt user for third-party source
@ -361,11 +353,21 @@ async fn install_from_package(
meta.bin_dir = "{ bin }{ binary-ext }".to_string();
}
let bin_path = temp_dir.path().join(format!("bin-{}", opts.name));
debug!("Using temporary binary path: {}", bin_path.display());
// Download package
if opts.dry_run {
info!("Dry run, not downloading package");
} else {
fetcher.fetch(&pkg_path).await?;
fetcher.fetch_and_extract(&bin_path).await?;
if binaries.is_empty() {
error!("No binaries specified (or inferred from file system)");
return Err(miette!(
"No binaries specified (or inferred from file system)"
));
}
}
#[cfg(incomplete)]
@ -392,21 +394,6 @@ async fn install_from_package(
}
}
let bin_path = temp_dir.path().join(format!("bin-{}", opts.name));
debug!("Using temporary binary path: {}", bin_path.display());
if !opts.dry_run {
// Extract files
extract(&pkg_path, fetcher.pkg_fmt(), &bin_path)?;
if binaries.is_empty() {
error!("No binaries specified (or inferred from file system)");
return Err(miette!(
"No binaries specified (or inferred from file system)"
));
}
}
// List files to be installed
// based on those found via Cargo.toml
let bin_data = bins::Data {