From dab790deaf37479289fd491944313876137bc3e1 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 19:33:56 +1000 Subject: [PATCH 01/49] Enable feature "stream" of dep reqwest Signed-off-by: Jiahao XU --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 7f015855..3f953a96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ dirs = "4.0.0" flate2 = { version = "1.0.24", features = ["zlib-ng"], default-features = false } log = "0.4.14" miette = { version = "4.7.1", features = ["fancy-no-backtrace"] } -reqwest = { version = "0.11.10", features = [ "rustls-tls" ], default-features = false } +reqwest = { version = "0.11.10", features = [ "rustls-tls", "stream" ], default-features = false } semver = "1.0.7" serde = { version = "1.0.136", features = [ "derive" ] } simplelog = "0.12.0" From 3b889130138197ce2fc5aa00f05b70d46d5e690d Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 19:37:52 +1000 Subject: [PATCH 02/49] Add new dep futures-util v0.3.21 without default feature Signed-off-by: Jiahao XU --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 3f953a96..188267e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ clap = { version = "3.1.18", features = ["derive"] } crates_io_api = { version = "0.8.0", default-features = false, features = ["rustls"] } dirs = "4.0.0" flate2 = { version = "1.0.24", features = ["zlib-ng"], default-features = false } +futures-util = { version = "0.3.21", default-features = false } log = "0.4.14" miette = { version = "4.7.1", features = ["fancy-no-backtrace"] } reqwest = { version = "0.11.10", features = [ "rustls-tls", "stream" ], default-features = false } From 2e25360e8221c21c01a446dac826f08e4e319c6c Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 19:38:26 +1000 Subject: [PATCH 03/49] Enable feature "sync" of dep tokio Signed-off-by: Jiahao XU --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 188267e1..c64e0b41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ tar = "0.4.38" tempfile = "3.3.0" thiserror = "1.0.31" tinytemplate = "1.2.1" -tokio = { version = "1.19.1", features = [ "rt-multi-thread", "process" ], default-features = false } +tokio = { version = "1.19.1", features = [ "rt-multi-thread", "process", "sync" ], default-features = false } url = "2.2.2" xz2 = "0.1.6" From 6988264e99758c201a382afe341dd326ef2d24f4 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 19:38:45 +1000 Subject: [PATCH 04/49] Update `Cargo.lock` Signed-off-by: Jiahao XU --- Cargo.lock | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index c4fe15c3..837f4ac3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -149,6 +149,7 @@ dependencies = [ "dirs", "env_logger", "flate2", + "futures-util", "guess_host_triple", "log", "miette", @@ -597,7 +598,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util", + "tokio-util 0.7.3", "tracing", ] @@ -1112,6 +1113,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-rustls", + "tokio-util 0.6.10", "url", "wasm-bindgen", "wasm-bindgen-futures", @@ -1516,6 +1518,20 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-util" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.3" From 33e61f544a3c8ba347ad6c9a8f6b515fd3c9955c Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 19:43:41 +1000 Subject: [PATCH 05/49] Add new dep bytes v1.1.0 Signed-off-by: Jiahao XU --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index c64e0b41..94d772b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ pkg-fmt = "zip" [dependencies] async-trait = "0.1.56" +bytes = "1.1.0" cargo_metadata = "0.14.2" cargo_toml = "0.11.4" clap = { version = "3.1.18", features = ["derive"] } From 570febdaadbc8807e353f3eda620d54522270991 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 19:43:56 +1000 Subject: [PATCH 06/49] Update `Cargo.lock` Signed-off-by: Jiahao XU --- Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.lock b/Cargo.lock index 837f4ac3..b4295f60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,6 +142,7 @@ name = "cargo-binstall" version = "0.9.1" dependencies = [ "async-trait", + "bytes", "cargo_metadata", "cargo_toml", "clap 3.1.18", From 5fdeea86ad45477d7470a23520b96e02b4b6c679 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 20:03:58 +1000 Subject: [PATCH 07/49] Impl `helpers::AsyncFileWriter` Signed-off-by: Jiahao XU --- src/helpers.rs | 50 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/src/helpers.rs b/src/helpers.rs index 27b22551..55e7a8df 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -1,9 +1,10 @@ use std::{ fs, - io::{stderr, stdin, Write}, + io::{self, stderr, stdin, Write}, path::{Path, PathBuf}, }; +use bytes::Bytes; use cargo_toml::Manifest; use flate2::read::GzDecoder; use log::{debug, info}; @@ -11,6 +12,7 @@ use reqwest::Method; use serde::Serialize; use tar::Archive; use tinytemplate::TinyTemplate; +use tokio::{sync::mpsc, task}; use url::Url; use xz2::read::XzDecoder; use zip::read::ZipArchive; @@ -210,3 +212,49 @@ pub trait Template: Serialize { Ok(tt.render("path", self)?) } } + +#[derive(Debug)] +pub struct AsyncFileWriter { + handle: task::JoinHandle>, + tx: mpsc::Sender, +} + +impl AsyncFileWriter { + pub fn new(path: &Path) -> io::Result { + fs::create_dir_all(path.parent().unwrap())?; + + let mut file = fs::File::create(path)?; + let (tx, mut rx) = mpsc::channel::(100); + + let handle = task::spawn_blocking(move || { + while let Some(bytes) = rx.blocking_recv() { + file.write_all(&*bytes)?; + } + + rx.close(); + file.flush()?; + + Ok(()) + }); + + Ok(Self { handle, tx }) + } + + pub async fn write(&self, bytes: Bytes) { + self.tx + .send(bytes) + .await + .expect("Implementation bug: rx is closed before tx is dropped") + } + + pub async fn done(self) -> io::Result<()> { + // Drop tx as soon as possible so that the task would wrap up what it + // was doing and flush out all the pending data. + drop(self.tx); + + match self.handle.await { + Ok(res) => res, + Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err)), + } + } +} From 191fd6e981f83f6f3358c9089f2df4887ccfa04d Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 20:11:01 +1000 Subject: [PATCH 08/49] Use `AsyncFileWriter` in `helpers::download` so that writing to file will not block the download. Signed-off-by: Jiahao XU --- src/helpers.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/helpers.rs b/src/helpers.rs index 55e7a8df..e37cf89e 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -7,6 +7,7 @@ use std::{ use bytes::Bytes; use cargo_toml::Manifest; use flate2::read::GzDecoder; +use futures_util::stream::StreamExt; use log::{debug, info}; use reqwest::Method; use serde::Serialize; @@ -56,13 +57,19 @@ pub async fn download>(url: &str, path: P) -> Result<(), Binstall err, })?; - let bytes = resp.bytes().await?; - let path = path.as_ref(); - debug!("Download OK, writing to file: '{}'", path.display()); + debug!("Downloading to file: '{}'", path.display()); - fs::create_dir_all(path.parent().unwrap())?; - fs::write(&path, bytes)?; + let mut bytes_stream = resp.bytes_stream(); + let writer = AsyncFileWriter::new(path)?; + + while let Some(res) = bytes_stream.next().await { + writer.write(res?).await; + } + + writer.done().await?; + + debug!("Download OK, written to file: '{}'", path.display()); Ok(()) } From ba21372134d8f9e8582afc6c78dc19a98e883f1b Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 20:12:37 +1000 Subject: [PATCH 09/49] Add new dep scopeguard v1.1.0 Signed-off-by: Jiahao XU --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 94d772b0..7cdbcc48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ futures-util = { version = "0.3.21", default-features = false } log = "0.4.14" miette = { version = "4.7.1", features = ["fancy-no-backtrace"] } reqwest = { version = "0.11.10", features = [ "rustls-tls", "stream" ], default-features = false } +scopeguard = "1.1.0" semver = "1.0.7" serde = { version = "1.0.136", features = [ "derive" ] } simplelog = "0.12.0" From 80706dc3c4849a1b50811327191e8245f60d8254 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 20:12:53 +1000 Subject: [PATCH 10/49] Update `Cargo.lock` Signed-off-by: Jiahao XU --- Cargo.lock | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index b4295f60..21877876 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -155,6 +155,7 @@ dependencies = [ "log", "miette", "reqwest", + "scopeguard", "semver", "serde", "simplelog", @@ -1177,6 +1178,12 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695" +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "sct" version = "0.7.0" From c7965ceb4f01681814307ef72936eb00135de7e1 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 20:23:03 +1000 Subject: [PATCH 11/49] Use `ScopeGuard` to auto remove file on failure Signed-off-by: Jiahao XU --- src/helpers.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/helpers.rs b/src/helpers.rs index e37cf89e..e735c2d0 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -10,6 +10,7 @@ use flate2::read::GzDecoder; use futures_util::stream::StreamExt; use log::{debug, info}; use reqwest::Method; +use scopeguard::ScopeGuard; use serde::Serialize; use tar::Archive; use tinytemplate::TinyTemplate; @@ -63,11 +64,17 @@ pub async fn download>(url: &str, path: P) -> Result<(), Binstall let mut bytes_stream = resp.bytes_stream(); let writer = AsyncFileWriter::new(path)?; + let guard = scopeguard::guard(path, |path| { + fs::remove_file(path).ok(); + }); + while let Some(res) = bytes_stream.next().await { writer.write(res?).await; } writer.done().await?; + // Disarm as it is successfully downloaded and written to file. + ScopeGuard::into_inner(guard); debug!("Download OK, written to file: '{}'", path.display()); From 24d3a2af2b424c728c249de63fc8f7143838e631 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 20:39:39 +1000 Subject: [PATCH 12/49] Enable feature "macros" on dep tokio Signed-off-by: Jiahao XU --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 7cdbcc48..bd76fd13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ tar = "0.4.38" tempfile = "3.3.0" thiserror = "1.0.31" tinytemplate = "1.2.1" -tokio = { version = "1.19.1", features = [ "rt-multi-thread", "process", "sync" ], default-features = false } +tokio = { version = "1.19.1", features = [ "rt-multi-thread", "process", "sync", "macros" ], default-features = false } url = "2.2.2" xz2 = "0.1.6" From 90186f0b1560648584327fbe9304508ce84910cb Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 20:40:02 +1000 Subject: [PATCH 13/49] Update `Cargo.lock` Signed-off-by: Jiahao XU --- Cargo.lock | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 21877876..e6e26b1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1512,9 +1512,21 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "socket2", + "tokio-macros", "winapi", ] +[[package]] +name = "tokio-macros" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-rustls" version = "0.23.4" From 6367bfc1e3860d3a195e9866823f6f30881953fa Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 20:42:22 +1000 Subject: [PATCH 14/49] Fix infinite block in `AsyncFileWriter::write` Signed-off-by: Jiahao XU --- src/helpers.rs | 46 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/src/helpers.rs b/src/helpers.rs index e735c2d0..ae7a432d 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -62,14 +62,14 @@ pub async fn download>(url: &str, path: P) -> Result<(), Binstall debug!("Downloading to file: '{}'", path.display()); let mut bytes_stream = resp.bytes_stream(); - let writer = AsyncFileWriter::new(path)?; + let mut writer = AsyncFileWriter::new(path)?; let guard = scopeguard::guard(path, |path| { fs::remove_file(path).ok(); }); while let Some(res) = bytes_stream.next().await { - writer.write(res?).await; + writer.write(res?).await?; } writer.done().await?; @@ -254,19 +254,47 @@ impl AsyncFileWriter { Ok(Self { handle, tx }) } - pub async fn write(&self, bytes: Bytes) { - self.tx - .send(bytes) - .await - .expect("Implementation bug: rx is closed before tx is dropped") + /// Upon error, this writer shall not be reused. + /// Otherwise, `Self::done` would panic. + pub async fn write(&mut self, bytes: Bytes) -> io::Result<()> { + let send_future = async { + self.tx + .send(bytes) + .await + .expect("Implementation bug: rx is closed before tx is dropped") + }; + tokio::pin!(send_future); + + let task_future = async { + Self::wait(&mut self.handle).await.map(|_| { + panic!("Implementation bug: write task finished before all writes are done") + }) + }; + tokio::pin!(task_future); + + // Use select to run them in parallel, so that if the send blocks + // the current future and the task failed with some error, the future + // returned by this function would not block forever. + tokio::select! { + // It isn't completely safe to cancel the send_future as it would + // cause us to lose our place in the queue, but if the send_future + // is cancelled, it means that the task has failed and the mpsc + // won't matter anyway. + _ = send_future => Ok(()), + res = task_future => res, + } } - pub async fn done(self) -> io::Result<()> { + pub async fn done(mut self) -> io::Result<()> { // Drop tx as soon as possible so that the task would wrap up what it // was doing and flush out all the pending data. drop(self.tx); - match self.handle.await { + Self::wait(&mut self.handle).await + } + + async fn wait(handle: &mut task::JoinHandle>) -> io::Result<()> { + match handle.await { Ok(res) => res, Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err)), } From 12931fc0242f09ff34dd9f97a14507e2122c8bc3 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 20:44:20 +1000 Subject: [PATCH 15/49] Refactor: Mv `AutoAbortJoinHandle` into `helpers` Signed-off-by: Jiahao XU --- src/fetchers.rs | 22 +++++++--------------- src/helpers.rs | 9 +++++++++ 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/src/fetchers.rs b/src/fetchers.rs index 2e7b6d02..69905c8a 100644 --- a/src/fetchers.rs +++ b/src/fetchers.rs @@ -4,9 +4,8 @@ use std::sync::Arc; pub use gh_crate_meta::*; pub use log::debug; pub use quickinstall::*; -use tokio::task::JoinHandle; -use crate::{BinstallError, PkgFmt, PkgMeta}; +use crate::{AutoAbortJoinHandle, BinstallError, PkgFmt, PkgMeta}; mod gh_crate_meta; mod quickinstall; @@ -62,10 +61,12 @@ impl MultiFetcher { .fetchers .iter() .cloned() - .map(|fetcher| ( - fetcher.clone(), - AutoAbortJoinHandle(tokio::spawn(async move { fetcher.check().await })), - )) + .map(|fetcher| { + ( + fetcher.clone(), + AutoAbortJoinHandle(tokio::spawn(async move { fetcher.check().await })), + ) + }) .collect(); for (fetcher, mut handle) in handles { @@ -92,12 +93,3 @@ impl MultiFetcher { None } } - -#[derive(Debug)] -struct AutoAbortJoinHandle(JoinHandle>); - -impl Drop for AutoAbortJoinHandle { - fn drop(&mut self) { - self.0.abort(); - } -} diff --git a/src/helpers.rs b/src/helpers.rs index ae7a432d..869bfedc 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -300,3 +300,12 @@ impl AsyncFileWriter { } } } + +#[derive(Debug)] +pub struct AutoAbortJoinHandle(pub task::JoinHandle); + +impl Drop for AutoAbortJoinHandle { + fn drop(&mut self) { + self.0.abort(); + } +} From d6a372a1605b68d2c692b3560c999f11f7589447 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 20:46:00 +1000 Subject: [PATCH 16/49] Use `AutoAbortJoinHandle` in `AsyncFileWriter` to cancel the task on failure. Signed-off-by: Jiahao XU --- src/helpers.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/helpers.rs b/src/helpers.rs index 869bfedc..736cb74a 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -229,7 +229,9 @@ pub trait Template: Serialize { #[derive(Debug)] pub struct AsyncFileWriter { - handle: task::JoinHandle>, + /// Use AutoAbortJoinHandle so that the task + /// will be cancelled on failure. + handle: AutoAbortJoinHandle>, tx: mpsc::Sender, } @@ -240,7 +242,7 @@ impl AsyncFileWriter { let mut file = fs::File::create(path)?; let (tx, mut rx) = mpsc::channel::(100); - let handle = task::spawn_blocking(move || { + let handle = AutoAbortJoinHandle(task::spawn_blocking(move || { while let Some(bytes) = rx.blocking_recv() { file.write_all(&*bytes)?; } @@ -249,7 +251,7 @@ impl AsyncFileWriter { file.flush()?; Ok(()) - }); + })); Ok(Self { handle, tx }) } @@ -293,8 +295,8 @@ impl AsyncFileWriter { Self::wait(&mut self.handle).await } - async fn wait(handle: &mut task::JoinHandle>) -> io::Result<()> { - match handle.await { + async fn wait(handle: &mut AutoAbortJoinHandle>) -> io::Result<()> { + match (&mut handle.0).await { Ok(res) => res, Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err)), } From 52210d1a8c2ab2668a6be5e04f19e3cb1f1a7aea Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 20:48:31 +1000 Subject: [PATCH 17/49] Impl `Deref{Mut}` for `AutoAbortJoinHandle` Signed-off-by: Jiahao XU --- src/helpers.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/helpers.rs b/src/helpers.rs index 736cb74a..9cea2ef2 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -1,6 +1,7 @@ use std::{ fs, io::{self, stderr, stdin, Write}, + ops::{Deref, DerefMut}, path::{Path, PathBuf}, }; @@ -311,3 +312,17 @@ impl Drop for AutoAbortJoinHandle { self.0.abort(); } } + +impl Deref for AutoAbortJoinHandle { + type Target = task::JoinHandle; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for AutoAbortJoinHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} From f41391a53c975ea69bad96de28f28907777a0f34 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 20:51:59 +1000 Subject: [PATCH 18/49] Impl `Future` for `AutoAbortJoinHandle` Signed-off-by: Jiahao XU --- src/helpers.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/helpers.rs b/src/helpers.rs index 9cea2ef2..5a694381 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -1,8 +1,11 @@ use std::{ fs, + future::Future, io::{self, stderr, stdin, Write}, ops::{Deref, DerefMut}, path::{Path, PathBuf}, + pin::Pin, + task::{Context, Poll}, }; use bytes::Bytes; @@ -326,3 +329,11 @@ impl DerefMut for AutoAbortJoinHandle { &mut self.0 } } + +impl Future for AutoAbortJoinHandle { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut Pin::into_inner(self).0).poll(cx) + } +} From d9bcca8b78a7bb1e9fe6692cb98b7e2bb2a6ecec Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 20:53:49 +1000 Subject: [PATCH 19/49] Impl `AutoAbortJoinHandle::new` & make its field private plus change all its users to use its new APIs. Signed-off-by: Jiahao XU --- src/fetchers.rs | 6 +++--- src/helpers.rs | 12 +++++++++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/fetchers.rs b/src/fetchers.rs index 69905c8a..e0b95063 100644 --- a/src/fetchers.rs +++ b/src/fetchers.rs @@ -64,13 +64,13 @@ impl MultiFetcher { .map(|fetcher| { ( fetcher.clone(), - AutoAbortJoinHandle(tokio::spawn(async move { fetcher.check().await })), + AutoAbortJoinHandle::new(tokio::spawn(async move { fetcher.check().await })), ) }) .collect(); - for (fetcher, mut handle) in handles { - match (&mut handle.0).await { + for (fetcher, handle) in handles { + match handle.await { Ok(Ok(true)) => return Some(fetcher), Ok(Ok(false)) => (), Ok(Err(err)) => { diff --git a/src/helpers.rs b/src/helpers.rs index 5a694381..4fbc12c7 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -246,7 +246,7 @@ impl AsyncFileWriter { let mut file = fs::File::create(path)?; let (tx, mut rx) = mpsc::channel::(100); - let handle = AutoAbortJoinHandle(task::spawn_blocking(move || { + let handle = AutoAbortJoinHandle::new(task::spawn_blocking(move || { while let Some(bytes) = rx.blocking_recv() { file.write_all(&*bytes)?; } @@ -300,7 +300,7 @@ impl AsyncFileWriter { } async fn wait(handle: &mut AutoAbortJoinHandle>) -> io::Result<()> { - match (&mut handle.0).await { + match handle.await { Ok(res) => res, Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err)), } @@ -308,7 +308,13 @@ impl AsyncFileWriter { } #[derive(Debug)] -pub struct AutoAbortJoinHandle(pub task::JoinHandle); +pub struct AutoAbortJoinHandle(task::JoinHandle); + +impl AutoAbortJoinHandle { + pub fn new(handle: task::JoinHandle) -> Self { + Self(handle) + } +} impl Drop for AutoAbortJoinHandle { fn drop(&mut self) { From e584b99240c30c77cfc8c409fafec98ed5157096 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 21:38:05 +1000 Subject: [PATCH 20/49] Simplify `AsyncFileWriter::write` by closing rx on err Signed-off-by: Jiahao XU --- src/helpers.rs | 40 +++++++++++++--------------------------- 1 file changed, 13 insertions(+), 27 deletions(-) diff --git a/src/helpers.rs b/src/helpers.rs index 4fbc12c7..9e51f319 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -244,14 +244,18 @@ impl AsyncFileWriter { fs::create_dir_all(path.parent().unwrap())?; let mut file = fs::File::create(path)?; - let (tx, mut rx) = mpsc::channel::(100); + let (tx, rx) = mpsc::channel::(100); let handle = AutoAbortJoinHandle::new(task::spawn_blocking(move || { + // close rx on error so that tx.send will return an error + let mut rx = scopeguard::guard(rx, |mut rx| { + rx.close(); + }); + while let Some(bytes) = rx.blocking_recv() { file.write_all(&*bytes)?; } - rx.close(); file.flush()?; Ok(()) @@ -263,31 +267,13 @@ impl AsyncFileWriter { /// Upon error, this writer shall not be reused. /// Otherwise, `Self::done` would panic. pub async fn write(&mut self, bytes: Bytes) -> io::Result<()> { - let send_future = async { - self.tx - .send(bytes) - .await - .expect("Implementation bug: rx is closed before tx is dropped") - }; - tokio::pin!(send_future); - - let task_future = async { - Self::wait(&mut self.handle).await.map(|_| { - panic!("Implementation bug: write task finished before all writes are done") - }) - }; - tokio::pin!(task_future); - - // Use select to run them in parallel, so that if the send blocks - // the current future and the task failed with some error, the future - // returned by this function would not block forever. - tokio::select! { - // It isn't completely safe to cancel the send_future as it would - // cause us to lose our place in the queue, but if the send_future - // is cancelled, it means that the task has failed and the mpsc - // won't matter anyway. - _ = send_future => Ok(()), - res = task_future => res, + if self.tx.send(bytes).await.is_err() { + // task failed + Err(Self::wait(&mut self.handle).await.expect_err( + "Implementation bug: write task finished successfully before all writes are done", + )) + } else { + Ok(()) } } From 6aced2ca9bee82361d822de8d7381f69211a628f Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 21:38:44 +1000 Subject: [PATCH 21/49] Disable feature "macros" of dep tokio Signed-off-by: Jiahao XU --- Cargo.lock | 12 ------------ Cargo.toml | 2 +- 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e6e26b1a..21877876 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1512,21 +1512,9 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "socket2", - "tokio-macros", "winapi", ] -[[package]] -name = "tokio-macros" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "tokio-rustls" version = "0.23.4" diff --git a/Cargo.toml b/Cargo.toml index bd76fd13..7cdbcc48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ tar = "0.4.38" tempfile = "3.3.0" thiserror = "1.0.31" tinytemplate = "1.2.1" -tokio = { version = "1.19.1", features = [ "rt-multi-thread", "process", "sync", "macros" ], default-features = false } +tokio = { version = "1.19.1", features = [ "rt-multi-thread", "process", "sync" ], default-features = false } url = "2.2.2" xz2 = "0.1.6" From 358bea5c6df4c1f03c85f56c06f7494c66898277 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 21:43:09 +1000 Subject: [PATCH 22/49] Refactor: Extract `AsyncFileWriter` Signed-off-by: Jiahao XU --- src/helpers.rs | 65 ++--------------------------- src/helpers/async_file_writer.rs | 70 ++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 62 deletions(-) create mode 100644 src/helpers/async_file_writer.rs diff --git a/src/helpers.rs b/src/helpers.rs index 9e51f319..c31884f1 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -26,6 +26,9 @@ use zstd::stream::Decoder as ZstdDecoder; use crate::{BinstallError, Meta, PkgFmt}; +mod async_file_writer; +pub use async_file_writer::AsyncFileWriter; + /// Load binstall metadata from the crate `Cargo.toml` at the provided path pub fn load_manifest_path>( manifest_path: P, @@ -231,68 +234,6 @@ pub trait Template: Serialize { } } -#[derive(Debug)] -pub struct AsyncFileWriter { - /// Use AutoAbortJoinHandle so that the task - /// will be cancelled on failure. - handle: AutoAbortJoinHandle>, - tx: mpsc::Sender, -} - -impl AsyncFileWriter { - pub fn new(path: &Path) -> io::Result { - fs::create_dir_all(path.parent().unwrap())?; - - let mut file = fs::File::create(path)?; - let (tx, rx) = mpsc::channel::(100); - - let handle = AutoAbortJoinHandle::new(task::spawn_blocking(move || { - // close rx on error so that tx.send will return an error - let mut rx = scopeguard::guard(rx, |mut rx| { - rx.close(); - }); - - while let Some(bytes) = rx.blocking_recv() { - file.write_all(&*bytes)?; - } - - file.flush()?; - - Ok(()) - })); - - Ok(Self { handle, tx }) - } - - /// Upon error, this writer shall not be reused. - /// Otherwise, `Self::done` would panic. - pub async fn write(&mut self, bytes: Bytes) -> io::Result<()> { - if self.tx.send(bytes).await.is_err() { - // task failed - Err(Self::wait(&mut self.handle).await.expect_err( - "Implementation bug: write task finished successfully before all writes are done", - )) - } else { - Ok(()) - } - } - - pub async fn done(mut self) -> io::Result<()> { - // Drop tx as soon as possible so that the task would wrap up what it - // was doing and flush out all the pending data. - drop(self.tx); - - Self::wait(&mut self.handle).await - } - - async fn wait(handle: &mut AutoAbortJoinHandle>) -> io::Result<()> { - match handle.await { - Ok(res) => res, - Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err)), - } - } -} - #[derive(Debug)] pub struct AutoAbortJoinHandle(task::JoinHandle); diff --git a/src/helpers/async_file_writer.rs b/src/helpers/async_file_writer.rs new file mode 100644 index 00000000..21a577c7 --- /dev/null +++ b/src/helpers/async_file_writer.rs @@ -0,0 +1,70 @@ +use std::fs; +use std::io::{self, Write}; +use std::path::Path; + +use bytes::Bytes; +use tokio::{sync::mpsc, task::spawn_blocking}; + +use super::AutoAbortJoinHandle; + +#[derive(Debug)] +pub struct AsyncFileWriter { + /// Use AutoAbortJoinHandle so that the task + /// will be cancelled on failure. + handle: AutoAbortJoinHandle>, + tx: mpsc::Sender, +} + +impl AsyncFileWriter { + pub fn new(path: &Path) -> io::Result { + fs::create_dir_all(path.parent().unwrap())?; + + let mut file = fs::File::create(path)?; + let (tx, rx) = mpsc::channel::(100); + + let handle = AutoAbortJoinHandle::new(spawn_blocking(move || { + // close rx on error so that tx.send will return an error + let mut rx = scopeguard::guard(rx, |mut rx| { + rx.close(); + }); + + while let Some(bytes) = rx.blocking_recv() { + file.write_all(&*bytes)?; + } + + file.flush()?; + + Ok(()) + })); + + Ok(Self { handle, tx }) + } + + /// Upon error, this writer shall not be reused. + /// Otherwise, `Self::done` would panic. + pub async fn write(&mut self, bytes: Bytes) -> io::Result<()> { + if self.tx.send(bytes).await.is_err() { + // task failed + Err(Self::wait(&mut self.handle).await.expect_err( + "Implementation bug: write task finished successfully before all writes are done", + )) + } else { + Ok(()) + } + } + + pub async fn done(mut self) -> io::Result<()> { + // Drop tx as soon as possible so that the task would wrap up what it + // was doing and flush out all the pending data. + drop(self.tx); + + Self::wait(&mut self.handle).await + } + + async fn wait(handle: &mut AutoAbortJoinHandle>) -> io::Result<()> { + match handle.await { + Ok(res) => res, + Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err)), + } + } +} From 5d70f6131733a674697be0ca83dbc96cc168b864 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 21:46:32 +1000 Subject: [PATCH 23/49] Refactor: Extract `AutoAbortJoinHandle` Signed-off-by: Jiahao XU --- src/helpers.rs | 48 +++------------------------ src/helpers/auto_abort_join_handle.rs | 45 +++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 44 deletions(-) create mode 100644 src/helpers/auto_abort_join_handle.rs diff --git a/src/helpers.rs b/src/helpers.rs index c31884f1..fd014acb 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -1,14 +1,9 @@ use std::{ fs, - future::Future, - io::{self, stderr, stdin, Write}, - ops::{Deref, DerefMut}, + io::{stderr, stdin, Write}, path::{Path, PathBuf}, - pin::Pin, - task::{Context, Poll}, }; -use bytes::Bytes; use cargo_toml::Manifest; use flate2::read::GzDecoder; use futures_util::stream::StreamExt; @@ -18,7 +13,6 @@ use scopeguard::ScopeGuard; use serde::Serialize; use tar::Archive; use tinytemplate::TinyTemplate; -use tokio::{sync::mpsc, task}; use url::Url; use xz2::read::XzDecoder; use zip::read::ZipArchive; @@ -29,6 +23,9 @@ use crate::{BinstallError, Meta, PkgFmt}; mod async_file_writer; pub use async_file_writer::AsyncFileWriter; +mod auto_abort_join_handle; +pub use auto_abort_join_handle::AutoAbortJoinHandle; + /// Load binstall metadata from the crate `Cargo.toml` at the provided path pub fn load_manifest_path>( manifest_path: P, @@ -233,40 +230,3 @@ pub trait Template: Serialize { Ok(tt.render("path", self)?) } } - -#[derive(Debug)] -pub struct AutoAbortJoinHandle(task::JoinHandle); - -impl AutoAbortJoinHandle { - pub fn new(handle: task::JoinHandle) -> Self { - Self(handle) - } -} - -impl Drop for AutoAbortJoinHandle { - fn drop(&mut self) { - self.0.abort(); - } -} - -impl Deref for AutoAbortJoinHandle { - type Target = task::JoinHandle; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for AutoAbortJoinHandle { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl Future for AutoAbortJoinHandle { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut Pin::into_inner(self).0).poll(cx) - } -} diff --git a/src/helpers/auto_abort_join_handle.rs b/src/helpers/auto_abort_join_handle.rs new file mode 100644 index 00000000..fa476a8b --- /dev/null +++ b/src/helpers/auto_abort_join_handle.rs @@ -0,0 +1,45 @@ +use std::{ + future::Future, + ops::{Deref, DerefMut}, + pin::Pin, + task::{Context, Poll}, +}; + +use tokio::task::{JoinError, JoinHandle}; + +#[derive(Debug)] +pub struct AutoAbortJoinHandle(JoinHandle); + +impl AutoAbortJoinHandle { + pub fn new(handle: JoinHandle) -> Self { + Self(handle) + } +} + +impl Drop for AutoAbortJoinHandle { + fn drop(&mut self) { + self.0.abort(); + } +} + +impl Deref for AutoAbortJoinHandle { + type Target = JoinHandle; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for AutoAbortJoinHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Future for AutoAbortJoinHandle { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut Pin::into_inner(self).0).poll(cx) + } +} From 911c52d8e1b56f64eab019558771ad36753913ab Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 22:27:19 +1000 Subject: [PATCH 24/49] Auto remove file in `AsyncFileWriter` unless done is called. Also moves creation of the dir/file into the blocking thread to avoid blocking. Signed-off-by: Jiahao XU --- src/helpers.rs | 7 --- src/helpers/async_file_writer.rs | 84 ++++++++++++++++++++++++++------ 2 files changed, 70 insertions(+), 21 deletions(-) diff --git a/src/helpers.rs b/src/helpers.rs index fd014acb..f60df2d8 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -9,7 +9,6 @@ use flate2::read::GzDecoder; use futures_util::stream::StreamExt; use log::{debug, info}; use reqwest::Method; -use scopeguard::ScopeGuard; use serde::Serialize; use tar::Archive; use tinytemplate::TinyTemplate; @@ -68,17 +67,11 @@ pub async fn download>(url: &str, path: P) -> Result<(), Binstall let mut bytes_stream = resp.bytes_stream(); let mut writer = AsyncFileWriter::new(path)?; - let guard = scopeguard::guard(path, |path| { - fs::remove_file(path).ok(); - }); - while let Some(res) = bytes_stream.next().await { writer.write(res?).await?; } writer.done().await?; - // Disarm as it is successfully downloaded and written to file. - ScopeGuard::into_inner(guard); debug!("Download OK, written to file: '{}'", path.display()); diff --git a/src/helpers/async_file_writer.rs b/src/helpers/async_file_writer.rs index 21a577c7..6405c777 100644 --- a/src/helpers/async_file_writer.rs +++ b/src/helpers/async_file_writer.rs @@ -3,37 +3,60 @@ use std::io::{self, Write}; use std::path::Path; use bytes::Bytes; +use scopeguard::{guard, Always, ScopeGuard}; use tokio::{sync::mpsc, task::spawn_blocking}; use super::AutoAbortJoinHandle; +enum Content { + /// Data to write to file + Data(Bytes), + + /// Abort the writing and remove the file. + Abort, +} + #[derive(Debug)] -pub struct AsyncFileWriter { +struct AsyncFileWriterInner { /// Use AutoAbortJoinHandle so that the task /// will be cancelled on failure. handle: AutoAbortJoinHandle>, - tx: mpsc::Sender, + tx: mpsc::Sender, } -impl AsyncFileWriter { - pub fn new(path: &Path) -> io::Result { - fs::create_dir_all(path.parent().unwrap())?; - - let mut file = fs::File::create(path)?; - let (tx, rx) = mpsc::channel::(100); +impl AsyncFileWriterInner { + fn new(path: &Path) -> io::Result { + let path = path.to_owned(); + let (tx, rx) = mpsc::channel::(100); let handle = AutoAbortJoinHandle::new(spawn_blocking(move || { // close rx on error so that tx.send will return an error - let mut rx = scopeguard::guard(rx, |mut rx| { + let mut rx = guard(rx, |mut rx| { rx.close(); }); - while let Some(bytes) = rx.blocking_recv() { - file.write_all(&*bytes)?; + fs::create_dir_all(path.parent().unwrap())?; + let mut file = fs::File::create(&path)?; + + // remove it unless the operation isn't aborted and no write + // fails. + let remove_guard = guard(path, |path| { + fs::remove_file(path).ok(); + }); + + while let Some(content) = rx.blocking_recv() { + match content { + Content::Data(bytes) => file.write_all(&*bytes)?, + Content::Abort => return Err(io::Error::new(io::ErrorKind::Other, "Aborted")), + } } file.flush()?; + // Operation isn't aborted and all writes succeed, + // disarm the remove_guard. + ScopeGuard::into_inner(remove_guard); + Ok(()) })); @@ -42,8 +65,8 @@ impl AsyncFileWriter { /// Upon error, this writer shall not be reused. /// Otherwise, `Self::done` would panic. - pub async fn write(&mut self, bytes: Bytes) -> io::Result<()> { - if self.tx.send(bytes).await.is_err() { + async fn write(&mut self, bytes: Bytes) -> io::Result<()> { + if self.tx.send(Content::Data(bytes)).await.is_err() { // task failed Err(Self::wait(&mut self.handle).await.expect_err( "Implementation bug: write task finished successfully before all writes are done", @@ -53,7 +76,7 @@ impl AsyncFileWriter { } } - pub async fn done(mut self) -> io::Result<()> { + async fn done(mut self) -> io::Result<()> { // Drop tx as soon as possible so that the task would wrap up what it // was doing and flush out all the pending data. drop(self.tx); @@ -67,4 +90,37 @@ impl AsyncFileWriter { Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err)), } } + + fn abort(self) { + let tx = self.tx; + // If Self::write fail, then the task is already tear down, + // tx closed and no need to abort. + if !tx.is_closed() { + // Use send here because blocking_send would panic if used + // in async threads. + tokio::spawn(async move { + tx.send(Content::Abort).await.ok(); + }); + } + } +} + +/// AsyncFileWriter removes the file if `done` isn't called. +#[derive(Debug)] +pub struct AsyncFileWriter(ScopeGuard); + +impl AsyncFileWriter { + pub fn new(path: &Path) -> io::Result { + AsyncFileWriterInner::new(path).map(|inner| Self(guard(inner, AsyncFileWriterInner::abort))) + } + + /// Upon error, this writer shall not be reused. + /// Otherwise, `Self::done` would panic. + pub async fn write(&mut self, bytes: Bytes) -> io::Result<()> { + self.0.write(bytes).await + } + + pub async fn done(self) -> io::Result<()> { + ScopeGuard::into_inner(self.0).done().await + } } From 894f9b49f9ecd861d0e734b442c6be8d8042a773 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 22:29:04 +1000 Subject: [PATCH 25/49] Simplify `AsyncFileWriter::new`: Ret `Self` instead of `io::Result` Signed-off-by: Jiahao XU --- src/helpers.rs | 2 +- src/helpers/async_file_writer.rs | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/helpers.rs b/src/helpers.rs index f60df2d8..8ee87f9a 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -65,7 +65,7 @@ pub async fn download>(url: &str, path: P) -> Result<(), Binstall debug!("Downloading to file: '{}'", path.display()); let mut bytes_stream = resp.bytes_stream(); - let mut writer = AsyncFileWriter::new(path)?; + let mut writer = AsyncFileWriter::new(path); while let Some(res) = bytes_stream.next().await { writer.write(res?).await?; diff --git a/src/helpers/async_file_writer.rs b/src/helpers/async_file_writer.rs index 6405c777..12f69133 100644 --- a/src/helpers/async_file_writer.rs +++ b/src/helpers/async_file_writer.rs @@ -25,7 +25,7 @@ struct AsyncFileWriterInner { } impl AsyncFileWriterInner { - fn new(path: &Path) -> io::Result { + fn new(path: &Path) -> Self { let path = path.to_owned(); let (tx, rx) = mpsc::channel::(100); @@ -60,7 +60,7 @@ impl AsyncFileWriterInner { Ok(()) })); - Ok(Self { handle, tx }) + Self { handle, tx } } /// Upon error, this writer shall not be reused. @@ -110,8 +110,9 @@ impl AsyncFileWriterInner { pub struct AsyncFileWriter(ScopeGuard); impl AsyncFileWriter { - pub fn new(path: &Path) -> io::Result { - AsyncFileWriterInner::new(path).map(|inner| Self(guard(inner, AsyncFileWriterInner::abort))) + pub fn new(path: &Path) -> Self { + let inner = AsyncFileWriterInner::new(path); + Self(guard(inner, AsyncFileWriterInner::abort)) } /// Upon error, this writer shall not be reused. From 6bc04340b64587346d78f7c7422ac3f666e69ed1 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 22:44:29 +1000 Subject: [PATCH 26/49] Impl `extract_compressed_from_readable` and `unzip` in new mod `extracter` Signed-off-by: Jiahao XU --- src/helpers.rs | 2 ++ src/helpers/extracter.rs | 76 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+) create mode 100644 src/helpers/extracter.rs diff --git a/src/helpers.rs b/src/helpers.rs index 8ee87f9a..9ec2bcb7 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -25,6 +25,8 @@ pub use async_file_writer::AsyncFileWriter; mod auto_abort_join_handle; pub use auto_abort_join_handle::AutoAbortJoinHandle; +mod extracter; + /// Load binstall metadata from the crate `Cargo.toml` at the provided path pub fn load_manifest_path>( manifest_path: P, diff --git a/src/helpers/extracter.rs b/src/helpers/extracter.rs new file mode 100644 index 00000000..77ad3665 --- /dev/null +++ b/src/helpers/extracter.rs @@ -0,0 +1,76 @@ +use std::fs::File; +use std::io::Read; +use std::path::Path; + +use flate2::read::GzDecoder; +use log::debug; +use tar::Archive; +use xz2::read::XzDecoder; +use zip::read::ZipArchive; +use zstd::stream::Decoder as ZstdDecoder; + +use crate::{BinstallError, PkgFmt}; + +/// Extract files from the specified source onto the specified path. +/// +/// * `fmt` - must not be `PkgFmt::Bin` or `PkgFmt::Zip`. +pub fn extract_compressed_from_readable( + dat: impl Read, + fmt: PkgFmt, + path: &Path, +) -> Result<(), BinstallError> { + match fmt { + PkgFmt::Tar => { + // Extract to install dir + debug!("Extracting from tar archive to `{path:?}`"); + + let mut tar = Archive::new(dat); + + tar.unpack(path)?; + } + PkgFmt::Tgz => { + // Extract to install dir + debug!("Decompressing from tgz archive to `{path:?}`"); + + let tar = GzDecoder::new(dat); + let mut tgz = Archive::new(tar); + + tgz.unpack(path)?; + } + PkgFmt::Txz => { + // Extract to install dir + debug!("Decompressing from txz archive to `{path:?}`"); + + let tar = XzDecoder::new(dat); + let mut txz = Archive::new(tar); + + txz.unpack(path)?; + } + PkgFmt::Tzstd => { + // Extract to install dir + debug!("Decompressing from tzstd archive to `{path:?}`"); + + // The error can only come from raw::Decoder::with_dictionary + // as of zstd 0.10.2 and 0.11.2, which is specified + // as &[] by ZstdDecoder::new, thus ZstdDecoder::new + // should not return any error. + let tar = ZstdDecoder::new(dat)?; + let mut txz = Archive::new(tar); + + txz.unpack(path)?; + } + PkgFmt::Zip => panic!("Unexpected PkgFmt::Zip!"), + PkgFmt::Bin => panic!("Unexpected PkgFmt::Bin!"), + }; + + Ok(()) +} + +pub fn unzip(dat: File, dst: &Path) -> Result<(), BinstallError> { + debug!("Decompressing from zip archive to `{dst:?}`"); + + let mut zip = ZipArchive::new(dat)?; + zip.extract(dst)?; + + Ok(()) +} From f211788052675e1c8efd976304f93229c9a0c8c9 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 23:04:14 +1000 Subject: [PATCH 27/49] Impl `ReadableRx` in mod `receiver_as_readable` Signed-off-by: Jiahao XU --- src/helpers.rs | 1 + src/helpers/async_file_writer.rs | 2 +- src/helpers/receiver_as_readable.rs | 49 +++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 1 deletion(-) create mode 100644 src/helpers/receiver_as_readable.rs diff --git a/src/helpers.rs b/src/helpers.rs index 9ec2bcb7..e9351147 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -26,6 +26,7 @@ mod auto_abort_join_handle; pub use auto_abort_join_handle::AutoAbortJoinHandle; mod extracter; +mod receiver_as_readable; /// Load binstall metadata from the crate `Cargo.toml` at the provided path pub fn load_manifest_path>( diff --git a/src/helpers/async_file_writer.rs b/src/helpers/async_file_writer.rs index 12f69133..a6bc1fc8 100644 --- a/src/helpers/async_file_writer.rs +++ b/src/helpers/async_file_writer.rs @@ -8,7 +8,7 @@ use tokio::{sync::mpsc, task::spawn_blocking}; use super::AutoAbortJoinHandle; -enum Content { +pub enum Content { /// Data to write to file Data(Bytes), diff --git a/src/helpers/receiver_as_readable.rs b/src/helpers/receiver_as_readable.rs new file mode 100644 index 00000000..02ccc344 --- /dev/null +++ b/src/helpers/receiver_as_readable.rs @@ -0,0 +1,49 @@ +use std::cmp::min; +use std::io::{self, Read}; + +use bytes::{Buf, Bytes}; +use tokio::sync::mpsc::Receiver; + +use super::async_file_writer::Content; + +#[derive(Debug)] +pub struct ReadableRx<'a> { + rx: &'a mut Receiver, + bytes: Bytes, +} + +impl<'a> ReadableRx<'a> { + pub fn new(rx: &'a mut Receiver) -> Self { + Self { + rx, + bytes: Bytes::new(), + } + } +} + +impl Read for ReadableRx<'_> { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if buf.is_empty() { + return Ok(0); + } + + let bytes = &mut self.bytes; + if !bytes.has_remaining() { + match self.rx.blocking_recv() { + Some(Content::Data(new_bytes)) => *bytes = new_bytes, + Some(Content::Abort) => { + return Err(io::Error::new(io::ErrorKind::Other, "Aborted")) + } + None => return Ok(0), + } + } + + // copy_to_slice requires the bytes to have enough remaining bytes + // to fill buf. + let n = min(buf.len(), bytes.remaining()); + + bytes.copy_to_slice(&mut buf[..n]); + + Ok(n) + } +} From 59544e8b555c7f56a94b022600dbf7d9a7b04059 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 01:01:37 +1000 Subject: [PATCH 28/49] Use `BinstallError` in `AsyncFileWriter` Signed-off-by: Jiahao XU --- src/helpers/async_file_writer.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/helpers/async_file_writer.rs b/src/helpers/async_file_writer.rs index a6bc1fc8..ad549558 100644 --- a/src/helpers/async_file_writer.rs +++ b/src/helpers/async_file_writer.rs @@ -7,6 +7,7 @@ use scopeguard::{guard, Always, ScopeGuard}; use tokio::{sync::mpsc, task::spawn_blocking}; use super::AutoAbortJoinHandle; +use crate::{BinstallError, PkgFmt}; pub enum Content { /// Data to write to file @@ -20,7 +21,7 @@ pub enum Content { struct AsyncFileWriterInner { /// Use AutoAbortJoinHandle so that the task /// will be cancelled on failure. - handle: AutoAbortJoinHandle>, + handle: AutoAbortJoinHandle>, tx: mpsc::Sender, } @@ -47,7 +48,9 @@ impl AsyncFileWriterInner { while let Some(content) = rx.blocking_recv() { match content { Content::Data(bytes) => file.write_all(&*bytes)?, - Content::Abort => return Err(io::Error::new(io::ErrorKind::Other, "Aborted")), + Content::Abort => { + return Err(io::Error::new(io::ErrorKind::Other, "Aborted").into()) + } } } @@ -65,7 +68,7 @@ impl AsyncFileWriterInner { /// Upon error, this writer shall not be reused. /// Otherwise, `Self::done` would panic. - async fn write(&mut self, bytes: Bytes) -> io::Result<()> { + async fn write(&mut self, bytes: Bytes) -> Result<(), BinstallError> { if self.tx.send(Content::Data(bytes)).await.is_err() { // task failed Err(Self::wait(&mut self.handle).await.expect_err( @@ -76,7 +79,7 @@ impl AsyncFileWriterInner { } } - async fn done(mut self) -> io::Result<()> { + async fn done(mut self) -> Result<(), BinstallError> { // Drop tx as soon as possible so that the task would wrap up what it // was doing and flush out all the pending data. drop(self.tx); @@ -84,10 +87,12 @@ impl AsyncFileWriterInner { Self::wait(&mut self.handle).await } - async fn wait(handle: &mut AutoAbortJoinHandle>) -> io::Result<()> { + async fn wait( + handle: &mut AutoAbortJoinHandle>, + ) -> Result<(), BinstallError> { match handle.await { Ok(res) => res, - Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err)), + Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err).into()), } } @@ -117,11 +122,11 @@ impl AsyncFileWriter { /// Upon error, this writer shall not be reused. /// Otherwise, `Self::done` would panic. - pub async fn write(&mut self, bytes: Bytes) -> io::Result<()> { + pub async fn write(&mut self, bytes: Bytes) -> Result<(), BinstallError> { self.0.write(bytes).await } - pub async fn done(self) -> io::Result<()> { + pub async fn done(self) -> Result<(), BinstallError> { ScopeGuard::into_inner(self.0).done().await } } From 58c775a6485c7a2b83802e88c73c0ac1290a50a7 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 01:02:53 +1000 Subject: [PATCH 29/49] Rename mod `receiver_as_readable` to `readable_rx` Signed-off-by: Jiahao XU --- src/helpers.rs | 2 +- src/helpers/{receiver_as_readable.rs => readable_rx.rs} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename src/helpers/{receiver_as_readable.rs => readable_rx.rs} (100%) diff --git a/src/helpers.rs b/src/helpers.rs index e9351147..9e981189 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -26,7 +26,7 @@ mod auto_abort_join_handle; pub use auto_abort_join_handle::AutoAbortJoinHandle; mod extracter; -mod receiver_as_readable; +mod readable_rx; /// Load binstall metadata from the crate `Cargo.toml` at the provided path pub fn load_manifest_path>( diff --git a/src/helpers/receiver_as_readable.rs b/src/helpers/readable_rx.rs similarity index 100% rename from src/helpers/receiver_as_readable.rs rename to src/helpers/readable_rx.rs From c3b5cb11c292d12ea714131077e6b6d9b9032949 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 01:20:37 +1000 Subject: [PATCH 30/49] Support for any `PkgFmt` in `AsyncFileWriter` Signed-off-by: Jiahao XU --- src/helpers.rs | 2 +- src/helpers/async_file_writer.rs | 72 ++++++++++++++++++++++---------- 2 files changed, 50 insertions(+), 24 deletions(-) diff --git a/src/helpers.rs b/src/helpers.rs index 9e981189..8473cfe3 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -68,7 +68,7 @@ pub async fn download>(url: &str, path: P) -> Result<(), Binstall debug!("Downloading to file: '{}'", path.display()); let mut bytes_stream = resp.bytes_stream(); - let mut writer = AsyncFileWriter::new(path); + let mut writer = AsyncFileWriter::new(path, PkgFmt::Bin); while let Some(res) = bytes_stream.next().await { writer.write(res?).await?; diff --git a/src/helpers/async_file_writer.rs b/src/helpers/async_file_writer.rs index ad549558..eacb20bf 100644 --- a/src/helpers/async_file_writer.rs +++ b/src/helpers/async_file_writer.rs @@ -1,12 +1,13 @@ use std::fs; -use std::io::{self, Write}; +use std::io::{self, Seek, Write}; use std::path::Path; use bytes::Bytes; use scopeguard::{guard, Always, ScopeGuard}; +use tempfile::tempfile; use tokio::{sync::mpsc, task::spawn_blocking}; -use super::AutoAbortJoinHandle; +use super::{extracter::*, readable_rx::*, AutoAbortJoinHandle}; use crate::{BinstallError, PkgFmt}; pub enum Content { @@ -26,7 +27,7 @@ struct AsyncFileWriterInner { } impl AsyncFileWriterInner { - fn new(path: &Path) -> Self { + fn new(path: &Path, fmt: PkgFmt) -> Self { let path = path.to_owned(); let (tx, rx) = mpsc::channel::(100); @@ -37,35 +38,60 @@ impl AsyncFileWriterInner { }); fs::create_dir_all(path.parent().unwrap())?; - let mut file = fs::File::create(&path)?; - // remove it unless the operation isn't aborted and no write - // fails. - let remove_guard = guard(path, |path| { - fs::remove_file(path).ok(); - }); + match fmt { + PkgFmt::Bin => { + let mut file = fs::File::create(&path)?; - while let Some(content) = rx.blocking_recv() { - match content { - Content::Data(bytes) => file.write_all(&*bytes)?, - Content::Abort => { - return Err(io::Error::new(io::ErrorKind::Other, "Aborted").into()) - } + // remove it unless the operation isn't aborted and no write + // fails. + let remove_guard = guard(&path, |path| { + fs::remove_file(path).ok(); + }); + + Self::read_into_file(&mut file, &mut rx)?; + + // Operation isn't aborted and all writes succeed, + // disarm the remove_guard. + ScopeGuard::into_inner(remove_guard); } + PkgFmt::Zip => { + let mut file = tempfile()?; + + Self::read_into_file(&mut file, &mut rx)?; + + // rewind it so that we can pass it to unzip + file.rewind()?; + + unzip(file, &path)?; + } + _ => extract_compressed_from_readable(ReadableRx::new(&mut rx), fmt, &path)?, } - file.flush()?; - - // Operation isn't aborted and all writes succeed, - // disarm the remove_guard. - ScopeGuard::into_inner(remove_guard); - Ok(()) })); Self { handle, tx } } + fn read_into_file( + file: &mut fs::File, + rx: &mut mpsc::Receiver, + ) -> Result<(), BinstallError> { + while let Some(content) = rx.blocking_recv() { + match content { + Content::Data(bytes) => file.write_all(&*bytes)?, + Content::Abort => { + return Err(io::Error::new(io::ErrorKind::Other, "Aborted").into()) + } + } + } + + file.flush()?; + + Ok(()) + } + /// Upon error, this writer shall not be reused. /// Otherwise, `Self::done` would panic. async fn write(&mut self, bytes: Bytes) -> Result<(), BinstallError> { @@ -115,8 +141,8 @@ impl AsyncFileWriterInner { pub struct AsyncFileWriter(ScopeGuard); impl AsyncFileWriter { - pub fn new(path: &Path) -> Self { - let inner = AsyncFileWriterInner::new(path); + pub fn new(path: &Path, fmt: PkgFmt) -> Self { + let inner = AsyncFileWriterInner::new(path, fmt); Self(guard(inner, AsyncFileWriterInner::abort)) } From 784d1f0bf6ce11a989fbf9f6321ca01a4a46082c Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 01:32:38 +1000 Subject: [PATCH 31/49] Impl new fn `helpers::download_and_extract` Signed-off-by: Jiahao XU --- src/helpers.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/helpers.rs b/src/helpers.rs index 8473cfe3..3eac6bc6 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -53,6 +53,15 @@ pub async fn remote_exists(url: Url, method: Method) -> Result>(url: &str, path: P) -> Result<(), BinstallError> { let url = Url::parse(url)?; + download_and_extract(url, PkgFmt::Bin, path.as_ref()).await +} + +/// Download a file from the provided URL and extract it to the provided path +pub async fn download_and_extract>( + url: Url, + fmt: PkgFmt, + path: P, +) -> Result<(), BinstallError> { debug!("Downloading from: '{url}'"); let resp = reqwest::get(url.clone()) @@ -68,7 +77,7 @@ pub async fn download>(url: &str, path: P) -> Result<(), Binstall debug!("Downloading to file: '{}'", path.display()); let mut bytes_stream = resp.bytes_stream(); - let mut writer = AsyncFileWriter::new(path, PkgFmt::Bin); + let mut writer = AsyncFileWriter::new(path, fmt); while let Some(res) = bytes_stream.next().await { writer.write(res?).await?; From 441e004ef19148cc8e0643779448b885248b8ddb Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 01:42:13 +1000 Subject: [PATCH 32/49] Fix `test::parse_meta` in `lib.rs` Signed-off-by: Jiahao XU --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index d83d7887..0ad83f42 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -168,7 +168,7 @@ mod test { &[Product { name: Some("cargo-binstall".to_string()), path: Some("src/main.rs".to_string()), - edition: Some(cargo_toml::Edition::E2018), + edition: Some(cargo_toml::Edition::E2021), ..Default::default() },], ); From 2ea03f6b29a5a0e9945704b027495cdde22b503f Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 01:44:31 +1000 Subject: [PATCH 33/49] Add new workflow "ci" to run `cargo-test` Signed-off-by: Jiahao XU --- .github/workflows/ci.yml | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 00000000..13143e9c --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,32 @@ +name: Rust + +on: + push: + pull_request: + +env: + CARGO_TERM_COLOR: always + +jobs: + test: + name: test + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Configure toolchain + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + override: true + - name: Configure caching + uses: actions/cache@v2 + with: + key: ${{ matrix.os }}-${{ matrix.target }}-testing + path: | + ${{ env.HOME }}/.cargo + target + - name: test + uses: actions-rs/cargo@v1 + with: + command: test From 4b6b3e667c8225e4d50d37da42f1d22d8756670d Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 01:48:42 +1000 Subject: [PATCH 34/49] Fix syntax err in workflow "ci" Signed-off-by: Jiahao XU --- .github/workflows/ci.yml | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 13143e9c..7d8ae7d0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,22 +11,21 @@ jobs: test: name: test runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - name: Configure toolchain - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - override: true - - name: Configure caching - uses: actions/cache@v2 - with: - key: ${{ matrix.os }}-${{ matrix.target }}-testing - path: | - ${{ env.HOME }}/.cargo - target - - name: test - uses: actions-rs/cargo@v1 - with: - command: test + steps: + - uses: actions/checkout@v2 + - name: Configure toolchain + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + override: true + - name: Configure caching + uses: actions/cache@v2 + with: + key: ${{ matrix.os }}-${{ matrix.target }}-testing + path: | + ${{ env.HOME }}/.cargo + target + - name: test + uses: actions-rs/cargo@v1 + with: + command: test From 1c40848f5115b5423f5811c423154c214ed7f528 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 01:52:30 +1000 Subject: [PATCH 35/49] Rm `ci.yml` Signed-off-by: Jiahao XU --- .github/workflows/ci.yml | 31 ------------------------------- 1 file changed, 31 deletions(-) delete mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml deleted file mode 100644 index 7d8ae7d0..00000000 --- a/.github/workflows/ci.yml +++ /dev/null @@ -1,31 +0,0 @@ -name: Rust - -on: - push: - pull_request: - -env: - CARGO_TERM_COLOR: always - -jobs: - test: - name: test - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - name: Configure toolchain - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - override: true - - name: Configure caching - uses: actions/cache@v2 - with: - key: ${{ matrix.os }}-${{ matrix.target }}-testing - path: | - ${{ env.HOME }}/.cargo - target - - name: test - uses: actions-rs/cargo@v1 - with: - command: test From cc13a23b076ec10fbe3c07a6a7d5955c3d1021c0 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 12:58:39 +1000 Subject: [PATCH 36/49] Mark all internal types & fn as `pub(crate)` Signed-off-by: Jiahao XU --- src/helpers/async_file_writer.rs | 2 +- src/helpers/extracter.rs | 4 ++-- src/helpers/readable_rx.rs | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/helpers/async_file_writer.rs b/src/helpers/async_file_writer.rs index eacb20bf..ab90f8a3 100644 --- a/src/helpers/async_file_writer.rs +++ b/src/helpers/async_file_writer.rs @@ -10,7 +10,7 @@ use tokio::{sync::mpsc, task::spawn_blocking}; use super::{extracter::*, readable_rx::*, AutoAbortJoinHandle}; use crate::{BinstallError, PkgFmt}; -pub enum Content { +pub(crate) enum Content { /// Data to write to file Data(Bytes), diff --git a/src/helpers/extracter.rs b/src/helpers/extracter.rs index 77ad3665..c851b0c8 100644 --- a/src/helpers/extracter.rs +++ b/src/helpers/extracter.rs @@ -14,7 +14,7 @@ use crate::{BinstallError, PkgFmt}; /// Extract files from the specified source onto the specified path. /// /// * `fmt` - must not be `PkgFmt::Bin` or `PkgFmt::Zip`. -pub fn extract_compressed_from_readable( +pub(crate) fn extract_compressed_from_readable( dat: impl Read, fmt: PkgFmt, path: &Path, @@ -66,7 +66,7 @@ pub fn extract_compressed_from_readable( Ok(()) } -pub fn unzip(dat: File, dst: &Path) -> Result<(), BinstallError> { +pub(crate) fn unzip(dat: File, dst: &Path) -> Result<(), BinstallError> { debug!("Decompressing from zip archive to `{dst:?}`"); let mut zip = ZipArchive::new(dat)?; diff --git a/src/helpers/readable_rx.rs b/src/helpers/readable_rx.rs index 02ccc344..8b387ab6 100644 --- a/src/helpers/readable_rx.rs +++ b/src/helpers/readable_rx.rs @@ -7,13 +7,13 @@ use tokio::sync::mpsc::Receiver; use super::async_file_writer::Content; #[derive(Debug)] -pub struct ReadableRx<'a> { +pub(crate) struct ReadableRx<'a> { rx: &'a mut Receiver, bytes: Bytes, } impl<'a> ReadableRx<'a> { - pub fn new(rx: &'a mut Receiver) -> Self { + pub(crate) fn new(rx: &'a mut Receiver) -> Self { Self { rx, bytes: Bytes::new(), From be5e8616a230038f6d0c360d17bcd31702344a34 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 13:15:23 +1000 Subject: [PATCH 37/49] Impl new fn `helpers::extracter::untar` Signed-off-by: Jiahao XU --- src/helpers/extracter.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/helpers/extracter.rs b/src/helpers/extracter.rs index c851b0c8..d0e2ea6e 100644 --- a/src/helpers/extracter.rs +++ b/src/helpers/extracter.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::fs::File; use std::io::Read; use std::path::Path; @@ -11,6 +12,31 @@ use zstd::stream::Decoder as ZstdDecoder; use crate::{BinstallError, PkgFmt}; +fn untar( + dat: impl Read, + path: &Path, + desired_outputs: Option<&[Cow<'_, Path>]>, +) -> Result<(), BinstallError> { + let mut tar = Archive::new(dat); + + if let Some(desired_outputs) = desired_outputs { + for res in tar.entries()? { + let mut entry = res?; + let entry_path = entry.path()?; + + if desired_outputs.contains(&entry_path) { + let dst = path.join(entry_path); + + entry.unpack(dst)?; + } + } + } else { + tar.unpack(path)?; + } + + Ok(()) +} + /// Extract files from the specified source onto the specified path. /// /// * `fmt` - must not be `PkgFmt::Bin` or `PkgFmt::Zip`. From 72983e41138426755c1e4500319440995ce9996c Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 13:26:47 +1000 Subject: [PATCH 38/49] Use `untar` in `extract_compressed_from_readable` So that we can specify the files we want to extract to avoid io and save disk usage. Signed-off-by: Jiahao XU --- src/helpers.rs | 12 +++++++++--- src/helpers/async_file_writer.rs | 27 +++++++++++++++++++++++---- src/helpers/extracter.rs | 21 +++++++++------------ 3 files changed, 41 insertions(+), 19 deletions(-) diff --git a/src/helpers.rs b/src/helpers.rs index 3eac6bc6..614819ab 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -1,4 +1,5 @@ use std::{ + borrow::Cow, fs, io::{stderr, stdin, Write}, path::{Path, PathBuf}, @@ -53,14 +54,19 @@ pub async fn remote_exists(url: Url, method: Method) -> Result>(url: &str, path: P) -> Result<(), BinstallError> { let url = Url::parse(url)?; - download_and_extract(url, PkgFmt::Bin, path.as_ref()).await + download_and_extract::<_, 0>(url, PkgFmt::Bin, path.as_ref(), None).await } /// Download a file from the provided URL and extract it to the provided path -pub async fn download_and_extract>( +/// +/// * `desired_outputs - If Some(_) and `fmt` is not `PkgFmt::Bin` or +/// `PkgFmt::Zip`, then it will filter the tar and only extract files +/// specified in it. +pub async fn download_and_extract, const N: usize>( url: Url, fmt: PkgFmt, path: P, + desired_outputs: Option<[Cow<'static, Path>; N]>, ) -> Result<(), BinstallError> { debug!("Downloading from: '{url}'"); @@ -77,7 +83,7 @@ pub async fn download_and_extract>( debug!("Downloading to file: '{}'", path.display()); let mut bytes_stream = resp.bytes_stream(); - let mut writer = AsyncFileWriter::new(path, fmt); + let mut writer = AsyncFileWriter::new(path, fmt, desired_outputs); while let Some(res) = bytes_stream.next().await { writer.write(res?).await?; diff --git a/src/helpers/async_file_writer.rs b/src/helpers/async_file_writer.rs index ab90f8a3..755cde72 100644 --- a/src/helpers/async_file_writer.rs +++ b/src/helpers/async_file_writer.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::fs; use std::io::{self, Seek, Write}; use std::path::Path; @@ -27,7 +28,13 @@ struct AsyncFileWriterInner { } impl AsyncFileWriterInner { - fn new(path: &Path, fmt: PkgFmt) -> Self { + /// * `desired_outputs - If Some(_), then it will filter the tar + /// and only extract files specified in it. + fn new( + path: &Path, + fmt: PkgFmt, + desired_outputs: Option<[Cow<'static, Path>; N]>, + ) -> Self { let path = path.to_owned(); let (tx, rx) = mpsc::channel::(100); @@ -65,7 +72,12 @@ impl AsyncFileWriterInner { unzip(file, &path)?; } - _ => extract_compressed_from_readable(ReadableRx::new(&mut rx), fmt, &path)?, + _ => extract_compressed_from_readable( + ReadableRx::new(&mut rx), + fmt, + &path, + desired_outputs.as_ref().map(|arr| &arr[..]), + )?, } Ok(()) @@ -141,8 +153,15 @@ impl AsyncFileWriterInner { pub struct AsyncFileWriter(ScopeGuard); impl AsyncFileWriter { - pub fn new(path: &Path, fmt: PkgFmt) -> Self { - let inner = AsyncFileWriterInner::new(path, fmt); + /// * `desired_outputs - If Some(_) and `fmt` is not `PkgFmt::Bin` or + /// `PkgFmt::Zip`, then it will filter the tar and only extract files + /// specified in it. + pub fn new( + path: &Path, + fmt: PkgFmt, + desired_outputs: Option<[Cow<'static, Path>; N]>, + ) -> Self { + let inner = AsyncFileWriterInner::new(path, fmt, desired_outputs); Self(guard(inner, AsyncFileWriterInner::abort)) } diff --git a/src/helpers/extracter.rs b/src/helpers/extracter.rs index d0e2ea6e..41797210 100644 --- a/src/helpers/extracter.rs +++ b/src/helpers/extracter.rs @@ -12,6 +12,8 @@ use zstd::stream::Decoder as ZstdDecoder; use crate::{BinstallError, PkgFmt}; +/// * `desired_outputs - If Some(_), then it will filter the tar +/// and only extract files specified in it. fn untar( dat: impl Read, path: &Path, @@ -40,37 +42,34 @@ fn untar( /// Extract files from the specified source onto the specified path. /// /// * `fmt` - must not be `PkgFmt::Bin` or `PkgFmt::Zip`. +/// * `desired_outputs - If Some(_), then it will filter the tar +/// and only extract files specified in it. pub(crate) fn extract_compressed_from_readable( dat: impl Read, fmt: PkgFmt, path: &Path, + desired_outputs: Option<&[Cow<'_, Path>]>, ) -> Result<(), BinstallError> { match fmt { PkgFmt::Tar => { // Extract to install dir debug!("Extracting from tar archive to `{path:?}`"); - let mut tar = Archive::new(dat); - - tar.unpack(path)?; + untar(dat, path, desired_outputs)? } PkgFmt::Tgz => { // Extract to install dir debug!("Decompressing from tgz archive to `{path:?}`"); let tar = GzDecoder::new(dat); - let mut tgz = Archive::new(tar); - - tgz.unpack(path)?; + untar(tar, path, desired_outputs)?; } PkgFmt::Txz => { // Extract to install dir debug!("Decompressing from txz archive to `{path:?}`"); let tar = XzDecoder::new(dat); - let mut txz = Archive::new(tar); - - txz.unpack(path)?; + untar(tar, path, desired_outputs)?; } PkgFmt::Tzstd => { // Extract to install dir @@ -81,9 +80,7 @@ pub(crate) fn extract_compressed_from_readable( // as &[] by ZstdDecoder::new, thus ZstdDecoder::new // should not return any error. let tar = ZstdDecoder::new(dat)?; - let mut txz = Archive::new(tar); - - txz.unpack(path)?; + untar(tar, path, desired_outputs)?; } PkgFmt::Zip => panic!("Unexpected PkgFmt::Zip!"), PkgFmt::Bin => panic!("Unexpected PkgFmt::Bin!"), From b6bfd40c3a682fec1c1ece3f51d492e34513ffe0 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 14:12:44 +1000 Subject: [PATCH 39/49] Use `download_and_extract` in `fetch_crate_cratesio` Signed-off-by: Jiahao XU --- src/drivers.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/drivers.rs b/src/drivers.rs index 49b615c1..3ddf654d 100644 --- a/src/drivers.rs +++ b/src/drivers.rs @@ -5,6 +5,7 @@ use std::time::Duration; use crates_io_api::AsyncClient; use log::debug; use semver::{Version, VersionReq}; +use url::Url; use crate::{helpers::*, BinstallError, PkgFmt}; @@ -50,7 +51,7 @@ fn find_version<'a, V: Iterator>( .ok_or(BinstallError::VersionMismatch { req: version_req }) } -/// Fetch a crate by name and version from crates.io +/// Fetch a crate Cargo.toml by name and version from crates.io pub async fn fetch_crate_cratesio( name: &str, version_req: &str, @@ -98,16 +99,17 @@ pub async fn fetch_crate_cratesio( // Download crate to temporary dir (crates.io or git?) let crate_url = format!("https://crates.io/{}", version.dl_path); - let tgz_path = temp_dir.join(format!("{name}.tgz")); - debug!("Fetching crate from: {crate_url}"); + debug!("Fetching crate from: {crate_url} and extracting Cargo.toml from it"); - // Download crate - download(&crate_url, &tgz_path).await?; + download_and_extract( + Url::parse(&crate_url)?, + PkgFmt::Tgz, + &temp_dir, + Some([Path::new("Cargo.toml").into()]), + ) + .await?; - // Decompress downloaded tgz - debug!("Decompressing crate archive"); - extract(&tgz_path, PkgFmt::Tgz, &temp_dir)?; let crate_path = temp_dir.join(format!("{name}-{version_name}")); // Return crate directory From c9b0d45a24c107d6bf2260b11c437d3fb241ae36 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 14:46:00 +1000 Subject: [PATCH 40/49] Use `download_and_extract` in fetchers to improve efficiency by avoiding disk io (except for `PkgFmt::Zip` and `PkgFmt::Bin`) and run the compresser in parallel to the downloader. Signed-off-by: Jiahao XU --- src/fetchers.rs | 4 ++-- src/fetchers/gh_crate_meta.rs | 6 +++--- src/fetchers/quickinstall.rs | 6 +++--- src/main.rs | 27 +++++++++++---------------- 4 files changed, 19 insertions(+), 24 deletions(-) diff --git a/src/fetchers.rs b/src/fetchers.rs index e0b95063..a39a0e9a 100644 --- a/src/fetchers.rs +++ b/src/fetchers.rs @@ -17,8 +17,8 @@ pub trait Fetcher: Send + Sync { where Self: Sized; - /// Fetch a package - async fn fetch(&self, dst: &Path) -> Result<(), BinstallError>; + /// Fetch a package and extract + async fn fetch_and_extract(&self, dst: &Path) -> Result<(), BinstallError>; /// Check if a package is available for download async fn check(&self) -> Result; diff --git a/src/fetchers/gh_crate_meta.rs b/src/fetchers/gh_crate_meta.rs index e38d7ae1..c6ed669b 100644 --- a/src/fetchers/gh_crate_meta.rs +++ b/src/fetchers/gh_crate_meta.rs @@ -7,7 +7,7 @@ use serde::Serialize; use url::Url; use super::Data; -use crate::{download, remote_exists, BinstallError, PkgFmt, Template}; +use crate::{download_and_extract, remote_exists, BinstallError, PkgFmt, Template}; pub struct GhCrateMeta { data: Data, @@ -40,10 +40,10 @@ impl super::Fetcher for GhCrateMeta { remote_exists(url, Method::HEAD).await } - async fn fetch(&self, dst: &Path) -> Result<(), BinstallError> { + async fn fetch_and_extract(&self, dst: &Path) -> Result<(), BinstallError> { let url = self.url()?; info!("Downloading package from: '{url}'"); - download(url.as_str(), dst).await + download_and_extract::<_, 0>(url, self.pkg_fmt(), dst, None).await } fn pkg_fmt(&self) -> PkgFmt { diff --git a/src/fetchers/quickinstall.rs b/src/fetchers/quickinstall.rs index ec83493c..b934302e 100644 --- a/src/fetchers/quickinstall.rs +++ b/src/fetchers/quickinstall.rs @@ -6,7 +6,7 @@ use reqwest::Method; use url::Url; use super::Data; -use crate::{download, remote_exists, BinstallError, PkgFmt}; +use crate::{download_and_extract, remote_exists, BinstallError, PkgFmt}; const BASE_URL: &str = "https://github.com/alsuren/cargo-quickinstall/releases/download"; const STATS_URL: &str = "https://warehouse-clerk-tmp.vercel.app/api/crate"; @@ -36,10 +36,10 @@ impl super::Fetcher for QuickInstall { remote_exists(Url::parse(&url)?, Method::HEAD).await } - async fn fetch(&self, dst: &Path) -> Result<(), BinstallError> { + async fn fetch_and_extract(&self, dst: &Path) -> Result<(), BinstallError> { let url = self.package_url(); info!("Downloading package from: '{url}'"); - download(&url, &dst).await + download_and_extract::<_, 0>(Url::parse(&url)?, self.pkg_fmt(), dst, None).await } fn pkg_fmt(&self) -> PkgFmt { diff --git a/src/main.rs b/src/main.rs index 9b3dafb0..58460ebb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -361,11 +361,21 @@ async fn install_from_package( meta.bin_dir = "{ bin }{ binary-ext }".to_string(); } + let bin_path = temp_dir.path().join(format!("bin-{}", opts.name)); + debug!("Using temporary binary path: {}", bin_path.display()); + // Download package if opts.dry_run { info!("Dry run, not downloading package"); } else { - fetcher.fetch(&pkg_path).await?; + fetcher.fetch_and_extract(&bin_path).await?; + + if binaries.is_empty() { + error!("No binaries specified (or inferred from file system)"); + return Err(miette!( + "No binaries specified (or inferred from file system)" + )); + } } #[cfg(incomplete)] @@ -392,21 +402,6 @@ async fn install_from_package( } } - let bin_path = temp_dir.path().join(format!("bin-{}", opts.name)); - debug!("Using temporary binary path: {}", bin_path.display()); - - if !opts.dry_run { - // Extract files - extract(&pkg_path, fetcher.pkg_fmt(), &bin_path)?; - - if binaries.is_empty() { - error!("No binaries specified (or inferred from file system)"); - return Err(miette!( - "No binaries specified (or inferred from file system)" - )); - } - } - // List files to be installed // based on those found via Cargo.toml let bin_data = bins::Data { From 1879a719e4a2abb20ba913349e067cce7c6c0050 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 14:47:15 +1000 Subject: [PATCH 41/49] Rm fn `helpers::download` Signed-off-by: Jiahao XU --- src/helpers.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/helpers.rs b/src/helpers.rs index 614819ab..b0451295 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -51,12 +51,6 @@ pub async fn remote_exists(url: Url, method: Method) -> Result>(url: &str, path: P) -> Result<(), BinstallError> { - let url = Url::parse(url)?; - download_and_extract::<_, 0>(url, PkgFmt::Bin, path.as_ref(), None).await -} - /// Download a file from the provided URL and extract it to the provided path /// /// * `desired_outputs - If Some(_) and `fmt` is not `PkgFmt::Bin` or From 728d1fd6dd4790eb0ed9e9068760fa9dc2f9a39d Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 14:47:43 +1000 Subject: [PATCH 42/49] Rm unused param `pkg_path` in `install_from_package` Signed-off-by: Jiahao XU --- src/main.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/main.rs b/src/main.rs index 58460ebb..55ce22bb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -295,12 +295,6 @@ async fn entry() -> Result<()> { fetcher.source_name() ); - // Compute temporary directory for downloads - let pkg_path = temp_dir - .path() - .join(format!("pkg-{}.{}", opts.name, meta.pkg_fmt)); - debug!("Using temporary download path: {}", pkg_path.display()); - install_from_package( binaries, fetcher.as_ref(), @@ -308,7 +302,6 @@ async fn entry() -> Result<()> { meta, opts, package, - pkg_path, temp_dir, ) .await @@ -337,7 +330,6 @@ async fn install_from_package( mut meta: PkgMeta, opts: Options, package: Package, - pkg_path: PathBuf, temp_dir: TempDir, ) -> Result<()> { // Prompt user for third-party source From 5ba8b07bcba69cbb7285f4969cdb070872b9da3e Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 14:52:14 +1000 Subject: [PATCH 43/49] Rm `helpers::extract` Signed-off-by: Jiahao XU --- src/helpers.rs | 79 -------------------------------------------------- 1 file changed, 79 deletions(-) diff --git a/src/helpers.rs b/src/helpers.rs index b0451295..6d84bb3c 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -1,22 +1,16 @@ use std::{ borrow::Cow, - fs, io::{stderr, stdin, Write}, path::{Path, PathBuf}, }; use cargo_toml::Manifest; -use flate2::read::GzDecoder; use futures_util::stream::StreamExt; use log::{debug, info}; use reqwest::Method; use serde::Serialize; -use tar::Archive; use tinytemplate::TinyTemplate; use url::Url; -use xz2::read::XzDecoder; -use zip::read::ZipArchive; -use zstd::stream::Decoder as ZstdDecoder; use crate::{BinstallError, Meta, PkgFmt}; @@ -90,79 +84,6 @@ pub async fn download_and_extract, const N: usize>( Ok(()) } -/// Extract files from the specified source onto the specified path -pub fn extract, P: AsRef>( - source: S, - fmt: PkgFmt, - path: P, -) -> Result<(), BinstallError> { - let source = source.as_ref(); - let path = path.as_ref(); - - match fmt { - PkgFmt::Tar => { - // Extract to install dir - debug!("Extracting from tar archive '{source:?}' to `{path:?}`"); - - let dat = fs::File::open(source)?; - let mut tar = Archive::new(dat); - - tar.unpack(path)?; - } - PkgFmt::Tgz => { - // Extract to install dir - debug!("Decompressing from tgz archive '{source:?}' to `{path:?}`"); - - let dat = fs::File::open(source)?; - let tar = GzDecoder::new(dat); - let mut tgz = Archive::new(tar); - - tgz.unpack(path)?; - } - PkgFmt::Txz => { - // Extract to install dir - debug!("Decompressing from txz archive '{source:?}' to `{path:?}`"); - - let dat = fs::File::open(source)?; - let tar = XzDecoder::new(dat); - let mut txz = Archive::new(tar); - - txz.unpack(path)?; - } - PkgFmt::Tzstd => { - // Extract to install dir - debug!("Decompressing from tzstd archive '{source:?}' to `{path:?}`"); - - let dat = std::fs::File::open(source)?; - - // The error can only come from raw::Decoder::with_dictionary - // as of zstd 0.10.2 and 0.11.2, which is specified - // as &[] by ZstdDecoder::new, thus ZstdDecoder::new - // should not return any error. - let tar = ZstdDecoder::new(dat)?; - let mut txz = Archive::new(tar); - - txz.unpack(path)?; - } - PkgFmt::Zip => { - // Extract to install dir - debug!("Decompressing from zip archive '{source:?}' to `{path:?}`"); - - let dat = fs::File::open(source)?; - let mut zip = ZipArchive::new(dat)?; - - zip.extract(path)?; - } - PkgFmt::Bin => { - debug!("Copying binary '{source:?}' to `{path:?}`"); - // Copy to install dir - fs::copy(source, path)?; - } - }; - - Ok(()) -} - /// Fetch install path from environment /// roughly follows pub fn get_install_path>(install_path: Option

) -> Option { From e62775a9ecb31fa8f7b8cfb687198864617ddc69 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 21:34:07 +1000 Subject: [PATCH 44/49] Add more doc for `AsyncFileWriter` Signed-off-by: Jiahao XU --- src/helpers/async_file_writer.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/helpers/async_file_writer.rs b/src/helpers/async_file_writer.rs index 755cde72..53a61ceb 100644 --- a/src/helpers/async_file_writer.rs +++ b/src/helpers/async_file_writer.rs @@ -148,6 +148,11 @@ impl AsyncFileWriterInner { } } +/// AsyncFileWriter will pass the `Bytes` you give to another thread via +/// a `mpsc` and decompress and unpack it if needed. +/// +/// # Cancellation +/// /// AsyncFileWriter removes the file if `done` isn't called. #[derive(Debug)] pub struct AsyncFileWriter(ScopeGuard); From 945687c281616c1bc9216db0e370cdf08327c503 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 21:58:51 +1000 Subject: [PATCH 45/49] Rename `AsyncFileWriter` to `AsyncExtracter` Signed-off-by: Jiahao XU --- src/helpers.rs | 10 +++++----- .../{async_file_writer.rs => async_extracter.rs} | 16 ++++++++-------- src/helpers/readable_rx.rs | 2 +- 3 files changed, 14 insertions(+), 14 deletions(-) rename src/helpers/{async_file_writer.rs => async_extracter.rs} (92%) diff --git a/src/helpers.rs b/src/helpers.rs index 6d84bb3c..bc7ce7c8 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -14,8 +14,8 @@ use url::Url; use crate::{BinstallError, Meta, PkgFmt}; -mod async_file_writer; -pub use async_file_writer::AsyncFileWriter; +mod async_extracter; +pub use async_extracter::AsyncExtracter; mod auto_abort_join_handle; pub use auto_abort_join_handle::AutoAbortJoinHandle; @@ -71,13 +71,13 @@ pub async fn download_and_extract, const N: usize>( debug!("Downloading to file: '{}'", path.display()); let mut bytes_stream = resp.bytes_stream(); - let mut writer = AsyncFileWriter::new(path, fmt, desired_outputs); + let mut extracter = AsyncExtracter::new(path, fmt, desired_outputs); while let Some(res) = bytes_stream.next().await { - writer.write(res?).await?; + extracter.write(res?).await?; } - writer.done().await?; + extracter.done().await?; debug!("Download OK, written to file: '{}'", path.display()); diff --git a/src/helpers/async_file_writer.rs b/src/helpers/async_extracter.rs similarity index 92% rename from src/helpers/async_file_writer.rs rename to src/helpers/async_extracter.rs index 53a61ceb..ea581ebb 100644 --- a/src/helpers/async_file_writer.rs +++ b/src/helpers/async_extracter.rs @@ -20,14 +20,14 @@ pub(crate) enum Content { } #[derive(Debug)] -struct AsyncFileWriterInner { +struct AsyncExtracterInner { /// Use AutoAbortJoinHandle so that the task /// will be cancelled on failure. handle: AutoAbortJoinHandle>, tx: mpsc::Sender, } -impl AsyncFileWriterInner { +impl AsyncExtracterInner { /// * `desired_outputs - If Some(_), then it will filter the tar /// and only extract files specified in it. fn new( @@ -148,16 +148,16 @@ impl AsyncFileWriterInner { } } -/// AsyncFileWriter will pass the `Bytes` you give to another thread via +/// AsyncExtracter will pass the `Bytes` you give to another thread via /// a `mpsc` and decompress and unpack it if needed. /// /// # Cancellation /// -/// AsyncFileWriter removes the file if `done` isn't called. +/// AsyncExtracter removes the file if `done` isn't called. #[derive(Debug)] -pub struct AsyncFileWriter(ScopeGuard); +pub struct AsyncExtracter(ScopeGuard); -impl AsyncFileWriter { +impl AsyncExtracter { /// * `desired_outputs - If Some(_) and `fmt` is not `PkgFmt::Bin` or /// `PkgFmt::Zip`, then it will filter the tar and only extract files /// specified in it. @@ -166,8 +166,8 @@ impl AsyncFileWriter { fmt: PkgFmt, desired_outputs: Option<[Cow<'static, Path>; N]>, ) -> Self { - let inner = AsyncFileWriterInner::new(path, fmt, desired_outputs); - Self(guard(inner, AsyncFileWriterInner::abort)) + let inner = AsyncExtracterInner::new(path, fmt, desired_outputs); + Self(guard(inner, AsyncExtracterInner::abort)) } /// Upon error, this writer shall not be reused. diff --git a/src/helpers/readable_rx.rs b/src/helpers/readable_rx.rs index 8b387ab6..15aa1300 100644 --- a/src/helpers/readable_rx.rs +++ b/src/helpers/readable_rx.rs @@ -4,7 +4,7 @@ use std::io::{self, Read}; use bytes::{Buf, Bytes}; use tokio::sync::mpsc::Receiver; -use super::async_file_writer::Content; +use super::async_extracter::Content; #[derive(Debug)] pub(crate) struct ReadableRx<'a> { From 432376224f93ea89c254384975f7c4e63fdb2e0d Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 22:01:51 +1000 Subject: [PATCH 46/49] Update doc of `AsyncExtracter` Signed-off-by: Jiahao XU --- src/helpers/async_extracter.rs | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/src/helpers/async_extracter.rs b/src/helpers/async_extracter.rs index ea581ebb..8f0b2e5e 100644 --- a/src/helpers/async_extracter.rs +++ b/src/helpers/async_extracter.rs @@ -151,16 +151,39 @@ impl AsyncExtracterInner { /// AsyncExtracter will pass the `Bytes` you give to another thread via /// a `mpsc` and decompress and unpack it if needed. /// -/// # Cancellation +/// After all write is done, you must call `AsyncExtracter::done`, +/// otherwise the extracted content will be removed on drop. /// -/// AsyncExtracter removes the file if `done` isn't called. +/// # Advantages +/// +/// `download_and_extract` has the following advantages over downloading +/// plus extracting in on the same thread: +/// +/// - The code is pipelined instead of storing the downloaded file in memory +/// and extract it, except for `PkgFmt::Zip`, since `ZipArchiver::new` +/// requires `std::io::Seek`, so it fallbacks to writing the a file then +/// unzip it. +/// - The async part (downloading) and the extracting part runs in parallel +/// using `tokio::spawn_nonblocking`. +/// - Compressing/writing which takes a lot of CPU time will not block +/// the runtime anymore. +/// - For any PkgFmt except for `PkgFmt::Zip` and `PkgFmt::Bin` (basically +/// all `tar` based formats), it can extract only specified files. +/// This means that `super::drivers::fetch_crate_cratesio` no longer need to +/// extract the whole crate and write them to disk, it now only extract the +/// relevant part (`Cargo.toml`) out to disk and open it. #[derive(Debug)] pub struct AsyncExtracter(ScopeGuard); impl AsyncExtracter { - /// * `desired_outputs - If Some(_) and `fmt` is not `PkgFmt::Bin` or - /// `PkgFmt::Zip`, then it will filter the tar and only extract files - /// specified in it. + /// * `path` - If `fmt` is `PkgFmt::Bin`, then this is the filename + /// for the bin. + /// Otherwise, it is the directory where the extracted content will be put. + /// * `fmt` - The format of the archive to feed in. + /// * `desired_outputs - If Some(_), then it will filter the tar and + /// only extract files specified in it. + /// Note that it only works when `fmt` is not `PkgFmt::Bin` or + /// `PkgFmt::Zip`. pub fn new( path: &Path, fmt: PkgFmt, From 8a812c8d220c6ccbe324b12e98c221649083e7df Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 22:09:46 +1000 Subject: [PATCH 47/49] Rename `AsyncExtracter::write` to `feed` Signed-off-by: Jiahao XU --- src/helpers.rs | 2 +- src/helpers/async_extracter.rs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/helpers.rs b/src/helpers.rs index bc7ce7c8..600b5755 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -74,7 +74,7 @@ pub async fn download_and_extract, const N: usize>( let mut extracter = AsyncExtracter::new(path, fmt, desired_outputs); while let Some(res) = bytes_stream.next().await { - extracter.write(res?).await?; + extracter.feed(res?).await?; } extracter.done().await?; diff --git a/src/helpers/async_extracter.rs b/src/helpers/async_extracter.rs index 8f0b2e5e..066304c8 100644 --- a/src/helpers/async_extracter.rs +++ b/src/helpers/async_extracter.rs @@ -104,9 +104,9 @@ impl AsyncExtracterInner { Ok(()) } - /// Upon error, this writer shall not be reused. + /// Upon error, this extracter shall not be reused. /// Otherwise, `Self::done` would panic. - async fn write(&mut self, bytes: Bytes) -> Result<(), BinstallError> { + async fn feed(&mut self, bytes: Bytes) -> Result<(), BinstallError> { if self.tx.send(Content::Data(bytes)).await.is_err() { // task failed Err(Self::wait(&mut self.handle).await.expect_err( @@ -193,10 +193,10 @@ impl AsyncExtracter { Self(guard(inner, AsyncExtracterInner::abort)) } - /// Upon error, this writer shall not be reused. + /// Upon error, this extracter shall not be reused. /// Otherwise, `Self::done` would panic. - pub async fn write(&mut self, bytes: Bytes) -> Result<(), BinstallError> { - self.0.write(bytes).await + pub async fn feed(&mut self, bytes: Bytes) -> Result<(), BinstallError> { + self.0.feed(bytes).await } pub async fn done(self) -> Result<(), BinstallError> { From 00242a40c61bc6854c9501188dc06f4e96f0e429 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 22:22:39 +1000 Subject: [PATCH 48/49] Update doc for `AsyncExtracter::new` Signed-off-by: Jiahao XU --- src/helpers/async_extracter.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/helpers/async_extracter.rs b/src/helpers/async_extracter.rs index 066304c8..49594c3d 100644 --- a/src/helpers/async_extracter.rs +++ b/src/helpers/async_extracter.rs @@ -182,8 +182,8 @@ impl AsyncExtracter { /// * `fmt` - The format of the archive to feed in. /// * `desired_outputs - If Some(_), then it will filter the tar and /// only extract files specified in it. - /// Note that it only works when `fmt` is not `PkgFmt::Bin` or - /// `PkgFmt::Zip`. + /// Note that this is a best-effort and it only works when `fmt` + /// is not `PkgFmt::Bin` or `PkgFmt::Zip`. pub fn new( path: &Path, fmt: PkgFmt, From cadf045d0a12c493000b8d3f2a8c8ca326a1a00d Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 22:28:48 +1000 Subject: [PATCH 49/49] Refactor: Abstract away `AsyncExtracter` by new fn `extract_archive_stream` Signed-off-by: Jiahao XU --- src/helpers.rs | 12 ++---------- src/helpers/async_extracter.rs | 35 ++++++++++++++++++++++++++++++---- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/src/helpers.rs b/src/helpers.rs index 600b5755..da0a54c0 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -5,7 +5,6 @@ use std::{ }; use cargo_toml::Manifest; -use futures_util::stream::StreamExt; use log::{debug, info}; use reqwest::Method; use serde::Serialize; @@ -15,7 +14,7 @@ use url::Url; use crate::{BinstallError, Meta, PkgFmt}; mod async_extracter; -pub use async_extracter::AsyncExtracter; +pub use async_extracter::extract_archive_stream; mod auto_abort_join_handle; pub use auto_abort_join_handle::AutoAbortJoinHandle; @@ -70,14 +69,7 @@ pub async fn download_and_extract, const N: usize>( let path = path.as_ref(); debug!("Downloading to file: '{}'", path.display()); - let mut bytes_stream = resp.bytes_stream(); - let mut extracter = AsyncExtracter::new(path, fmt, desired_outputs); - - while let Some(res) = bytes_stream.next().await { - extracter.feed(res?).await?; - } - - extracter.done().await?; + extract_archive_stream(resp.bytes_stream(), path, fmt, desired_outputs).await?; debug!("Download OK, written to file: '{}'", path.display()); diff --git a/src/helpers/async_extracter.rs b/src/helpers/async_extracter.rs index 49594c3d..4eed3276 100644 --- a/src/helpers/async_extracter.rs +++ b/src/helpers/async_extracter.rs @@ -4,6 +4,7 @@ use std::io::{self, Seek, Write}; use std::path::Path; use bytes::Bytes; +use futures_util::stream::{Stream, StreamExt}; use scopeguard::{guard, Always, ScopeGuard}; use tempfile::tempfile; use tokio::{sync::mpsc, task::spawn_blocking}; @@ -173,7 +174,7 @@ impl AsyncExtracterInner { /// extract the whole crate and write them to disk, it now only extract the /// relevant part (`Cargo.toml`) out to disk and open it. #[derive(Debug)] -pub struct AsyncExtracter(ScopeGuard); +struct AsyncExtracter(ScopeGuard); impl AsyncExtracter { /// * `path` - If `fmt` is `PkgFmt::Bin`, then this is the filename @@ -184,7 +185,7 @@ impl AsyncExtracter { /// only extract files specified in it. /// Note that this is a best-effort and it only works when `fmt` /// is not `PkgFmt::Bin` or `PkgFmt::Zip`. - pub fn new( + fn new( path: &Path, fmt: PkgFmt, desired_outputs: Option<[Cow<'static, Path>; N]>, @@ -195,11 +196,37 @@ impl AsyncExtracter { /// Upon error, this extracter shall not be reused. /// Otherwise, `Self::done` would panic. - pub async fn feed(&mut self, bytes: Bytes) -> Result<(), BinstallError> { + async fn feed(&mut self, bytes: Bytes) -> Result<(), BinstallError> { self.0.feed(bytes).await } - pub async fn done(self) -> Result<(), BinstallError> { + async fn done(self) -> Result<(), BinstallError> { ScopeGuard::into_inner(self.0).done().await } } + +/// * `output` - If `fmt` is `PkgFmt::Bin`, then this is the filename +/// for the bin. +/// Otherwise, it is the directory where the extracted content will be put. +/// * `fmt` - The format of the archive to feed in. +/// * `desired_outputs - If Some(_), then it will filter the tar and +/// only extract files specified in it. +/// Note that this is a best-effort and it only works when `fmt` +/// is not `PkgFmt::Bin` or `PkgFmt::Zip`. +pub async fn extract_archive_stream( + mut stream: impl Stream> + Unpin, + output: &Path, + fmt: PkgFmt, + desired_outputs: Option<[Cow<'static, Path>; N]>, +) -> Result<(), BinstallError> +where + BinstallError: From, +{ + let mut extracter = AsyncExtracter::new(output, fmt, desired_outputs); + + while let Some(res) = stream.next().await { + extracter.feed(res?).await?; + } + + extracter.done().await +}