diff --git a/Cargo.lock b/Cargo.lock index c4fe15c3..21877876 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,6 +142,7 @@ name = "cargo-binstall" version = "0.9.1" dependencies = [ "async-trait", + "bytes", "cargo_metadata", "cargo_toml", "clap 3.1.18", @@ -149,10 +150,12 @@ dependencies = [ "dirs", "env_logger", "flate2", + "futures-util", "guess_host_triple", "log", "miette", "reqwest", + "scopeguard", "semver", "serde", "simplelog", @@ -597,7 +600,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util", + "tokio-util 0.7.3", "tracing", ] @@ -1112,6 +1115,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-rustls", + "tokio-util 0.6.10", "url", "wasm-bindgen", "wasm-bindgen-futures", @@ -1174,6 +1178,12 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695" +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "sct" version = "0.7.0" @@ -1516,6 +1526,20 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-util" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.3" diff --git a/Cargo.toml b/Cargo.toml index 7f015855..7cdbcc48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,15 +20,18 @@ pkg-fmt = "zip" [dependencies] async-trait = "0.1.56" +bytes = "1.1.0" cargo_metadata = "0.14.2" cargo_toml = "0.11.4" clap = { version = "3.1.18", features = ["derive"] } crates_io_api = { version = "0.8.0", default-features = false, features = ["rustls"] } dirs = "4.0.0" flate2 = { version = "1.0.24", features = ["zlib-ng"], default-features = false } +futures-util = { version = "0.3.21", default-features = false } log = "0.4.14" miette = { version = "4.7.1", features = ["fancy-no-backtrace"] } -reqwest = { version = "0.11.10", features = [ "rustls-tls" ], default-features = false } +reqwest = { version = "0.11.10", features = [ "rustls-tls", "stream" ], default-features = false } +scopeguard = "1.1.0" semver = "1.0.7" serde = { version = "1.0.136", features = [ "derive" ] } simplelog = "0.12.0" @@ -38,7 +41,7 @@ tar = "0.4.38" tempfile = "3.3.0" thiserror = "1.0.31" tinytemplate = "1.2.1" -tokio = { version = "1.19.1", features = [ "rt-multi-thread", "process" ], default-features = false } +tokio = { version = "1.19.1", features = [ "rt-multi-thread", "process", "sync" ], default-features = false } url = "2.2.2" xz2 = "0.1.6" diff --git a/src/drivers.rs b/src/drivers.rs index 49b615c1..3ddf654d 100644 --- a/src/drivers.rs +++ b/src/drivers.rs @@ -5,6 +5,7 @@ use std::time::Duration; use crates_io_api::AsyncClient; use log::debug; use semver::{Version, VersionReq}; +use url::Url; use crate::{helpers::*, BinstallError, PkgFmt}; @@ -50,7 +51,7 @@ fn find_version<'a, V: Iterator>( .ok_or(BinstallError::VersionMismatch { req: version_req }) } -/// Fetch a crate by name and version from crates.io +/// Fetch a crate Cargo.toml by name and version from crates.io pub async fn fetch_crate_cratesio( name: &str, version_req: &str, @@ -98,16 +99,17 @@ pub async fn fetch_crate_cratesio( // Download crate to temporary dir (crates.io or git?) let crate_url = format!("https://crates.io/{}", version.dl_path); - let tgz_path = temp_dir.join(format!("{name}.tgz")); - debug!("Fetching crate from: {crate_url}"); + debug!("Fetching crate from: {crate_url} and extracting Cargo.toml from it"); - // Download crate - download(&crate_url, &tgz_path).await?; + download_and_extract( + Url::parse(&crate_url)?, + PkgFmt::Tgz, + &temp_dir, + Some([Path::new("Cargo.toml").into()]), + ) + .await?; - // Decompress downloaded tgz - debug!("Decompressing crate archive"); - extract(&tgz_path, PkgFmt::Tgz, &temp_dir)?; let crate_path = temp_dir.join(format!("{name}-{version_name}")); // Return crate directory diff --git a/src/fetchers.rs b/src/fetchers.rs index 2e7b6d02..a39a0e9a 100644 --- a/src/fetchers.rs +++ b/src/fetchers.rs @@ -4,9 +4,8 @@ use std::sync::Arc; pub use gh_crate_meta::*; pub use log::debug; pub use quickinstall::*; -use tokio::task::JoinHandle; -use crate::{BinstallError, PkgFmt, PkgMeta}; +use crate::{AutoAbortJoinHandle, BinstallError, PkgFmt, PkgMeta}; mod gh_crate_meta; mod quickinstall; @@ -18,8 +17,8 @@ pub trait Fetcher: Send + Sync { where Self: Sized; - /// Fetch a package - async fn fetch(&self, dst: &Path) -> Result<(), BinstallError>; + /// Fetch a package and extract + async fn fetch_and_extract(&self, dst: &Path) -> Result<(), BinstallError>; /// Check if a package is available for download async fn check(&self) -> Result; @@ -62,14 +61,16 @@ impl MultiFetcher { .fetchers .iter() .cloned() - .map(|fetcher| ( - fetcher.clone(), - AutoAbortJoinHandle(tokio::spawn(async move { fetcher.check().await })), - )) + .map(|fetcher| { + ( + fetcher.clone(), + AutoAbortJoinHandle::new(tokio::spawn(async move { fetcher.check().await })), + ) + }) .collect(); - for (fetcher, mut handle) in handles { - match (&mut handle.0).await { + for (fetcher, handle) in handles { + match handle.await { Ok(Ok(true)) => return Some(fetcher), Ok(Ok(false)) => (), Ok(Err(err)) => { @@ -92,12 +93,3 @@ impl MultiFetcher { None } } - -#[derive(Debug)] -struct AutoAbortJoinHandle(JoinHandle>); - -impl Drop for AutoAbortJoinHandle { - fn drop(&mut self) { - self.0.abort(); - } -} diff --git a/src/fetchers/gh_crate_meta.rs b/src/fetchers/gh_crate_meta.rs index e38d7ae1..c6ed669b 100644 --- a/src/fetchers/gh_crate_meta.rs +++ b/src/fetchers/gh_crate_meta.rs @@ -7,7 +7,7 @@ use serde::Serialize; use url::Url; use super::Data; -use crate::{download, remote_exists, BinstallError, PkgFmt, Template}; +use crate::{download_and_extract, remote_exists, BinstallError, PkgFmt, Template}; pub struct GhCrateMeta { data: Data, @@ -40,10 +40,10 @@ impl super::Fetcher for GhCrateMeta { remote_exists(url, Method::HEAD).await } - async fn fetch(&self, dst: &Path) -> Result<(), BinstallError> { + async fn fetch_and_extract(&self, dst: &Path) -> Result<(), BinstallError> { let url = self.url()?; info!("Downloading package from: '{url}'"); - download(url.as_str(), dst).await + download_and_extract::<_, 0>(url, self.pkg_fmt(), dst, None).await } fn pkg_fmt(&self) -> PkgFmt { diff --git a/src/fetchers/quickinstall.rs b/src/fetchers/quickinstall.rs index e8ca052f..f048de03 100644 --- a/src/fetchers/quickinstall.rs +++ b/src/fetchers/quickinstall.rs @@ -7,7 +7,7 @@ use tokio::task::JoinHandle; use url::Url; use super::Data; -use crate::{download, remote_exists, BinstallError, PkgFmt}; +use crate::{download_and_extract, remote_exists, BinstallError, PkgFmt}; const BASE_URL: &str = "https://github.com/alsuren/cargo-quickinstall/releases/download"; const STATS_URL: &str = "https://warehouse-clerk-tmp.vercel.app/api/crate"; @@ -37,10 +37,10 @@ impl super::Fetcher for QuickInstall { remote_exists(Url::parse(&url)?, Method::HEAD).await } - async fn fetch(&self, dst: &Path) -> Result<(), BinstallError> { + async fn fetch_and_extract(&self, dst: &Path) -> Result<(), BinstallError> { let url = self.package_url(); info!("Downloading package from: '{url}'"); - download(&url, &dst).await + download_and_extract::<_, 0>(Url::parse(&url)?, self.pkg_fmt(), dst, None).await } fn pkg_fmt(&self) -> PkgFmt { diff --git a/src/helpers.rs b/src/helpers.rs index 27b22551..da0a54c0 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -1,23 +1,27 @@ use std::{ - fs, + borrow::Cow, io::{stderr, stdin, Write}, path::{Path, PathBuf}, }; use cargo_toml::Manifest; -use flate2::read::GzDecoder; use log::{debug, info}; use reqwest::Method; use serde::Serialize; -use tar::Archive; use tinytemplate::TinyTemplate; use url::Url; -use xz2::read::XzDecoder; -use zip::read::ZipArchive; -use zstd::stream::Decoder as ZstdDecoder; use crate::{BinstallError, Meta, PkgFmt}; +mod async_extracter; +pub use async_extracter::extract_archive_stream; + +mod auto_abort_join_handle; +pub use auto_abort_join_handle::AutoAbortJoinHandle; + +mod extracter; +mod readable_rx; + /// Load binstall metadata from the crate `Cargo.toml` at the provided path pub fn load_manifest_path>( manifest_path: P, @@ -40,9 +44,17 @@ pub async fn remote_exists(url: Url, method: Method) -> Result>(url: &str, path: P) -> Result<(), BinstallError> { - let url = Url::parse(url)?; +/// Download a file from the provided URL and extract it to the provided path +/// +/// * `desired_outputs - If Some(_) and `fmt` is not `PkgFmt::Bin` or +/// `PkgFmt::Zip`, then it will filter the tar and only extract files +/// specified in it. +pub async fn download_and_extract, const N: usize>( + url: Url, + fmt: PkgFmt, + path: P, + desired_outputs: Option<[Cow<'static, Path>; N]>, +) -> Result<(), BinstallError> { debug!("Downloading from: '{url}'"); let resp = reqwest::get(url.clone()) @@ -54,86 +66,12 @@ pub async fn download>(url: &str, path: P) -> Result<(), Binstall err, })?; - let bytes = resp.bytes().await?; - let path = path.as_ref(); - debug!("Download OK, writing to file: '{}'", path.display()); + debug!("Downloading to file: '{}'", path.display()); - fs::create_dir_all(path.parent().unwrap())?; - fs::write(&path, bytes)?; + extract_archive_stream(resp.bytes_stream(), path, fmt, desired_outputs).await?; - Ok(()) -} - -/// Extract files from the specified source onto the specified path -pub fn extract, P: AsRef>( - source: S, - fmt: PkgFmt, - path: P, -) -> Result<(), BinstallError> { - let source = source.as_ref(); - let path = path.as_ref(); - - match fmt { - PkgFmt::Tar => { - // Extract to install dir - debug!("Extracting from tar archive '{source:?}' to `{path:?}`"); - - let dat = fs::File::open(source)?; - let mut tar = Archive::new(dat); - - tar.unpack(path)?; - } - PkgFmt::Tgz => { - // Extract to install dir - debug!("Decompressing from tgz archive '{source:?}' to `{path:?}`"); - - let dat = fs::File::open(source)?; - let tar = GzDecoder::new(dat); - let mut tgz = Archive::new(tar); - - tgz.unpack(path)?; - } - PkgFmt::Txz => { - // Extract to install dir - debug!("Decompressing from txz archive '{source:?}' to `{path:?}`"); - - let dat = fs::File::open(source)?; - let tar = XzDecoder::new(dat); - let mut txz = Archive::new(tar); - - txz.unpack(path)?; - } - PkgFmt::Tzstd => { - // Extract to install dir - debug!("Decompressing from tzstd archive '{source:?}' to `{path:?}`"); - - let dat = std::fs::File::open(source)?; - - // The error can only come from raw::Decoder::with_dictionary - // as of zstd 0.10.2 and 0.11.2, which is specified - // as &[] by ZstdDecoder::new, thus ZstdDecoder::new - // should not return any error. - let tar = ZstdDecoder::new(dat)?; - let mut txz = Archive::new(tar); - - txz.unpack(path)?; - } - PkgFmt::Zip => { - // Extract to install dir - debug!("Decompressing from zip archive '{source:?}' to `{path:?}`"); - - let dat = fs::File::open(source)?; - let mut zip = ZipArchive::new(dat)?; - - zip.extract(path)?; - } - PkgFmt::Bin => { - debug!("Copying binary '{source:?}' to `{path:?}`"); - // Copy to install dir - fs::copy(source, path)?; - } - }; + debug!("Download OK, written to file: '{}'", path.display()); Ok(()) } diff --git a/src/helpers/async_extracter.rs b/src/helpers/async_extracter.rs new file mode 100644 index 00000000..4eed3276 --- /dev/null +++ b/src/helpers/async_extracter.rs @@ -0,0 +1,232 @@ +use std::borrow::Cow; +use std::fs; +use std::io::{self, Seek, Write}; +use std::path::Path; + +use bytes::Bytes; +use futures_util::stream::{Stream, StreamExt}; +use scopeguard::{guard, Always, ScopeGuard}; +use tempfile::tempfile; +use tokio::{sync::mpsc, task::spawn_blocking}; + +use super::{extracter::*, readable_rx::*, AutoAbortJoinHandle}; +use crate::{BinstallError, PkgFmt}; + +pub(crate) enum Content { + /// Data to write to file + Data(Bytes), + + /// Abort the writing and remove the file. + Abort, +} + +#[derive(Debug)] +struct AsyncExtracterInner { + /// Use AutoAbortJoinHandle so that the task + /// will be cancelled on failure. + handle: AutoAbortJoinHandle>, + tx: mpsc::Sender, +} + +impl AsyncExtracterInner { + /// * `desired_outputs - If Some(_), then it will filter the tar + /// and only extract files specified in it. + fn new( + path: &Path, + fmt: PkgFmt, + desired_outputs: Option<[Cow<'static, Path>; N]>, + ) -> Self { + let path = path.to_owned(); + let (tx, rx) = mpsc::channel::(100); + + let handle = AutoAbortJoinHandle::new(spawn_blocking(move || { + // close rx on error so that tx.send will return an error + let mut rx = guard(rx, |mut rx| { + rx.close(); + }); + + fs::create_dir_all(path.parent().unwrap())?; + + match fmt { + PkgFmt::Bin => { + let mut file = fs::File::create(&path)?; + + // remove it unless the operation isn't aborted and no write + // fails. + let remove_guard = guard(&path, |path| { + fs::remove_file(path).ok(); + }); + + Self::read_into_file(&mut file, &mut rx)?; + + // Operation isn't aborted and all writes succeed, + // disarm the remove_guard. + ScopeGuard::into_inner(remove_guard); + } + PkgFmt::Zip => { + let mut file = tempfile()?; + + Self::read_into_file(&mut file, &mut rx)?; + + // rewind it so that we can pass it to unzip + file.rewind()?; + + unzip(file, &path)?; + } + _ => extract_compressed_from_readable( + ReadableRx::new(&mut rx), + fmt, + &path, + desired_outputs.as_ref().map(|arr| &arr[..]), + )?, + } + + Ok(()) + })); + + Self { handle, tx } + } + + fn read_into_file( + file: &mut fs::File, + rx: &mut mpsc::Receiver, + ) -> Result<(), BinstallError> { + while let Some(content) = rx.blocking_recv() { + match content { + Content::Data(bytes) => file.write_all(&*bytes)?, + Content::Abort => { + return Err(io::Error::new(io::ErrorKind::Other, "Aborted").into()) + } + } + } + + file.flush()?; + + Ok(()) + } + + /// Upon error, this extracter shall not be reused. + /// Otherwise, `Self::done` would panic. + async fn feed(&mut self, bytes: Bytes) -> Result<(), BinstallError> { + if self.tx.send(Content::Data(bytes)).await.is_err() { + // task failed + Err(Self::wait(&mut self.handle).await.expect_err( + "Implementation bug: write task finished successfully before all writes are done", + )) + } else { + Ok(()) + } + } + + async fn done(mut self) -> Result<(), BinstallError> { + // Drop tx as soon as possible so that the task would wrap up what it + // was doing and flush out all the pending data. + drop(self.tx); + + Self::wait(&mut self.handle).await + } + + async fn wait( + handle: &mut AutoAbortJoinHandle>, + ) -> Result<(), BinstallError> { + match handle.await { + Ok(res) => res, + Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err).into()), + } + } + + fn abort(self) { + let tx = self.tx; + // If Self::write fail, then the task is already tear down, + // tx closed and no need to abort. + if !tx.is_closed() { + // Use send here because blocking_send would panic if used + // in async threads. + tokio::spawn(async move { + tx.send(Content::Abort).await.ok(); + }); + } + } +} + +/// AsyncExtracter will pass the `Bytes` you give to another thread via +/// a `mpsc` and decompress and unpack it if needed. +/// +/// After all write is done, you must call `AsyncExtracter::done`, +/// otherwise the extracted content will be removed on drop. +/// +/// # Advantages +/// +/// `download_and_extract` has the following advantages over downloading +/// plus extracting in on the same thread: +/// +/// - The code is pipelined instead of storing the downloaded file in memory +/// and extract it, except for `PkgFmt::Zip`, since `ZipArchiver::new` +/// requires `std::io::Seek`, so it fallbacks to writing the a file then +/// unzip it. +/// - The async part (downloading) and the extracting part runs in parallel +/// using `tokio::spawn_nonblocking`. +/// - Compressing/writing which takes a lot of CPU time will not block +/// the runtime anymore. +/// - For any PkgFmt except for `PkgFmt::Zip` and `PkgFmt::Bin` (basically +/// all `tar` based formats), it can extract only specified files. +/// This means that `super::drivers::fetch_crate_cratesio` no longer need to +/// extract the whole crate and write them to disk, it now only extract the +/// relevant part (`Cargo.toml`) out to disk and open it. +#[derive(Debug)] +struct AsyncExtracter(ScopeGuard); + +impl AsyncExtracter { + /// * `path` - If `fmt` is `PkgFmt::Bin`, then this is the filename + /// for the bin. + /// Otherwise, it is the directory where the extracted content will be put. + /// * `fmt` - The format of the archive to feed in. + /// * `desired_outputs - If Some(_), then it will filter the tar and + /// only extract files specified in it. + /// Note that this is a best-effort and it only works when `fmt` + /// is not `PkgFmt::Bin` or `PkgFmt::Zip`. + fn new( + path: &Path, + fmt: PkgFmt, + desired_outputs: Option<[Cow<'static, Path>; N]>, + ) -> Self { + let inner = AsyncExtracterInner::new(path, fmt, desired_outputs); + Self(guard(inner, AsyncExtracterInner::abort)) + } + + /// Upon error, this extracter shall not be reused. + /// Otherwise, `Self::done` would panic. + async fn feed(&mut self, bytes: Bytes) -> Result<(), BinstallError> { + self.0.feed(bytes).await + } + + async fn done(self) -> Result<(), BinstallError> { + ScopeGuard::into_inner(self.0).done().await + } +} + +/// * `output` - If `fmt` is `PkgFmt::Bin`, then this is the filename +/// for the bin. +/// Otherwise, it is the directory where the extracted content will be put. +/// * `fmt` - The format of the archive to feed in. +/// * `desired_outputs - If Some(_), then it will filter the tar and +/// only extract files specified in it. +/// Note that this is a best-effort and it only works when `fmt` +/// is not `PkgFmt::Bin` or `PkgFmt::Zip`. +pub async fn extract_archive_stream( + mut stream: impl Stream> + Unpin, + output: &Path, + fmt: PkgFmt, + desired_outputs: Option<[Cow<'static, Path>; N]>, +) -> Result<(), BinstallError> +where + BinstallError: From, +{ + let mut extracter = AsyncExtracter::new(output, fmt, desired_outputs); + + while let Some(res) = stream.next().await { + extracter.feed(res?).await?; + } + + extracter.done().await +} diff --git a/src/helpers/auto_abort_join_handle.rs b/src/helpers/auto_abort_join_handle.rs new file mode 100644 index 00000000..fa476a8b --- /dev/null +++ b/src/helpers/auto_abort_join_handle.rs @@ -0,0 +1,45 @@ +use std::{ + future::Future, + ops::{Deref, DerefMut}, + pin::Pin, + task::{Context, Poll}, +}; + +use tokio::task::{JoinError, JoinHandle}; + +#[derive(Debug)] +pub struct AutoAbortJoinHandle(JoinHandle); + +impl AutoAbortJoinHandle { + pub fn new(handle: JoinHandle) -> Self { + Self(handle) + } +} + +impl Drop for AutoAbortJoinHandle { + fn drop(&mut self) { + self.0.abort(); + } +} + +impl Deref for AutoAbortJoinHandle { + type Target = JoinHandle; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for AutoAbortJoinHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Future for AutoAbortJoinHandle { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut Pin::into_inner(self).0).poll(cx) + } +} diff --git a/src/helpers/extracter.rs b/src/helpers/extracter.rs new file mode 100644 index 00000000..41797210 --- /dev/null +++ b/src/helpers/extracter.rs @@ -0,0 +1,99 @@ +use std::borrow::Cow; +use std::fs::File; +use std::io::Read; +use std::path::Path; + +use flate2::read::GzDecoder; +use log::debug; +use tar::Archive; +use xz2::read::XzDecoder; +use zip::read::ZipArchive; +use zstd::stream::Decoder as ZstdDecoder; + +use crate::{BinstallError, PkgFmt}; + +/// * `desired_outputs - If Some(_), then it will filter the tar +/// and only extract files specified in it. +fn untar( + dat: impl Read, + path: &Path, + desired_outputs: Option<&[Cow<'_, Path>]>, +) -> Result<(), BinstallError> { + let mut tar = Archive::new(dat); + + if let Some(desired_outputs) = desired_outputs { + for res in tar.entries()? { + let mut entry = res?; + let entry_path = entry.path()?; + + if desired_outputs.contains(&entry_path) { + let dst = path.join(entry_path); + + entry.unpack(dst)?; + } + } + } else { + tar.unpack(path)?; + } + + Ok(()) +} + +/// Extract files from the specified source onto the specified path. +/// +/// * `fmt` - must not be `PkgFmt::Bin` or `PkgFmt::Zip`. +/// * `desired_outputs - If Some(_), then it will filter the tar +/// and only extract files specified in it. +pub(crate) fn extract_compressed_from_readable( + dat: impl Read, + fmt: PkgFmt, + path: &Path, + desired_outputs: Option<&[Cow<'_, Path>]>, +) -> Result<(), BinstallError> { + match fmt { + PkgFmt::Tar => { + // Extract to install dir + debug!("Extracting from tar archive to `{path:?}`"); + + untar(dat, path, desired_outputs)? + } + PkgFmt::Tgz => { + // Extract to install dir + debug!("Decompressing from tgz archive to `{path:?}`"); + + let tar = GzDecoder::new(dat); + untar(tar, path, desired_outputs)?; + } + PkgFmt::Txz => { + // Extract to install dir + debug!("Decompressing from txz archive to `{path:?}`"); + + let tar = XzDecoder::new(dat); + untar(tar, path, desired_outputs)?; + } + PkgFmt::Tzstd => { + // Extract to install dir + debug!("Decompressing from tzstd archive to `{path:?}`"); + + // The error can only come from raw::Decoder::with_dictionary + // as of zstd 0.10.2 and 0.11.2, which is specified + // as &[] by ZstdDecoder::new, thus ZstdDecoder::new + // should not return any error. + let tar = ZstdDecoder::new(dat)?; + untar(tar, path, desired_outputs)?; + } + PkgFmt::Zip => panic!("Unexpected PkgFmt::Zip!"), + PkgFmt::Bin => panic!("Unexpected PkgFmt::Bin!"), + }; + + Ok(()) +} + +pub(crate) fn unzip(dat: File, dst: &Path) -> Result<(), BinstallError> { + debug!("Decompressing from zip archive to `{dst:?}`"); + + let mut zip = ZipArchive::new(dat)?; + zip.extract(dst)?; + + Ok(()) +} diff --git a/src/helpers/readable_rx.rs b/src/helpers/readable_rx.rs new file mode 100644 index 00000000..15aa1300 --- /dev/null +++ b/src/helpers/readable_rx.rs @@ -0,0 +1,49 @@ +use std::cmp::min; +use std::io::{self, Read}; + +use bytes::{Buf, Bytes}; +use tokio::sync::mpsc::Receiver; + +use super::async_extracter::Content; + +#[derive(Debug)] +pub(crate) struct ReadableRx<'a> { + rx: &'a mut Receiver, + bytes: Bytes, +} + +impl<'a> ReadableRx<'a> { + pub(crate) fn new(rx: &'a mut Receiver) -> Self { + Self { + rx, + bytes: Bytes::new(), + } + } +} + +impl Read for ReadableRx<'_> { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if buf.is_empty() { + return Ok(0); + } + + let bytes = &mut self.bytes; + if !bytes.has_remaining() { + match self.rx.blocking_recv() { + Some(Content::Data(new_bytes)) => *bytes = new_bytes, + Some(Content::Abort) => { + return Err(io::Error::new(io::ErrorKind::Other, "Aborted")) + } + None => return Ok(0), + } + } + + // copy_to_slice requires the bytes to have enough remaining bytes + // to fill buf. + let n = min(buf.len(), bytes.remaining()); + + bytes.copy_to_slice(&mut buf[..n]); + + Ok(n) + } +} diff --git a/src/main.rs b/src/main.rs index 9b3dafb0..55ce22bb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -295,12 +295,6 @@ async fn entry() -> Result<()> { fetcher.source_name() ); - // Compute temporary directory for downloads - let pkg_path = temp_dir - .path() - .join(format!("pkg-{}.{}", opts.name, meta.pkg_fmt)); - debug!("Using temporary download path: {}", pkg_path.display()); - install_from_package( binaries, fetcher.as_ref(), @@ -308,7 +302,6 @@ async fn entry() -> Result<()> { meta, opts, package, - pkg_path, temp_dir, ) .await @@ -337,7 +330,6 @@ async fn install_from_package( mut meta: PkgMeta, opts: Options, package: Package, - pkg_path: PathBuf, temp_dir: TempDir, ) -> Result<()> { // Prompt user for third-party source @@ -361,11 +353,21 @@ async fn install_from_package( meta.bin_dir = "{ bin }{ binary-ext }".to_string(); } + let bin_path = temp_dir.path().join(format!("bin-{}", opts.name)); + debug!("Using temporary binary path: {}", bin_path.display()); + // Download package if opts.dry_run { info!("Dry run, not downloading package"); } else { - fetcher.fetch(&pkg_path).await?; + fetcher.fetch_and_extract(&bin_path).await?; + + if binaries.is_empty() { + error!("No binaries specified (or inferred from file system)"); + return Err(miette!( + "No binaries specified (or inferred from file system)" + )); + } } #[cfg(incomplete)] @@ -392,21 +394,6 @@ async fn install_from_package( } } - let bin_path = temp_dir.path().join(format!("bin-{}", opts.name)); - debug!("Using temporary binary path: {}", bin_path.display()); - - if !opts.dry_run { - // Extract files - extract(&pkg_path, fetcher.pkg_fmt(), &bin_path)?; - - if binaries.is_empty() { - error!("No binaries specified (or inferred from file system)"); - return Err(miette!( - "No binaries specified (or inferred from file system)" - )); - } - } - // List files to be installed // based on those found via Cargo.toml let bin_data = bins::Data {