From 1c3a67210846c9cc17a3493661a113778f8f19ab Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Fri, 10 Jun 2022 13:52:11 +1000 Subject: [PATCH 1/2] Rm `scopeguard` creatd for `mpsc::Receiver` Since `Receiver` always closes on drop, there is no need to call `Receiver::close`, which is there so that you can close the mpsc channel without dropping `Receiver`. Signed-off-by: Jiahao XU --- src/helpers/async_extracter.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/helpers/async_extracter.rs b/src/helpers/async_extracter.rs index 4eed3276..a35dec91 100644 --- a/src/helpers/async_extracter.rs +++ b/src/helpers/async_extracter.rs @@ -37,14 +37,9 @@ impl AsyncExtracterInner { desired_outputs: Option<[Cow<'static, Path>; N]>, ) -> Self { let path = path.to_owned(); - let (tx, rx) = mpsc::channel::(100); + let (tx, mut rx) = mpsc::channel::(100); let handle = AutoAbortJoinHandle::new(spawn_blocking(move || { - // close rx on error so that tx.send will return an error - let mut rx = guard(rx, |mut rx| { - rx.close(); - }); - fs::create_dir_all(path.parent().unwrap())?; match fmt { From 32ad530329e57e71fa86be2387cd39938aec1f0b Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Fri, 10 Jun 2022 13:54:43 +1000 Subject: [PATCH 2/2] Rm use of `AutoAbortJoinHandle` in `AsyncExtracter` Since there is no way to abort a blocking thread, using `AutoAbortJoinHandle` does not add any value. Signed-off-by: Jiahao XU --- src/helpers/async_extracter.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/helpers/async_extracter.rs b/src/helpers/async_extracter.rs index a35dec91..a6a139b9 100644 --- a/src/helpers/async_extracter.rs +++ b/src/helpers/async_extracter.rs @@ -7,9 +7,12 @@ use bytes::Bytes; use futures_util::stream::{Stream, StreamExt}; use scopeguard::{guard, Always, ScopeGuard}; use tempfile::tempfile; -use tokio::{sync::mpsc, task::spawn_blocking}; +use tokio::{ + sync::mpsc, + task::{spawn_blocking, JoinHandle}, +}; -use super::{extracter::*, readable_rx::*, AutoAbortJoinHandle}; +use super::{extracter::*, readable_rx::*}; use crate::{BinstallError, PkgFmt}; pub(crate) enum Content { @@ -24,7 +27,7 @@ pub(crate) enum Content { struct AsyncExtracterInner { /// Use AutoAbortJoinHandle so that the task /// will be cancelled on failure. - handle: AutoAbortJoinHandle>, + handle: JoinHandle>, tx: mpsc::Sender, } @@ -39,7 +42,7 @@ impl AsyncExtracterInner { let path = path.to_owned(); let (tx, mut rx) = mpsc::channel::(100); - let handle = AutoAbortJoinHandle::new(spawn_blocking(move || { + let handle = spawn_blocking(move || { fs::create_dir_all(path.parent().unwrap())?; match fmt { @@ -77,7 +80,7 @@ impl AsyncExtracterInner { } Ok(()) - })); + }); Self { handle, tx } } @@ -121,9 +124,7 @@ impl AsyncExtracterInner { Self::wait(&mut self.handle).await } - async fn wait( - handle: &mut AutoAbortJoinHandle>, - ) -> Result<(), BinstallError> { + async fn wait(handle: &mut JoinHandle>) -> Result<(), BinstallError> { match handle.await { Ok(res) => res, Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err).into()),