From 911c52d8e1b56f64eab019558771ad36753913ab Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 22:27:19 +1000 Subject: [PATCH] Auto remove file in `AsyncFileWriter` unless done is called. Also moves creation of the dir/file into the blocking thread to avoid blocking. Signed-off-by: Jiahao XU --- src/helpers.rs | 7 --- src/helpers/async_file_writer.rs | 84 ++++++++++++++++++++++++++------ 2 files changed, 70 insertions(+), 21 deletions(-) diff --git a/src/helpers.rs b/src/helpers.rs index fd014acb..f60df2d8 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -9,7 +9,6 @@ use flate2::read::GzDecoder; use futures_util::stream::StreamExt; use log::{debug, info}; use reqwest::Method; -use scopeguard::ScopeGuard; use serde::Serialize; use tar::Archive; use tinytemplate::TinyTemplate; @@ -68,17 +67,11 @@ pub async fn download>(url: &str, path: P) -> Result<(), Binstall let mut bytes_stream = resp.bytes_stream(); let mut writer = AsyncFileWriter::new(path)?; - let guard = scopeguard::guard(path, |path| { - fs::remove_file(path).ok(); - }); - while let Some(res) = bytes_stream.next().await { writer.write(res?).await?; } writer.done().await?; - // Disarm as it is successfully downloaded and written to file. - ScopeGuard::into_inner(guard); debug!("Download OK, written to file: '{}'", path.display()); diff --git a/src/helpers/async_file_writer.rs b/src/helpers/async_file_writer.rs index 21a577c7..6405c777 100644 --- a/src/helpers/async_file_writer.rs +++ b/src/helpers/async_file_writer.rs @@ -3,37 +3,60 @@ use std::io::{self, Write}; use std::path::Path; use bytes::Bytes; +use scopeguard::{guard, Always, ScopeGuard}; use tokio::{sync::mpsc, task::spawn_blocking}; use super::AutoAbortJoinHandle; +enum Content { + /// Data to write to file + Data(Bytes), + + /// Abort the writing and remove the file. + Abort, +} + #[derive(Debug)] -pub struct AsyncFileWriter { +struct AsyncFileWriterInner { /// Use AutoAbortJoinHandle so that the task /// will be cancelled on failure. handle: AutoAbortJoinHandle>, - tx: mpsc::Sender, + tx: mpsc::Sender, } -impl AsyncFileWriter { - pub fn new(path: &Path) -> io::Result { - fs::create_dir_all(path.parent().unwrap())?; - - let mut file = fs::File::create(path)?; - let (tx, rx) = mpsc::channel::(100); +impl AsyncFileWriterInner { + fn new(path: &Path) -> io::Result { + let path = path.to_owned(); + let (tx, rx) = mpsc::channel::(100); let handle = AutoAbortJoinHandle::new(spawn_blocking(move || { // close rx on error so that tx.send will return an error - let mut rx = scopeguard::guard(rx, |mut rx| { + let mut rx = guard(rx, |mut rx| { rx.close(); }); - while let Some(bytes) = rx.blocking_recv() { - file.write_all(&*bytes)?; + fs::create_dir_all(path.parent().unwrap())?; + let mut file = fs::File::create(&path)?; + + // remove it unless the operation isn't aborted and no write + // fails. + let remove_guard = guard(path, |path| { + fs::remove_file(path).ok(); + }); + + while let Some(content) = rx.blocking_recv() { + match content { + Content::Data(bytes) => file.write_all(&*bytes)?, + Content::Abort => return Err(io::Error::new(io::ErrorKind::Other, "Aborted")), + } } file.flush()?; + // Operation isn't aborted and all writes succeed, + // disarm the remove_guard. + ScopeGuard::into_inner(remove_guard); + Ok(()) })); @@ -42,8 +65,8 @@ impl AsyncFileWriter { /// Upon error, this writer shall not be reused. /// Otherwise, `Self::done` would panic. - pub async fn write(&mut self, bytes: Bytes) -> io::Result<()> { - if self.tx.send(bytes).await.is_err() { + async fn write(&mut self, bytes: Bytes) -> io::Result<()> { + if self.tx.send(Content::Data(bytes)).await.is_err() { // task failed Err(Self::wait(&mut self.handle).await.expect_err( "Implementation bug: write task finished successfully before all writes are done", @@ -53,7 +76,7 @@ impl AsyncFileWriter { } } - pub async fn done(mut self) -> io::Result<()> { + async fn done(mut self) -> io::Result<()> { // Drop tx as soon as possible so that the task would wrap up what it // was doing and flush out all the pending data. drop(self.tx); @@ -67,4 +90,37 @@ impl AsyncFileWriter { Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err)), } } + + fn abort(self) { + let tx = self.tx; + // If Self::write fail, then the task is already tear down, + // tx closed and no need to abort. + if !tx.is_closed() { + // Use send here because blocking_send would panic if used + // in async threads. + tokio::spawn(async move { + tx.send(Content::Abort).await.ok(); + }); + } + } +} + +/// AsyncFileWriter removes the file if `done` isn't called. +#[derive(Debug)] +pub struct AsyncFileWriter(ScopeGuard); + +impl AsyncFileWriter { + pub fn new(path: &Path) -> io::Result { + AsyncFileWriterInner::new(path).map(|inner| Self(guard(inner, AsyncFileWriterInner::abort))) + } + + /// Upon error, this writer shall not be reused. + /// Otherwise, `Self::done` would panic. + pub async fn write(&mut self, bytes: Bytes) -> io::Result<()> { + self.0.write(bytes).await + } + + pub async fn done(self) -> io::Result<()> { + ScopeGuard::into_inner(self.0).done().await + } }