Optimization: Rewrite Download::and_visit_tar to use tokio-tar (#587)

* Add new dep tokio-tar v0.3.0 to binstalk-downloader
* Add new dep tokio-util v0.7.4 with feat io to binstalk-downloader
* Add dep async-trait v0.1.59 to binstalk-downloader
* Add new dep async-compression v0.3.15 to binstalk-downloader
   with features "gzip", "zstd", "xz", "bzip2", "tokio".
* Rewrite `Download::and_visit_tar` to use `tokio-tar`
   to avoid the cumbersome `block_in_place`.
* Apply temporary workaround: Rm use of let-else in mod visitor

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2022-12-04 12:31:34 +11:00 committed by GitHub
parent 39f175be04
commit 23a5937aff
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 218 additions and 68 deletions

View file

@ -10,6 +10,8 @@ edition = "2021"
license = "GPL-3.0"
[dependencies]
async-trait = "0.1.59"
async-compression = { version = "0.3.15", features = ["gzip", "zstd", "xz", "bzip2", "tokio"] }
binstalk-types = { version = "0.1.0", path = "../binstalk-types" }
bytes = "1.3.0"
bzip2 = "0.4.3"
@ -28,6 +30,8 @@ tar = { package = "binstall-tar", version = "0.4.39" }
tempfile = "3.3.0"
thiserror = "1.0.37"
tokio = { version = "1.22.0", features = ["macros", "rt-multi-thread", "sync", "time"], default-features = false }
tokio-tar = "0.3.0"
tokio-util = { version = "0.7.4", features = ["io"] }
tower = { version = "0.4.13", features = ["limit", "util"] }
tracing = "0.1.37"
trust-dns-resolver = { version = "0.22.0", optional = true, default-features = false, features = ["dnssec-ring"] }

View file

@ -2,19 +2,21 @@ use std::{fmt::Debug, future::Future, io, marker::PhantomData, path::Path, pin::
use binstalk_types::cargo_toml_binstall::{PkgFmtDecomposed, TarBasedFmt};
use digest::{Digest, FixedOutput, HashMarker, Output, OutputSizeUser, Update};
use futures_util::stream::StreamExt;
use thiserror::Error as ThisError;
use tracing::{debug, instrument};
pub use binstalk_types::cargo_toml_binstall::PkgFmt;
pub use tar::Entries;
pub use zip::result::ZipError;
use crate::remote::{Client, Error as RemoteError, Url};
mod async_extracter;
pub use async_extracter::TarEntriesVisitor;
use async_extracter::*;
mod async_tar_visitor;
pub use async_tar_visitor::*;
mod extracter;
mod stream_readable;
@ -92,6 +94,9 @@ impl Download {
///
/// `cancellation_future` can be used to cancel the extraction and return
/// [`DownloadError::UserAbort`] error.
///
/// NOTE that this API does not support gnu extension sparse file unlike
/// [`Download::and_extract`].
#[instrument(skip(visitor, cancellation_future))]
pub async fn and_visit_tar<V: TarEntriesVisitor + Debug + Send + 'static>(
self,
@ -99,12 +104,24 @@ impl Download {
visitor: V,
cancellation_future: CancellationFuture,
) -> Result<V::Target, DownloadError> {
let stream = self.client.get_stream(self.url).await?;
let stream = self
.client
.get_stream(self.url)
.await?
.map(|res| res.map_err(DownloadError::from));
debug!("Downloading and extracting then in-memory processing");
let ret =
extract_tar_based_stream_and_visit(stream, fmt, visitor, cancellation_future).await?;
let ret = if let Some(cancellation_future) = cancellation_future {
tokio::select! {
res = extract_tar_based_stream_and_visit(stream, fmt, visitor) => res?,
res = cancellation_future => {
Err(res.err().unwrap_or_else(|| io::Error::from(DownloadError::UserAbort)))?
}
}
} else {
extract_tar_based_stream_and_visit(stream, fmt, visitor).await?
};
debug!("Download, extraction and in-memory procession OK");

View file

@ -1,14 +1,8 @@
use std::{
fmt::Debug,
fs,
io::{Read, Seek},
path::Path,
};
use std::{fs, io::Seek, path::Path};
use bytes::Bytes;
use futures_util::stream::Stream;
use scopeguard::{guard, ScopeGuard};
use tar::Entries;
use tempfile::tempfile;
use tokio::task::block_in_place;
use tracing::debug;
@ -93,33 +87,3 @@ where
Ok(())
})
}
/// Visitor must iterate over all entries.
/// Entires can be in arbitary order.
pub trait TarEntriesVisitor {
type Target;
fn visit<R: Read>(&mut self, entries: Entries<'_, R>) -> Result<(), DownloadError>;
fn finish(self) -> Result<Self::Target, DownloadError>;
}
pub async fn extract_tar_based_stream_and_visit<S, V, E>(
stream: S,
fmt: TarBasedFmt,
mut visitor: V,
cancellation_future: CancellationFuture,
) -> Result<V::Target, DownloadError>
where
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
V: TarEntriesVisitor + Debug + Send + 'static,
DownloadError: From<E>,
{
let reader = StreamReadable::new(stream, cancellation_future).await;
block_in_place(move || {
debug!("Extracting from {fmt} archive to process it in memory");
let mut tar = create_tar_decoder(reader, fmt)?;
visitor.visit(tar.entries()?)?;
visitor.finish()
})
}

View file

@ -0,0 +1,129 @@
use std::{borrow::Cow, fmt::Debug, io, path::Path, pin::Pin};
use async_compression::tokio::bufread;
use bytes::Bytes;
use futures_util::stream::{Stream, StreamExt};
use tokio::io::{copy, sink, AsyncRead};
use tokio_tar::{Archive, Entry, EntryType};
use tokio_util::io::StreamReader;
use tracing::debug;
use super::{
DownloadError,
TarBasedFmt::{self, *},
};
pub trait TarEntry: AsyncRead + Send + Sync + Unpin + Debug {
/// Returns the path name for this entry.
///
/// This method may fail if the pathname is not valid Unicode and
/// this is called on a Windows platform.
///
/// Note that this function will convert any `\` characters to
/// directory separators.
fn path(&self) -> io::Result<Cow<'_, Path>>;
fn size(&self) -> io::Result<u64>;
fn entry_type(&self) -> TarEntryType;
}
impl<T: TarEntry + ?Sized> TarEntry for &mut T {
fn path(&self) -> io::Result<Cow<'_, Path>> {
T::path(self)
}
fn size(&self) -> io::Result<u64> {
T::size(self)
}
fn entry_type(&self) -> TarEntryType {
T::entry_type(self)
}
}
impl<R: AsyncRead + Unpin + Send + Sync> TarEntry for Entry<R> {
fn path(&self) -> io::Result<Cow<'_, Path>> {
Entry::path(self)
}
fn size(&self) -> io::Result<u64> {
self.header().size()
}
fn entry_type(&self) -> TarEntryType {
match self.header().entry_type() {
EntryType::Regular => TarEntryType::Regular,
EntryType::Link => TarEntryType::Link,
EntryType::Symlink => TarEntryType::Symlink,
EntryType::Char => TarEntryType::Char,
EntryType::Block => TarEntryType::Block,
EntryType::Directory => TarEntryType::Directory,
EntryType::Fifo => TarEntryType::Fifo,
// Implementation-defined high-performance type, treated as regular file
EntryType::Continuous => TarEntryType::Regular,
_ => TarEntryType::Unknown,
}
}
}
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum TarEntryType {
Regular,
Link,
Symlink,
Char,
Block,
Directory,
Fifo,
Unknown,
}
/// Visitor must iterate over all entries.
/// Entires can be in arbitary order.
#[async_trait::async_trait]
pub trait TarEntriesVisitor: Send + Sync {
type Target;
/// Will be called once per entry
async fn visit(&mut self, entry: &mut dyn TarEntry) -> Result<(), DownloadError>;
fn finish(self) -> Result<Self::Target, DownloadError>;
}
pub async fn extract_tar_based_stream_and_visit<S, V>(
stream: S,
fmt: TarBasedFmt,
mut visitor: V,
) -> Result<V::Target, DownloadError>
where
S: Stream<Item = Result<Bytes, DownloadError>> + Send + Sync,
V: TarEntriesVisitor,
{
debug!("Extracting from {fmt} archive to process it in memory");
let reader = StreamReader::new(stream);
let decoder: Pin<Box<dyn AsyncRead + Send + Sync>> = match fmt {
Tar => Box::pin(reader),
Tbz2 => Box::pin(bufread::BzDecoder::new(reader)),
Tgz => Box::pin(bufread::GzipDecoder::new(reader)),
Txz => Box::pin(bufread::XzDecoder::new(reader)),
Tzstd => Box::pin(bufread::ZstdDecoder::new(reader)),
};
let mut tar = Archive::new(decoder);
let mut entries = tar.entries()?;
let mut sink = sink();
while let Some(res) = entries.next().await {
let mut entry = res?;
visitor.visit(&mut entry).await?;
// Consume all remaining data so that next iteration would work fine
// instead of reading the data of prevoius entry.
copy(&mut entry, &mut sink).await?;
}
visitor.finish()
}