Support for any PkgFmt in AsyncFileWriter

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2022-06-09 01:20:37 +10:00
parent 58c775a648
commit c3b5cb11c2
No known key found for this signature in database
GPG key ID: 591C0B03040416D6
2 changed files with 50 additions and 24 deletions

View file

@ -68,7 +68,7 @@ pub async fn download<P: AsRef<Path>>(url: &str, path: P) -> Result<(), Binstall
debug!("Downloading to file: '{}'", path.display()); debug!("Downloading to file: '{}'", path.display());
let mut bytes_stream = resp.bytes_stream(); let mut bytes_stream = resp.bytes_stream();
let mut writer = AsyncFileWriter::new(path); let mut writer = AsyncFileWriter::new(path, PkgFmt::Bin);
while let Some(res) = bytes_stream.next().await { while let Some(res) = bytes_stream.next().await {
writer.write(res?).await?; writer.write(res?).await?;

View file

@ -1,12 +1,13 @@
use std::fs; use std::fs;
use std::io::{self, Write}; use std::io::{self, Seek, Write};
use std::path::Path; use std::path::Path;
use bytes::Bytes; use bytes::Bytes;
use scopeguard::{guard, Always, ScopeGuard}; use scopeguard::{guard, Always, ScopeGuard};
use tempfile::tempfile;
use tokio::{sync::mpsc, task::spawn_blocking}; use tokio::{sync::mpsc, task::spawn_blocking};
use super::AutoAbortJoinHandle; use super::{extracter::*, readable_rx::*, AutoAbortJoinHandle};
use crate::{BinstallError, PkgFmt}; use crate::{BinstallError, PkgFmt};
pub enum Content { pub enum Content {
@ -26,7 +27,7 @@ struct AsyncFileWriterInner {
} }
impl AsyncFileWriterInner { impl AsyncFileWriterInner {
fn new(path: &Path) -> Self { fn new(path: &Path, fmt: PkgFmt) -> Self {
let path = path.to_owned(); let path = path.to_owned();
let (tx, rx) = mpsc::channel::<Content>(100); let (tx, rx) = mpsc::channel::<Content>(100);
@ -37,35 +38,60 @@ impl AsyncFileWriterInner {
}); });
fs::create_dir_all(path.parent().unwrap())?; fs::create_dir_all(path.parent().unwrap())?;
let mut file = fs::File::create(&path)?;
// remove it unless the operation isn't aborted and no write match fmt {
// fails. PkgFmt::Bin => {
let remove_guard = guard(path, |path| { let mut file = fs::File::create(&path)?;
fs::remove_file(path).ok();
});
while let Some(content) = rx.blocking_recv() { // remove it unless the operation isn't aborted and no write
match content { // fails.
Content::Data(bytes) => file.write_all(&*bytes)?, let remove_guard = guard(&path, |path| {
Content::Abort => { fs::remove_file(path).ok();
return Err(io::Error::new(io::ErrorKind::Other, "Aborted").into()) });
}
Self::read_into_file(&mut file, &mut rx)?;
// Operation isn't aborted and all writes succeed,
// disarm the remove_guard.
ScopeGuard::into_inner(remove_guard);
} }
PkgFmt::Zip => {
let mut file = tempfile()?;
Self::read_into_file(&mut file, &mut rx)?;
// rewind it so that we can pass it to unzip
file.rewind()?;
unzip(file, &path)?;
}
_ => extract_compressed_from_readable(ReadableRx::new(&mut rx), fmt, &path)?,
} }
file.flush()?;
// Operation isn't aborted and all writes succeed,
// disarm the remove_guard.
ScopeGuard::into_inner(remove_guard);
Ok(()) Ok(())
})); }));
Self { handle, tx } Self { handle, tx }
} }
fn read_into_file(
file: &mut fs::File,
rx: &mut mpsc::Receiver<Content>,
) -> Result<(), BinstallError> {
while let Some(content) = rx.blocking_recv() {
match content {
Content::Data(bytes) => file.write_all(&*bytes)?,
Content::Abort => {
return Err(io::Error::new(io::ErrorKind::Other, "Aborted").into())
}
}
}
file.flush()?;
Ok(())
}
/// Upon error, this writer shall not be reused. /// Upon error, this writer shall not be reused.
/// Otherwise, `Self::done` would panic. /// Otherwise, `Self::done` would panic.
async fn write(&mut self, bytes: Bytes) -> Result<(), BinstallError> { async fn write(&mut self, bytes: Bytes) -> Result<(), BinstallError> {
@ -115,8 +141,8 @@ impl AsyncFileWriterInner {
pub struct AsyncFileWriter(ScopeGuard<AsyncFileWriterInner, fn(AsyncFileWriterInner), Always>); pub struct AsyncFileWriter(ScopeGuard<AsyncFileWriterInner, fn(AsyncFileWriterInner), Always>);
impl AsyncFileWriter { impl AsyncFileWriter {
pub fn new(path: &Path) -> Self { pub fn new(path: &Path, fmt: PkgFmt) -> Self {
let inner = AsyncFileWriterInner::new(path); let inner = AsyncFileWriterInner::new(path, fmt);
Self(guard(inner, AsyncFileWriterInner::abort)) Self(guard(inner, AsyncFileWriterInner::abort))
} }