diff --git a/src/helpers/async_extracter.rs b/src/helpers/async_extracter.rs index 4eed3276..a6a139b9 100644 --- a/src/helpers/async_extracter.rs +++ b/src/helpers/async_extracter.rs @@ -7,9 +7,12 @@ use bytes::Bytes; use futures_util::stream::{Stream, StreamExt}; use scopeguard::{guard, Always, ScopeGuard}; use tempfile::tempfile; -use tokio::{sync::mpsc, task::spawn_blocking}; +use tokio::{ + sync::mpsc, + task::{spawn_blocking, JoinHandle}, +}; -use super::{extracter::*, readable_rx::*, AutoAbortJoinHandle}; +use super::{extracter::*, readable_rx::*}; use crate::{BinstallError, PkgFmt}; pub(crate) enum Content { @@ -24,7 +27,7 @@ pub(crate) enum Content { struct AsyncExtracterInner { /// Use AutoAbortJoinHandle so that the task /// will be cancelled on failure. - handle: AutoAbortJoinHandle>, + handle: JoinHandle>, tx: mpsc::Sender, } @@ -37,14 +40,9 @@ impl AsyncExtracterInner { desired_outputs: Option<[Cow<'static, Path>; N]>, ) -> Self { let path = path.to_owned(); - let (tx, rx) = mpsc::channel::(100); - - let handle = AutoAbortJoinHandle::new(spawn_blocking(move || { - // close rx on error so that tx.send will return an error - let mut rx = guard(rx, |mut rx| { - rx.close(); - }); + let (tx, mut rx) = mpsc::channel::(100); + let handle = spawn_blocking(move || { fs::create_dir_all(path.parent().unwrap())?; match fmt { @@ -82,7 +80,7 @@ impl AsyncExtracterInner { } Ok(()) - })); + }); Self { handle, tx } } @@ -126,9 +124,7 @@ impl AsyncExtracterInner { Self::wait(&mut self.handle).await } - async fn wait( - handle: &mut AutoAbortJoinHandle>, - ) -> Result<(), BinstallError> { + async fn wait(handle: &mut JoinHandle>) -> Result<(), BinstallError> { match handle.await { Ok(res) => res, Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err).into()),