diff --git a/Cargo.lock b/Cargo.lock index 908aa1f6..fb3ba359 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -59,11 +59,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "942c7cd7ae39e91bde4820d74132e9862e62c2f386c3aa90ccf55949f5bad63a" dependencies = [ "brotli", + "bzip2", "flate2", "futures-core", "memchr", "pin-project-lite", "tokio", + "xz2", + "zstd 0.11.2+zstd.1.5.2", + "zstd-safe 5.0.2+zstd.1.5.2", ] [[package]] @@ -150,6 +154,8 @@ dependencies = [ name = "binstalk-downloader" version = "0.1.0" dependencies = [ + "async-compression", + "async-trait", "binstalk-types", "binstall-tar", "bytes", @@ -164,6 +170,8 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-tar", + "tokio-util", "tower", "tracing", "trust-dns-resolver", @@ -2245,6 +2253,32 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-stream" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-tar" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a50188549787c32c1c3d9c8c71ad7e003ccf2f102489c5a96e385c84760477f4" +dependencies = [ + "filetime", + "futures-core", + "libc", + "redox_syscall", + "tokio", + "tokio-stream", + "xattr", +] + [[package]] name = "tokio-util" version = "0.7.4" diff --git a/crates/binstalk-downloader/Cargo.toml b/crates/binstalk-downloader/Cargo.toml index c081ccd9..211d9ce4 100644 --- a/crates/binstalk-downloader/Cargo.toml +++ b/crates/binstalk-downloader/Cargo.toml @@ -10,6 +10,8 @@ edition = "2021" license = "GPL-3.0" [dependencies] +async-trait = "0.1.59" +async-compression = { version = "0.3.15", features = ["gzip", "zstd", "xz", "bzip2", "tokio"] } binstalk-types = { version = "0.1.0", path = "../binstalk-types" } bytes = "1.3.0" bzip2 = "0.4.3" @@ -28,6 +30,8 @@ 
tar = { package = "binstall-tar", version = "0.4.39" } tempfile = "3.3.0" thiserror = "1.0.37" tokio = { version = "1.22.0", features = ["macros", "rt-multi-thread", "sync", "time"], default-features = false } +tokio-tar = "0.3.0" +tokio-util = { version = "0.7.4", features = ["io"] } tower = { version = "0.4.13", features = ["limit", "util"] } tracing = "0.1.37" trust-dns-resolver = { version = "0.22.0", optional = true, default-features = false, features = ["dnssec-ring"] } diff --git a/crates/binstalk-downloader/src/download.rs b/crates/binstalk-downloader/src/download.rs index 7cac3ff6..ac9eacb6 100644 --- a/crates/binstalk-downloader/src/download.rs +++ b/crates/binstalk-downloader/src/download.rs @@ -2,19 +2,21 @@ use std::{fmt::Debug, future::Future, io, marker::PhantomData, path::Path, pin:: use binstalk_types::cargo_toml_binstall::{PkgFmtDecomposed, TarBasedFmt}; use digest::{Digest, FixedOutput, HashMarker, Output, OutputSizeUser, Update}; +use futures_util::stream::StreamExt; use thiserror::Error as ThisError; use tracing::{debug, instrument}; pub use binstalk_types::cargo_toml_binstall::PkgFmt; -pub use tar::Entries; pub use zip::result::ZipError; use crate::remote::{Client, Error as RemoteError, Url}; mod async_extracter; -pub use async_extracter::TarEntriesVisitor; use async_extracter::*; +mod async_tar_visitor; +pub use async_tar_visitor::*; + mod extracter; mod stream_readable; @@ -92,6 +94,9 @@ impl Download { /// /// `cancellation_future` can be used to cancel the extraction and return /// [`DownloadError::UserAbort`] error. + /// + /// NOTE that this API does not support gnu extension sparse file unlike + /// [`Download::and_extract`]. #[instrument(skip(visitor, cancellation_future))] pub async fn and_visit_tar( self, @@ -99,12 +104,24 @@ impl Download { visitor: V, cancellation_future: CancellationFuture, ) -> Result { - let stream = self.client.get_stream(self.url).await?; + let stream = self + .client + .get_stream(self.url) + .await? 
+ .map(|res| res.map_err(DownloadError::from)); debug!("Downloading and extracting then in-memory processing"); - let ret = - extract_tar_based_stream_and_visit(stream, fmt, visitor, cancellation_future).await?; + let ret = if let Some(cancellation_future) = cancellation_future { + tokio::select! { + res = extract_tar_based_stream_and_visit(stream, fmt, visitor) => res?, + res = cancellation_future => { + Err(res.err().unwrap_or_else(|| io::Error::from(DownloadError::UserAbort)))? + } + } + } else { + extract_tar_based_stream_and_visit(stream, fmt, visitor).await? + }; debug!("Download, extraction and in-memory procession OK"); diff --git a/crates/binstalk-downloader/src/download/async_extracter.rs b/crates/binstalk-downloader/src/download/async_extracter.rs index b36deed9..bf7fd9e2 100644 --- a/crates/binstalk-downloader/src/download/async_extracter.rs +++ b/crates/binstalk-downloader/src/download/async_extracter.rs @@ -1,14 +1,8 @@ -use std::{ - fmt::Debug, - fs, - io::{Read, Seek}, - path::Path, -}; +use std::{fs, io::Seek, path::Path}; use bytes::Bytes; use futures_util::stream::Stream; use scopeguard::{guard, ScopeGuard}; -use tar::Entries; use tempfile::tempfile; use tokio::task::block_in_place; use tracing::debug; @@ -93,33 +87,3 @@ where Ok(()) }) } - -/// Visitor must iterate over all entries. -/// Entires can be in arbitary order. 
-pub trait TarEntriesVisitor { - type Target; - - fn visit(&mut self, entries: Entries<'_, R>) -> Result<(), DownloadError>; - fn finish(self) -> Result; -} - -pub async fn extract_tar_based_stream_and_visit( - stream: S, - fmt: TarBasedFmt, - mut visitor: V, - cancellation_future: CancellationFuture, -) -> Result -where - S: Stream> + Unpin + 'static, - V: TarEntriesVisitor + Debug + Send + 'static, - DownloadError: From, -{ - let reader = StreamReadable::new(stream, cancellation_future).await; - block_in_place(move || { - debug!("Extracting from {fmt} archive to process it in memory"); - - let mut tar = create_tar_decoder(reader, fmt)?; - visitor.visit(tar.entries()?)?; - visitor.finish() - }) -} diff --git a/crates/binstalk-downloader/src/download/async_tar_visitor.rs b/crates/binstalk-downloader/src/download/async_tar_visitor.rs new file mode 100644 index 00000000..d33ba2ef --- /dev/null +++ b/crates/binstalk-downloader/src/download/async_tar_visitor.rs @@ -0,0 +1,129 @@ +use std::{borrow::Cow, fmt::Debug, io, path::Path, pin::Pin}; + +use async_compression::tokio::bufread; +use bytes::Bytes; +use futures_util::stream::{Stream, StreamExt}; +use tokio::io::{copy, sink, AsyncRead}; +use tokio_tar::{Archive, Entry, EntryType}; +use tokio_util::io::StreamReader; +use tracing::debug; + +use super::{ + DownloadError, + TarBasedFmt::{self, *}, +}; + +pub trait TarEntry: AsyncRead + Send + Sync + Unpin + Debug { + /// Returns the path name for this entry. + /// + /// This method may fail if the pathname is not valid Unicode and + /// this is called on a Windows platform. + /// + /// Note that this function will convert any `\` characters to + /// directory separators. 
+ fn path(&self) -> io::Result>; + + fn size(&self) -> io::Result; + + fn entry_type(&self) -> TarEntryType; +} + +impl TarEntry for &mut T { + fn path(&self) -> io::Result> { + T::path(self) + } + + fn size(&self) -> io::Result { + T::size(self) + } + + fn entry_type(&self) -> TarEntryType { + T::entry_type(self) + } +} + +impl TarEntry for Entry { + fn path(&self) -> io::Result> { + Entry::path(self) + } + + fn size(&self) -> io::Result { + self.header().size() + } + + fn entry_type(&self) -> TarEntryType { + match self.header().entry_type() { + EntryType::Regular => TarEntryType::Regular, + EntryType::Link => TarEntryType::Link, + EntryType::Symlink => TarEntryType::Symlink, + EntryType::Char => TarEntryType::Char, + EntryType::Block => TarEntryType::Block, + EntryType::Directory => TarEntryType::Directory, + EntryType::Fifo => TarEntryType::Fifo, + // Implementation-defined ‘high-performance’ type, treated as regular file + EntryType::Continuous => TarEntryType::Regular, + _ => TarEntryType::Unknown, + } + } +} + +#[derive(Copy, Clone, Debug)] +#[non_exhaustive] +pub enum TarEntryType { + Regular, + Link, + Symlink, + Char, + Block, + Directory, + Fifo, + Unknown, +} + +/// Visitor must iterate over all entries. +/// Entries can be in arbitrary order. 
+#[async_trait::async_trait] +pub trait TarEntriesVisitor: Send + Sync { + type Target; + + /// Will be called once per entry + async fn visit(&mut self, entry: &mut dyn TarEntry) -> Result<(), DownloadError>; + fn finish(self) -> Result; +} + +pub async fn extract_tar_based_stream_and_visit( + stream: S, + fmt: TarBasedFmt, + mut visitor: V, +) -> Result +where + S: Stream> + Send + Sync, + V: TarEntriesVisitor, +{ + debug!("Extracting from {fmt} archive to process it in memory"); + + let reader = StreamReader::new(stream); + let decoder: Pin> = match fmt { + Tar => Box::pin(reader), + Tbz2 => Box::pin(bufread::BzDecoder::new(reader)), + Tgz => Box::pin(bufread::GzipDecoder::new(reader)), + Txz => Box::pin(bufread::XzDecoder::new(reader)), + Tzstd => Box::pin(bufread::ZstdDecoder::new(reader)), + }; + + let mut tar = Archive::new(decoder); + let mut entries = tar.entries()?; + + let mut sink = sink(); + + while let Some(res) = entries.next().await { + let mut entry = res?; + visitor.visit(&mut entry).await?; + + // Consume all remaining data so that next iteration would work fine + // instead of reading the data of previous entry. 
+ copy(&mut entry, &mut sink).await?; + } + + visitor.finish() +} diff --git a/crates/binstalk/src/drivers/crates_io/visitor.rs b/crates/binstalk/src/drivers/crates_io/visitor.rs index b182659d..26366759 100644 --- a/crates/binstalk/src/drivers/crates_io/visitor.rs +++ b/crates/binstalk/src/drivers/crates_io/visitor.rs @@ -1,16 +1,17 @@ use std::{ - io::{self, Read}, + io, path::{Path, PathBuf}, }; use cargo_toml::Manifest; use normalize_path::NormalizePath; +use tokio::io::AsyncReadExt; use tracing::debug; use super::vfs::Vfs; use crate::{ errors::BinstallError, - helpers::download::{DownloadError, Entries, TarEntriesVisitor}, + helpers::download::{DownloadError, TarEntriesVisitor, TarEntry}, manifests::cargo_toml_binstall::Meta, }; @@ -34,36 +35,37 @@ impl ManifestVisitor { } } +#[async_trait::async_trait] impl TarEntriesVisitor for ManifestVisitor { type Target = Manifest; - fn visit(&mut self, entries: Entries<'_, R>) -> Result<(), DownloadError> { - for res in entries { - let mut entry = res?; - let path = entry.path()?; - let path = path.normalize(); + async fn visit(&mut self, entry: &mut dyn TarEntry) -> Result<(), DownloadError> { + let path = entry.path()?; + let path = path.normalize(); - let Ok(path) = path.strip_prefix(&self.manifest_dir_path) - else { - // The path is outside of the curr dir (manifest dir), - // ignore it. - continue; - }; + let path = if let Ok(path) = path.strip_prefix(&self.manifest_dir_path) { + path + } else { + // The path is outside of the curr dir (manifest dir), + // ignore it. 
+ return Ok(()); + }; - if path == Path::new("Cargo.toml") - || path == Path::new("src/main.rs") - || path.starts_with("src/bin") - { - self.vfs.add_path(path); - } + if path == Path::new("Cargo.toml") + || path == Path::new("src/main.rs") + || path.starts_with("src/bin") + { + self.vfs.add_path(path); + } - if path == Path::new("Cargo.toml") { - // Since it is possible for the same Cargo.toml to appear - // multiple times using `tar --keep-old-files`, here we - // clear the buffer first before reading into it. - self.cargo_toml_content.clear(); - entry.read_to_end(&mut self.cargo_toml_content)?; - } + if path == Path::new("Cargo.toml") { + // Since it is possible for the same Cargo.toml to appear + // multiple times using `tar --keep-old-files`, here we + // clear the buffer first before reading into it. + self.cargo_toml_content.clear(); + self.cargo_toml_content + .reserve_exact(entry.size()?.try_into().unwrap_or(usize::MAX)); + entry.read_to_end(&mut self.cargo_toml_content).await?; } Ok(())