From c3b5cb11c292d12ea714131077e6b6d9b9032949 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 9 Jun 2022 01:20:37 +1000 Subject: [PATCH] Support for any `PkgFmt` in `AsyncFileWriter` Signed-off-by: Jiahao XU --- src/helpers.rs | 2 +- src/helpers/async_file_writer.rs | 72 ++++++++++++++++++++++---------- 2 files changed, 50 insertions(+), 24 deletions(-) diff --git a/src/helpers.rs b/src/helpers.rs index 9e981189..8473cfe3 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -68,7 +68,7 @@ pub async fn download>(url: &str, path: P) -> Result<(), Binstall debug!("Downloading to file: '{}'", path.display()); let mut bytes_stream = resp.bytes_stream(); - let mut writer = AsyncFileWriter::new(path); + let mut writer = AsyncFileWriter::new(path, PkgFmt::Bin); while let Some(res) = bytes_stream.next().await { writer.write(res?).await?; diff --git a/src/helpers/async_file_writer.rs b/src/helpers/async_file_writer.rs index ad549558..eacb20bf 100644 --- a/src/helpers/async_file_writer.rs +++ b/src/helpers/async_file_writer.rs @@ -1,12 +1,13 @@ use std::fs; -use std::io::{self, Write}; +use std::io::{self, Seek, Write}; use std::path::Path; use bytes::Bytes; use scopeguard::{guard, Always, ScopeGuard}; +use tempfile::tempfile; use tokio::{sync::mpsc, task::spawn_blocking}; -use super::AutoAbortJoinHandle; +use super::{extracter::*, readable_rx::*, AutoAbortJoinHandle}; use crate::{BinstallError, PkgFmt}; pub enum Content { @@ -26,7 +27,7 @@ struct AsyncFileWriterInner { } impl AsyncFileWriterInner { - fn new(path: &Path) -> Self { + fn new(path: &Path, fmt: PkgFmt) -> Self { let path = path.to_owned(); let (tx, rx) = mpsc::channel::(100); @@ -37,35 +38,60 @@ impl AsyncFileWriterInner { }); fs::create_dir_all(path.parent().unwrap())?; - let mut file = fs::File::create(&path)?; - // remove it unless the operation isn't aborted and no write - // fails. - let remove_guard = guard(path, |path| { - fs::remove_file(path).ok(); - }); + match fmt { + PkgFmt::Bin => { + let mut file = fs::File::create(&path)?; - while let Some(content) = rx.blocking_recv() { - match content { - Content::Data(bytes) => file.write_all(&*bytes)?, - Content::Abort => { - return Err(io::Error::new(io::ErrorKind::Other, "Aborted").into()) - } + // remove it unless the operation isn't aborted and no write + // fails. + let remove_guard = guard(&path, |path| { + fs::remove_file(path).ok(); + }); + + Self::read_into_file(&mut file, &mut rx)?; + + // Operation isn't aborted and all writes succeed, + // disarm the remove_guard. + ScopeGuard::into_inner(remove_guard); } + PkgFmt::Zip => { + let mut file = tempfile()?; + + Self::read_into_file(&mut file, &mut rx)?; + + // rewind it so that we can pass it to unzip + file.rewind()?; + + unzip(file, &path)?; + } + _ => extract_compressed_from_readable(ReadableRx::new(&mut rx), fmt, &path)?, } - file.flush()?; - - // Operation isn't aborted and all writes succeed, - // disarm the remove_guard. - ScopeGuard::into_inner(remove_guard); - Ok(()) })); Self { handle, tx } } + fn read_into_file( + file: &mut fs::File, + rx: &mut mpsc::Receiver, + ) -> Result<(), BinstallError> { + while let Some(content) = rx.blocking_recv() { + match content { + Content::Data(bytes) => file.write_all(&*bytes)?, + Content::Abort => { + return Err(io::Error::new(io::ErrorKind::Other, "Aborted").into()) + } + } + } + + file.flush()?; + + Ok(()) + } + /// Upon error, this writer shall not be reused. /// Otherwise, `Self::done` would panic. async fn write(&mut self, bytes: Bytes) -> Result<(), BinstallError> { @@ -115,8 +141,8 @@ impl AsyncFileWriterInner { pub struct AsyncFileWriter(ScopeGuard); impl AsyncFileWriter { - pub fn new(path: &Path) -> Self { - let inner = AsyncFileWriterInner::new(path); + pub fn new(path: &Path, fmt: PkgFmt) -> Self { + let inner = AsyncFileWriterInner::new(path, fmt); Self(guard(inner, AsyncFileWriterInner::abort)) }