Return a list of files written to disk in binstalk_downloader::download::Download::and_extract (#856)

to avoid collecting the extracted files from disk again in the resolution stage.

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
Jiahao XU 2023-03-03 23:31:27 +11:00 committed by GitHub
parent 44ac63ce0d
commit 9c7da6a179
10 changed files with 366 additions and 84 deletions
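
For orientation, a minimal caller-side sketch (hypothetical helper; the import paths for `Client` and `Url` are assumptions) of how the resolution stage can consult the returned listing instead of walking the extraction directory again:

use std::path::Path;

use binstalk_downloader::{
    download::{Download, DownloadError, ExtractedFiles, PkgFmt},
    remote::{Client, Url},
};

// Hypothetical helper: download, extract, then query the in-memory
// listing rather than re-reading the directory from disk.
async fn fetch_and_probe(client: Client, url: Url, dst: &Path) -> Result<bool, DownloadError> {
    let extracted_files: ExtractedFiles = Download::new(client, url)
        .and_extract(PkgFmt::Tgz, dst)
        .await?;
    Ok(extracted_files.has_file(Path::new("cargo-binstall")))
}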


@@ -19,6 +19,9 @@ pub use async_tar_visitor::{TarEntriesVisitor, TarEntry, TarEntryType};
mod extracter;
mod extracted_files;
pub use extracted_files::{ExtractedFiles, ExtractedFilesEntry};
mod zip_extraction;
pub use zip_extraction::ZipError;
@@ -90,9 +93,6 @@ impl Download {
/// This does not support verifying a checksum due to the partial extraction
/// and will ignore one if specified.
///
/// `cancellation_future` can be used to cancel the extraction and return
/// [`DownloadError::UserAbort`] error.
///
/// NOTE that this API does not support the gnu sparse file extension, unlike
/// [`Download::and_extract`].
#[instrument(skip(visitor))]
@@ -118,15 +118,18 @@ impl Download {
/// Download a file from the provided URL and extract it to the provided path.
///
/// `cancellation_future` can be used to cancel the extraction and return
/// [`DownloadError::UserAbort`] error.
/// NOTE that this would only extract directories and regular files.
#[instrument(skip(path))]
pub async fn and_extract(
self,
fmt: PkgFmt,
path: impl AsRef<Path>,
) -> Result<(), DownloadError> {
async fn inner(this: Download, fmt: PkgFmt, path: &Path) -> Result<(), DownloadError> {
) -> Result<ExtractedFiles, DownloadError> {
async fn inner(
this: Download,
fmt: PkgFmt,
path: &Path,
) -> Result<ExtractedFiles, DownloadError> {
let stream = this
.client
.get_stream(this.url)
@@ -135,15 +138,15 @@ impl Download {
debug!("Downloading and extracting to: '{}'", path.display());
match fmt.decompose() {
let extracted_files = match fmt.decompose() {
PkgFmtDecomposed::Tar(fmt) => extract_tar_based_stream(stream, path, fmt).await?,
PkgFmtDecomposed::Bin => extract_bin(stream, path).await?,
PkgFmtDecomposed::Zip => extract_zip(stream, path).await?,
}
};
debug!("Download OK, extracted to: '{}'", path.display());
Ok(())
Ok(extracted_files)
}
inner(self, fmt, path.as_ref()).await
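
The `inner` helper above follows the outer-generic/inner-concrete pattern: the thin `impl AsRef<Path>` wrapper delegates to a non-generic body that is compiled only once, no matter how many path types callers pass. A minimal standalone sketch of the pattern (illustrative only):

fn component_count(path: impl AsRef<std::path::Path>) -> usize {
    // Non-generic inner fn: only one copy of this body is codegen'd,
    // regardless of which `AsRef<Path>` type the caller supplies.
    fn inner(path: &std::path::Path) -> usize {
        path.components().count()
    }
    inner(path.as_ref())
}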
@@ -179,3 +182,99 @@ impl Update for NoDigest {
}
impl HashMarker for NoDigest {}
#[cfg(test)]
mod test {
use super::*;
use std::{
collections::{HashMap, HashSet},
ffi::OsStr,
};
use tempfile::tempdir;
#[tokio::test]
async fn test_and_extract() {
let client = crate::remote::Client::new(
concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")),
None,
std::time::Duration::from_millis(10),
1.try_into().unwrap(),
[],
)
.unwrap();
let cargo_binstall_url = "https://github.com/cargo-bins/cargo-binstall/releases/download/v0.20.1/cargo-binstall-aarch64-unknown-linux-musl.tgz";
let extracted_files =
Download::new(client.clone(), Url::parse(cargo_binstall_url).unwrap())
.and_extract(PkgFmt::Tgz, tempdir().unwrap())
.await
.unwrap();
assert!(extracted_files.has_file(Path::new("cargo-binstall")));
assert!(!extracted_files.has_file(Path::new("1234")));
let files = HashSet::from([OsStr::new("cargo-binstall").into()]);
assert_eq!(extracted_files.get_dir(Path::new(".")).unwrap(), &files);
assert_eq!(
extracted_files.0,
HashMap::from([
(
Path::new("cargo-binstall").into(),
ExtractedFilesEntry::File
),
(
Path::new(".").into(),
ExtractedFilesEntry::Dir(Box::new(files))
)
])
);
let cargo_watch_url = "https://github.com/watchexec/cargo-watch/releases/download/v8.4.0/cargo-watch-v8.4.0-aarch64-unknown-linux-gnu.tar.xz";
let extracted_files = Download::new(client, Url::parse(cargo_watch_url).unwrap())
.and_extract(PkgFmt::Txz, tempdir().unwrap())
.await
.unwrap();
let dir = Path::new("cargo-watch-v8.4.0-aarch64-unknown-linux-gnu");
assert_eq!(
extracted_files.get_dir(Path::new(".")).unwrap(),
&HashSet::from([dir.as_os_str().into()])
);
assert_eq!(
extracted_files.get_dir(dir).unwrap(),
&HashSet::from_iter(
[
"README.md",
"LICENSE",
"completions",
"cargo-watch",
"cargo-watch.1"
]
.iter()
.map(OsStr::new)
.map(Box::<OsStr>::from)
),
);
assert_eq!(
extracted_files.get_dir(&dir.join("completions")).unwrap(),
&HashSet::from([OsStr::new("zsh").into()]),
);
assert!(extracted_files.has_file(&dir.join("cargo-watch")));
assert!(extracted_files.has_file(&dir.join("cargo-watch.1")));
assert!(extracted_files.has_file(&dir.join("LICENSE")));
assert!(extracted_files.has_file(&dir.join("README.md")));
assert!(!extracted_files.has_file(&dir.join("completions")));
assert!(!extracted_files.has_file(&dir.join("asdfcqwe")));
assert!(extracted_files.has_file(&dir.join("completions/zsh")));
}
}


@@ -1,4 +1,5 @@
use std::{
borrow::Cow,
fs,
future::Future,
io::{self, Write},
@@ -13,11 +14,12 @@ use tokio_util::io::StreamReader;
use tracing::debug;
use super::{
extracter::*, zip_extraction::extract_zip_entry, DownloadError, TarBasedFmt, ZipError,
extracter::*, zip_extraction::extract_zip_entry, DownloadError, ExtractedFiles, TarBasedFmt,
ZipError,
};
use crate::utils::{extract_with_blocking_task, StreamReadable};
pub async fn extract_bin<S>(stream: S, path: &Path) -> Result<(), DownloadError>
pub async fn extract_bin<S>(stream: S, path: &Path) -> Result<ExtractedFiles, DownloadError>
where
S: Stream<Item = Result<Bytes, DownloadError>> + Send + Sync + Unpin + 'static,
{
@@ -32,10 +34,16 @@ where
file.flush()
})
.await
.await?;
let mut extracted_files = ExtractedFiles::new();
extracted_files.add_file(Path::new(path.file_name().unwrap()));
Ok(extracted_files)
}
pub async fn extract_zip<S>(stream: S, path: &Path) -> Result<(), DownloadError>
pub async fn extract_zip<S>(stream: S, path: &Path) -> Result<ExtractedFiles, DownloadError>
where
S: Stream<Item = Result<Bytes, DownloadError>> + Unpin + Send + Sync + 'static,
{
@@ -44,9 +52,10 @@ where
let reader = StreamReader::new(stream);
let mut zip = ZipFileReader::new(reader);
let mut buf = BytesMut::with_capacity(4 * 4096);
let mut extracted_files = ExtractedFiles::new();
while let Some(mut zip_reader) = zip.next_entry().await.map_err(ZipError::from_inner)? {
extract_zip_entry(&mut zip_reader, path, &mut buf).await?;
extract_zip_entry(&mut zip_reader, path, &mut buf, &mut extracted_files).await?;
// extract_zip_entry would read the zip_reader until the file is read to
// eof, unless extract_zip itself is cancelled or an error is raised.
@@ -55,33 +64,82 @@ where
zip = zip_reader.done().await.map_err(ZipError::from_inner)?;
}
Ok(())
Ok(extracted_files)
}
pub async fn extract_tar_based_stream<S>(
stream: S,
path: &Path,
dst: &Path,
fmt: TarBasedFmt,
) -> Result<(), DownloadError>
) -> Result<ExtractedFiles, DownloadError>
where
S: Stream<Item = Result<Bytes, DownloadError>> + Send + Sync + Unpin + 'static,
{
debug!("Extracting from {fmt} archive to {path:#?}");
debug!("Extracting from {fmt} archive to {}", dst.display());
extract_with_blocking_decoder(stream, path, move |rx, path| {
create_tar_decoder(StreamReadable::new(rx), fmt)?.unpack(path)
extract_with_blocking_decoder(stream, dst, move |rx, dst| {
// Adapted from https://docs.rs/tar/latest/src/tar/archive.rs.html#189-219
if dst.symlink_metadata().is_err() {
fs::create_dir_all(dst)?;
}
// Canonicalizing the dst directory will prepend the path with '\\?\'
// on windows which will allow windows APIs to treat the path as an
// extended-length path with a 32,767 character limit. Otherwise all
// unpacked paths over 260 characters will fail on creation with a
// NotFound exception.
let dst = &dst
.canonicalize()
.map(Cow::Owned)
.unwrap_or(Cow::Borrowed(dst));
let mut tar = create_tar_decoder(StreamReadable::new(rx), fmt)?;
let mut entries = tar.entries()?;
let mut extracted_files = ExtractedFiles::new();
// Delay any directory entries until the end (they will be created if needed by
// descendants), to ensure that directory permissions do not interfere with descendant
// extraction.
let mut directories = Vec::new();
while let Some(mut entry) = entries.next().transpose()? {
match entry.header().entry_type() {
tar::EntryType::Regular => {
// unpack_in returns false if the path contains "..",
// in which case the entry is skipped.
if entry.unpack_in(dst)? {
extracted_files.add_file(&entry.path()?);
}
}
tar::EntryType::Directory => {
directories.push(entry);
}
_ => (),
}
}
for mut dir in directories {
if dir.unpack_in(dst)? {
extracted_files.add_dir(&dir.path()?);
}
}
Ok(extracted_files)
})
.await
}
fn extract_with_blocking_decoder<S, F>(
fn extract_with_blocking_decoder<S, F, T>(
stream: S,
path: &Path,
f: F,
) -> impl Future<Output = Result<(), DownloadError>>
) -> impl Future<Output = Result<T, DownloadError>>
where
S: Stream<Item = Result<Bytes, DownloadError>> + Send + Sync + Unpin + 'static,
F: FnOnce(mpsc::Receiver<Bytes>, &Path) -> io::Result<()> + Send + Sync + 'static,
F: FnOnce(mpsc::Receiver<Bytes>, &Path) -> io::Result<T> + Send + Sync + 'static,
T: Send + 'static,
{
let path = path.to_owned();
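
The generic `T` introduced here lets the blocking decoder hand a value (such as `ExtractedFiles`) back to the async caller. A simplified sketch of the underlying pattern, with internals assumed (the real helper is `crate::utils::extract_with_blocking_task`):

use std::io;

use bytes::Bytes;
use tokio::{sync::mpsc, task::spawn_blocking};

// Simplified sketch (assumed internals): run the synchronous decoder `f`
// on a blocking thread; it consumes bytes from the channel and may now
// return any `T: Send` instead of just `()`.
async fn decode_blocking<T, F>(f: F) -> io::Result<T>
where
    F: FnOnce(mpsc::Receiver<Bytes>) -> io::Result<T> + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = mpsc::channel::<Bytes>(5);
    // A real caller would forward downloaded chunks through `tx`;
    // dropping it immediately just signals end-of-stream in this sketch.
    drop(tx);
    spawn_blocking(move || f(rx))
        .await
        .map_err(|join_err| io::Error::new(io::ErrorKind::Other, join_err))?
}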


@@ -0,0 +1,102 @@
use std::{
collections::{hash_map::Entry as HashMapEntry, HashMap, HashSet},
ffi::OsStr,
path::Path,
};
#[derive(Debug)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub enum ExtractedFilesEntry {
Dir(Box<HashSet<Box<OsStr>>>),
File,
}
impl ExtractedFilesEntry {
fn new_dir(file_name: Option<&OsStr>) -> Self {
ExtractedFilesEntry::Dir(Box::new(
file_name
.map(|file_name| HashSet::from([file_name.into()]))
.unwrap_or_else(HashSet::default),
))
}
}
#[derive(Debug)]
pub struct ExtractedFiles(pub(super) HashMap<Box<Path>, ExtractedFilesEntry>);
impl ExtractedFiles {
pub(super) fn new() -> Self {
Self(Default::default())
}
/// * `path` - must be canonical and must not be empty
///
/// NOTE that if the entry for the `path` was previously set to a dir,
/// it will be replaced with a file.
pub(super) fn add_file(&mut self, path: &Path) {
self.0.insert(path.into(), ExtractedFilesEntry::File);
self.add_dir_if_has_parent(path);
}
fn add_dir_if_has_parent(&mut self, path: &Path) {
if let Some(parent) = path.parent() {
if !parent.as_os_str().is_empty() {
self.add_dir_inner(parent, path.file_name());
self.add_dir_if_has_parent(parent);
} else {
self.add_dir_inner(Path::new("."), path.file_name())
}
}
}
/// * `path` - must be canonical and must not be empty
///
/// NOTE that if the entry for the `path` was previously set to a file,
/// it will be replaced with an empty Dir entry.
pub(super) fn add_dir(&mut self, path: &Path) {
self.add_dir_inner(path, None);
self.add_dir_if_has_parent(path);
}
/// * `path` - must be canonical and must not be empty
///
/// NOTE that if the entry for the `path` was previously set to a file,
/// it will be replaced with a Dir entry containing `file_name` if it is
/// `Some(..)`, or with an empty Dir entry; an existing Dir entry simply
/// gains `file_name`.
fn add_dir_inner(&mut self, path: &Path, file_name: Option<&OsStr>) {
match self.0.entry(path.into()) {
HashMapEntry::Vacant(entry) => {
entry.insert(ExtractedFilesEntry::new_dir(file_name));
}
HashMapEntry::Occupied(entry) => match entry.into_mut() {
ExtractedFilesEntry::Dir(hash_set) => {
if let Some(file_name) = file_name {
hash_set.insert(file_name.into());
}
}
entry => *entry = ExtractedFilesEntry::new_dir(file_name),
},
}
}
/// * `path` - must be canonical and must not be empty, but could be "."
/// for top-level.
pub fn get_entry(&self, path: &Path) -> Option<&ExtractedFilesEntry> {
self.0.get(path)
}
/// * `path` - must be canonical and must not be empty, but could be "."
/// for top-level.
pub fn get_dir(&self, path: &Path) -> Option<&HashSet<Box<OsStr>>> {
match self.get_entry(path)? {
ExtractedFilesEntry::Dir(file_names) => Some(file_names),
ExtractedFilesEntry::File => None,
}
}
/// * `path` - must be canonical and must not be empty, but could be "."
/// for top-level.
pub fn has_file(&self, path: &Path) -> bool {
matches!(self.get_entry(path), Some(ExtractedFilesEntry::File))
}
}
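
A hypothetical snippet (callable from the parent `download` module, since the mutating methods are `pub(super)`) showing that adding a file also records every ancestor directory up to the top-level ".":

fn sketch() {
    use std::{ffi::OsStr, path::Path};

    let mut files = ExtractedFiles::new();
    files.add_file(Path::new("bin/cargo-binstall"));

    // Adding "bin/cargo-binstall" also records "bin" and the
    // top-level "." as Dir entries.
    assert!(files.has_file(Path::new("bin/cargo-binstall")));
    assert!(files.get_dir(Path::new("bin")).is_some());
    assert!(files
        .get_dir(Path::new("."))
        .unwrap()
        .contains(OsStr::new("bin")));
}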


@@ -12,7 +12,7 @@ use tokio::{
sync::mpsc,
};
use super::DownloadError;
use super::{DownloadError, ExtractedFiles};
use crate::utils::asyncify;
#[derive(Debug, ThisError)]
@@ -38,6 +38,7 @@ pub(super) async fn extract_zip_entry<R>(
zip_reader: &mut ZipFileReader<Reading<'_, Take<R>>>,
path: &Path,
buf: &mut BytesMut,
extracted_files: &mut ExtractedFiles,
) -> Result<(), DownloadError>
where
R: AsyncRead + Unpin + Send + Sync,
@@ -48,7 +49,7 @@ where
.ok_or_else(|| ZipError(ZipErrorInner::InvalidFilePath(raw_filename.into())))?;
// Calculates the outpath
let outpath = path.join(filename);
let outpath = path.join(&filename);
// Get permissions
let mut perms = None;
@@ -64,6 +65,8 @@ where
}
if raw_filename.ends_with('/') {
extracted_files.add_dir(&filename);
// This entry is a dir.
asyncify(move || {
std::fs::create_dir_all(&outpath)?;
@@ -75,6 +78,8 @@ where
})
.await?;
} else {
extracted_files.add_file(&filename);
// Use channel size = 5 to minimize the waiting time in the extraction task
let (tx, mut rx) = mpsc::channel::<Bytes>(5);