Merge pull request #172 from NobodyXu/refactor/AsyncExtracter

This commit is contained in:
Félix Saparelli 2022-06-10 18:26:06 +12:00 committed by GitHub
commit 1b39a7c86e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -7,9 +7,12 @@ use bytes::Bytes;
use futures_util::stream::{Stream, StreamExt}; use futures_util::stream::{Stream, StreamExt};
use scopeguard::{guard, Always, ScopeGuard}; use scopeguard::{guard, Always, ScopeGuard};
use tempfile::tempfile; use tempfile::tempfile;
use tokio::{sync::mpsc, task::spawn_blocking}; use tokio::{
sync::mpsc,
task::{spawn_blocking, JoinHandle},
};
use super::{extracter::*, readable_rx::*, AutoAbortJoinHandle}; use super::{extracter::*, readable_rx::*};
use crate::{BinstallError, PkgFmt}; use crate::{BinstallError, PkgFmt};
pub(crate) enum Content { pub(crate) enum Content {
@ -24,7 +27,7 @@ pub(crate) enum Content {
struct AsyncExtracterInner { struct AsyncExtracterInner {
/// Use AutoAbortJoinHandle so that the task /// Use AutoAbortJoinHandle so that the task
/// will be cancelled on failure. /// will be cancelled on failure.
handle: AutoAbortJoinHandle<Result<(), BinstallError>>, handle: JoinHandle<Result<(), BinstallError>>,
tx: mpsc::Sender<Content>, tx: mpsc::Sender<Content>,
} }
@ -37,14 +40,9 @@ impl AsyncExtracterInner {
desired_outputs: Option<[Cow<'static, Path>; N]>, desired_outputs: Option<[Cow<'static, Path>; N]>,
) -> Self { ) -> Self {
let path = path.to_owned(); let path = path.to_owned();
let (tx, rx) = mpsc::channel::<Content>(100); let (tx, mut rx) = mpsc::channel::<Content>(100);
let handle = AutoAbortJoinHandle::new(spawn_blocking(move || {
// close rx on error so that tx.send will return an error
let mut rx = guard(rx, |mut rx| {
rx.close();
});
let handle = spawn_blocking(move || {
fs::create_dir_all(path.parent().unwrap())?; fs::create_dir_all(path.parent().unwrap())?;
match fmt { match fmt {
@ -82,7 +80,7 @@ impl AsyncExtracterInner {
} }
Ok(()) Ok(())
})); });
Self { handle, tx } Self { handle, tx }
} }
@ -126,9 +124,7 @@ impl AsyncExtracterInner {
Self::wait(&mut self.handle).await Self::wait(&mut self.handle).await
} }
async fn wait( async fn wait(handle: &mut JoinHandle<Result<(), BinstallError>>) -> Result<(), BinstallError> {
handle: &mut AutoAbortJoinHandle<Result<(), BinstallError>>,
) -> Result<(), BinstallError> {
match handle.await { match handle.await {
Ok(res) => res, Ok(res) => res,
Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err).into()), Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err).into()),