From 7b52eaad5b3be1b143fe686bf4f3190fed73183c Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 11 Jun 2022 20:10:46 +1000 Subject: [PATCH] Rewrite `AsyncExtracter`: Extract fmt logic as callback fn Signed-off-by: Jiahao XU --- src/helpers/async_extracter.rs | 232 ++++++++++++++++++--------------- src/helpers/extracter.rs | 4 +- 2 files changed, 128 insertions(+), 108 deletions(-) diff --git a/src/helpers/async_extracter.rs b/src/helpers/async_extracter.rs index 7d0b4542..26274d65 100644 --- a/src/helpers/async_extracter.rs +++ b/src/helpers/async_extracter.rs @@ -1,3 +1,4 @@ +use std::fmt::Debug; use std::fs; use std::io::{self, Seek, Write}; use std::path::Path; @@ -22,28 +23,47 @@ pub(crate) enum Content { Abort, } +/// AsyncExtracter will pass the `Bytes` you give to another thread via +/// a `mpsc` and decompress and unpack it if needed. +/// +/// After all write is done, you must call `AsyncExtracter::done`, +/// otherwise the extracted content will be removed on drop. +/// +/// # Advantages +/// +/// `download_and_extract` has the following advantages over downloading +/// plus extracting in on the same thread: +/// +/// - The code is pipelined instead of storing the downloaded file in memory +/// and extract it, except for `PkgFmt::Zip`, since `ZipArchiver::new` +/// requires `std::io::Seek`, so it fallbacks to writing the a file then +/// unzip it. +/// - The async part (downloading) and the extracting part runs in parallel +/// using `tokio::spawn_nonblocking`. +/// - Compressing/writing which takes a lot of CPU time will not block +/// the runtime anymore. +/// - For any PkgFmt except for `PkgFmt::Zip` and `PkgFmt::Bin` (basically +/// all `tar` based formats), it can extract only specified files. +/// This means that `super::drivers::fetch_crate_cratesio` no longer need to +/// extract the whole crate and write them to disk, it now only extract the +/// relevant part (`Cargo.toml`) out to disk and open it. #[derive(Debug)] -struct AsyncExtracterInner { +struct AsyncExtracterInner { /// Use AutoAbortJoinHandle so that the task /// will be cancelled on failure. - handle: JoinHandle>, + handle: JoinHandle>, tx: mpsc::Sender, } -impl AsyncExtracterInner { - /// * `filter` - If Some, then it will pass the path of the file to it - /// and only extract ones which filter returns `true`. - /// Note that this is a best-effort and it only works when `fmt` - /// is not `PkgFmt::Bin` or `PkgFmt::Zip`. - fn new bool + Send + 'static>( - path: &Path, - fmt: PkgFmt, - filter: Option, +impl AsyncExtracterInner { + fn new) -> Result + Send + 'static>( + f: F, ) -> Self { - let path = path.to_owned(); - let (tx, mut rx) = mpsc::channel::(100); + let (tx, rx) = mpsc::channel::(100); let handle = spawn_blocking(move || { + f(rx) + /* fs::create_dir_all(path.parent().unwrap())?; match fmt { @@ -78,29 +98,12 @@ impl AsyncExtracterInner { } Ok(()) + */ }); Self { handle, tx } } - fn read_into_file( - file: &mut fs::File, - rx: &mut mpsc::Receiver, - ) -> Result<(), BinstallError> { - while let Some(content) = rx.blocking_recv() { - match content { - Content::Data(bytes) => file.write_all(&*bytes)?, - Content::Abort => { - return Err(io::Error::new(io::ErrorKind::Other, "Aborted").into()) - } - } - } - - file.flush()?; - - Ok(()) - } - /// Upon error, this extracter shall not be reused. /// Otherwise, `Self::done` would panic. async fn feed(&mut self, bytes: Bytes) -> Result<(), BinstallError> { @@ -114,7 +117,7 @@ impl AsyncExtracterInner { } } - async fn done(mut self) -> Result<(), BinstallError> { + async fn done(mut self) -> Result { // Drop tx as soon as possible so that the task would wrap up what it // was doing and flush out all the pending data. drop(self.tx); @@ -122,7 +125,7 @@ impl AsyncExtracterInner { Self::wait(&mut self.handle).await } - async fn wait(handle: &mut JoinHandle>) -> Result<(), BinstallError> { + async fn wait(handle: &mut JoinHandle>) -> Result { match handle.await { Ok(res) => res, Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err).into()), @@ -143,92 +146,98 @@ impl AsyncExtracterInner { } } -/// AsyncExtracter will pass the `Bytes` you give to another thread via -/// a `mpsc` and decompress and unpack it if needed. -/// -/// After all write is done, you must call `AsyncExtracter::done`, -/// otherwise the extracted content will be removed on drop. -/// -/// # Advantages -/// -/// `download_and_extract` has the following advantages over downloading -/// plus extracting in on the same thread: -/// -/// - The code is pipelined instead of storing the downloaded file in memory -/// and extract it, except for `PkgFmt::Zip`, since `ZipArchiver::new` -/// requires `std::io::Seek`, so it fallbacks to writing the a file then -/// unzip it. -/// - The async part (downloading) and the extracting part runs in parallel -/// using `tokio::spawn_nonblocking`. -/// - Compressing/writing which takes a lot of CPU time will not block -/// the runtime anymore. -/// - For any PkgFmt except for `PkgFmt::Zip` and `PkgFmt::Bin` (basically -/// all `tar` based formats), it can extract only specified files. -/// This means that `super::drivers::fetch_crate_cratesio` no longer need to -/// extract the whole crate and write them to disk, it now only extract the -/// relevant part (`Cargo.toml`) out to disk and open it. -#[derive(Debug)] -struct AsyncExtracter(ScopeGuard); +async fn extract_impl< + F: FnOnce(mpsc::Receiver) -> Result + Send + 'static, + T: Debug + Send + 'static, + S: Stream> + Unpin, + E, +>( + mut stream: S, + f: F, +) -> Result +where + BinstallError: From, +{ + let mut extracter = guard(AsyncExtracterInner::new(f), AsyncExtracterInner::abort); -impl AsyncExtracter { - /// * `path` - If `fmt` is `PkgFmt::Bin`, then this is the filename - /// for the bin. - /// Otherwise, it is the directory where the extracted content will be put. - /// * `fmt` - The format of the archive to feed in. - /// * `filter` - If Some, then it will pass the path of the file to it - /// and only extract ones which filter returns `true`. - /// Note that this is a best-effort and it only works when `fmt` - /// is not `PkgFmt::Bin` or `PkgFmt::Zip`. - fn new bool + Send + 'static>( - path: &Path, - fmt: PkgFmt, - filter: Option, - ) -> Self { - let inner = AsyncExtracterInner::new(path, fmt, filter); - Self(guard(inner, AsyncExtracterInner::abort)) + while let Some(res) = stream.next().await { + extracter.feed(res?).await?; } - /// Upon error, this extracter shall not be reused. - /// Otherwise, `Self::done` would panic. - async fn feed(&mut self, bytes: Bytes) -> Result<(), BinstallError> { - self.0.feed(bytes).await + ScopeGuard::into_inner(extracter).done().await +} + +fn read_into_file( + file: &mut fs::File, + rx: &mut mpsc::Receiver, +) -> Result<(), BinstallError> { + while let Some(content) = rx.blocking_recv() { + match content { + Content::Data(bytes) => file.write_all(&*bytes)?, + Content::Abort => return Err(io::Error::new(io::ErrorKind::Other, "Aborted").into()), + } } - async fn done(self) -> Result<(), BinstallError> { - ScopeGuard::into_inner(self.0).done().await - } + file.flush()?; + + Ok(()) } pub async fn extract_bin( - mut stream: impl Stream> + Unpin, + stream: impl Stream> + Unpin, output: &Path, ) -> Result<(), BinstallError> where BinstallError: From, { - let mut extracter = AsyncExtracter::new:: bool>(output, PkgFmt::Bin, None); + let path = output.to_owned(); - while let Some(res) = stream.next().await { - extracter.feed(res?).await?; - } + extract_impl(stream, move |mut rx| { + fs::create_dir_all(path.parent().unwrap())?; - extracter.done().await + let mut file = fs::File::create(&path)?; + + // remove it unless the operation isn't aborted and no write + // fails. + let remove_guard = guard(&path, |path| { + fs::remove_file(path).ok(); + }); + + read_into_file(&mut file, &mut rx)?; + + // Operation isn't aborted and all writes succeed, + // disarm the remove_guard. + ScopeGuard::into_inner(remove_guard); + + Ok(()) + }) + .await } pub async fn extract_zip( - mut stream: impl Stream> + Unpin, + stream: impl Stream> + Unpin, output: &Path, ) -> Result<(), BinstallError> where BinstallError: From, { - let mut extracter = AsyncExtracter::new:: bool>(output, PkgFmt::Zip, None); + let path = output.to_owned(); - while let Some(res) = stream.next().await { - extracter.feed(res?).await?; - } + extract_impl(stream, move |mut rx| { + fs::create_dir_all(path.parent().unwrap())?; - extracter.done().await + let mut file = tempfile()?; + + read_into_file(&mut file, &mut rx)?; + + // rewind it so that we can pass it to unzip + file.rewind()?; + + unzip(file, &path)?; + + Ok(()) + }) + .await } /// * `filter` - If Some, then it will pass the path of the file to it @@ -237,7 +246,7 @@ pub async fn extract_tar_based_stream_with_filter< Filter: FnMut(&Path) -> bool + Send + 'static, E, >( - mut stream: impl Stream> + Unpin, + stream: impl Stream> + Unpin, output: &Path, fmt: TarBasedFmt, filter: Option, @@ -245,28 +254,39 @@ pub async fn extract_tar_based_stream_with_filter< where BinstallError: From, { - let mut extracter = AsyncExtracter::new(output, fmt.into(), filter); + let path = output.to_owned(); - while let Some(res) = stream.next().await { - extracter.feed(res?).await?; - } + extract_impl(stream, move |mut rx| { + fs::create_dir_all(path.parent().unwrap())?; - extracter.done().await + extract_compressed_from_readable(ReadableRx::new(&mut rx), fmt.into(), &path, filter)?; + + Ok(()) + }) + .await } pub async fn extract_tar_based_stream( - mut stream: impl Stream> + Unpin, + stream: impl Stream> + Unpin, output: &Path, fmt: TarBasedFmt, ) -> Result<(), BinstallError> where BinstallError: From, { - let mut extracter = AsyncExtracter::new:: bool>(output, fmt.into(), None); + let path = output.to_owned(); - while let Some(res) = stream.next().await { - extracter.feed(res?).await?; - } + extract_impl(stream, move |mut rx| { + fs::create_dir_all(path.parent().unwrap())?; - extracter.done().await + extract_compressed_from_readable:: bool, _>( + ReadableRx::new(&mut rx), + fmt.into(), + &path, + None, + )?; + + Ok(()) + }) + .await } diff --git a/src/helpers/extracter.rs b/src/helpers/extracter.rs index 42426693..a393c935 100644 --- a/src/helpers/extracter.rs +++ b/src/helpers/extracter.rs @@ -56,8 +56,8 @@ fn untar bool>( /// and only extract ones which filter returns `true`. /// Note that this is a best-effort and it only works when `fmt` /// is not `PkgFmt::Bin` or `PkgFmt::Zip`. -pub(crate) fn extract_compressed_from_readable bool>( - dat: impl BufRead, +pub(crate) fn extract_compressed_from_readable bool, R: BufRead>( + dat: R, fmt: PkgFmt, path: &Path, filter: Option,