diff --git a/src/helpers/async_extracter.rs b/src/helpers/async_extracter.rs index 334812aa..d37d1db0 100644 --- a/src/helpers/async_extracter.rs +++ b/src/helpers/async_extracter.rs @@ -16,7 +16,7 @@ use std::fmt::Debug; use std::fs; -use std::io::{self, Read, Seek, Write}; +use std::io::{self, copy, Read, Seek, Write}; use std::path::Path; use bytes::Bytes; @@ -27,10 +27,10 @@ use tar::Entries; use tempfile::tempfile; use tokio::{ sync::mpsc, - task::{spawn_blocking, JoinHandle}, + task::{block_in_place, spawn_blocking, JoinHandle}, }; -use super::{extracter::*, readable_rx::*}; +use super::{extracter::*, readable_rx::*, stream_readable::StreamReadable}; use crate::{BinstallError, TarBasedFmt}; pub(crate) enum Content { @@ -107,20 +107,15 @@ impl AsyncExtracterInner { } } -async fn extract_impl> + Unpin, E>( - mut stream: S, - f: Box) -> Result + Send>, -) -> Result +async fn extract_impl(stream: S, f: F) -> Result where + T: Debug + Send + 'static, + S: Stream> + Unpin, + F: FnOnce(StreamReadable) -> Result, BinstallError: From, { - let mut extracter = guard(AsyncExtracterInner::new(f), AsyncExtracterInner::abort); - - while let Some(res) = stream.next().await { - extracter.feed(res?).await?; - } - - ScopeGuard::into_inner(extracter).done().await + let readable = StreamReadable::new(stream).await; + block_in_place(move || f(readable)) } fn read_into_file( @@ -148,28 +143,25 @@ where { let path = output.to_owned(); - extract_impl( - stream, - Box::new(move |mut rx| { - fs::create_dir_all(path.parent().unwrap())?; + extract_impl(stream, move |mut reader| { + fs::create_dir_all(path.parent().unwrap())?; - let mut file = fs::File::create(&path)?; + let mut file = fs::File::create(&path)?; - // remove it unless the operation isn't aborted and no write - // fails. - let remove_guard = guard(&path, |path| { - fs::remove_file(path).ok(); - }); + // remove it unless the operation isn't aborted and no write + // fails. + let remove_guard = guard(&path, |path| { + fs::remove_file(path).ok(); + }); - read_into_file(&mut file, &mut rx)?; + copy(&mut reader, &mut file)?; - // Operation isn't aborted and all writes succeed, - // disarm the remove_guard. - ScopeGuard::into_inner(remove_guard); + // Operation isn't aborted and all writes succeed, + // disarm the remove_guard. + ScopeGuard::into_inner(remove_guard); - Ok(()) - }), - ) + Ok(()) + }) .await } @@ -182,26 +174,23 @@ where { let path = output.to_owned(); - extract_impl( - stream, - Box::new(move |mut rx| { - fs::create_dir_all(path.parent().unwrap())?; + extract_impl(stream, move |mut reader| { + fs::create_dir_all(path.parent().unwrap())?; - let mut file = tempfile()?; + let mut file = tempfile()?; - read_into_file(&mut file, &mut rx)?; + copy(&mut reader, &mut file)?; - // rewind it so that we can pass it to unzip - file.rewind()?; + // rewind it so that we can pass it to unzip + file.rewind()?; - unzip(file, &path) - }), - ) + unzip(file, &path) + }) .await } pub async fn extract_tar_based_stream( - stream: impl Stream> + Unpin, + stream: impl Stream> + Unpin + 'static, output: &Path, fmt: TarBasedFmt, ) -> Result<(), BinstallError> @@ -210,17 +199,15 @@ where { let path = output.to_owned(); - extract_impl( - stream, - Box::new(move |rx| { - fs::create_dir_all(path.parent().unwrap())?; + extract_impl(stream, move |reader| { + fs::create_dir_all(path.parent().unwrap())?; - debug!("Extracting from {fmt} archive to {path:#?}"); - create_tar_decoder(ReadableRx::new(rx), fmt)?.unpack(path)?; + debug!("Extracting from {fmt} archive to {path:#?}"); - Ok(()) - }), - ) + create_tar_decoder(reader, fmt)?.unpack(path)?; + + Ok(()) + }) .await } @@ -237,22 +224,19 @@ impl TarEntriesVisitor for &mut V { } pub async fn extract_tar_based_stream_and_visit( - stream: impl Stream> + Unpin, + stream: impl Stream> + Unpin + 'static, fmt: TarBasedFmt, mut visitor: V, ) -> Result where BinstallError: From, { - extract_impl( - stream, - Box::new(move |rx| { - debug!("Extracting from {fmt} archive to process it in memory"); + extract_impl(stream, move |reader| { + debug!("Extracting from {fmt} archive to process it in memory"); - let mut tar = create_tar_decoder(ReadableRx::new(rx), fmt)?; - visitor.visit(tar.entries()?)?; - Ok(visitor) - }), - ) + let mut tar = create_tar_decoder(reader, fmt)?; + visitor.visit(tar.entries()?)?; + Ok(visitor) + }) .await }