Mirror of https://github.com/cargo-bins/cargo-binstall.git (synced 2025-05-04 11:10:02 +00:00)
Optimize extract_zip: Use async_zip::read::stream::ZipFileReader to avoid temporary file (#590)
* Add new dep async_zip v0.0.9 to binstalk-downloader with features "gzip", "zstd", "xz", "bzip2", "tokio".
* Refactor: Simplify `async_extracter::extract_*` API.
* Refactor: Create newtype wrapper of `ZipError` so that the zip crate can be upgraded without affecting the API of this crate.
* Enable feature fs of dep tokio in binstalk-downloader.
* Rewrite `extract_zip` to use `async_zip::read::stream::ZipFileReader`, which avoids writing the zip file to a temporary file and then reading it back into memory.
* Refactor: Impl new fn `await_on_option` and use it.
* Optimize `tokio::select!`: Make them biased and check for cancellation first so that cancellation takes effect ASAP.
* Rm unused dep zip from binstalk-downloader.

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
parent e6f969245a
commit 3b1a7f2c78
8 changed files with 372 additions and 94 deletions
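The cancellation handling described in the commit message (a biased `tokio::select!` racing an optional cancellation future via `await_on_option`) boils down to a small pattern. Below is a minimal, self-contained sketch of that pattern, assuming tokio with the "macros" and "rt" features; `do_work`, `run`, and the `main` harness are illustrative stand-ins, not code from this commit (the real branches select against the extraction futures shown in the diffs below).

use std::future::{pending, Future};

/// Await `future` if it is `Some`, otherwise park forever on `pending()`
/// so the corresponding `select!` branch can never win (this mirrors the
/// `await_on_option` helper added by this commit).
async fn await_on_option<Fut, R>(future: Option<Fut>) -> R
where
    Fut: Future<Output = R>,
{
    match future {
        Some(fut) => fut.await,
        None => pending().await,
    }
}

// Illustrative stand-in for the extraction future being raced.
async fn do_work() -> Result<u32, ()> {
    Ok(42)
}

async fn run<Fut>(cancellation: Option<Fut>) -> Result<u32, ()>
where
    Fut: Future<Output = ()>,
{
    tokio::select! {
        // `biased` polls branches top-to-bottom, so a pending cancellation
        // is observed before any further work is done.
        biased;

        _ = await_on_option(cancellation) => Err(()),
        res = do_work() => res,
    }
}

#[tokio::main]
async fn main() {
    // No cancellation future supplied: the work branch completes.
    assert_eq!(run(None::<std::future::Pending<()>>).await, Ok(42));
}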
@@ -7,7 +7,6 @@ use thiserror::Error as ThisError;
 use tracing::{debug, instrument};
 
 pub use binstalk_types::cargo_toml_binstall::PkgFmt;
-pub use zip::result::ZipError;
 
 use crate::remote::{Client, Error as RemoteError, Url};
 
@@ -20,6 +19,12 @@ pub use async_tar_visitor::*;
 mod extracter;
 mod stream_readable;
 
+mod zip_extraction;
+pub use zip_extraction::ZipError;
+
+mod utils;
+use utils::await_on_option;
+
 pub type CancellationFuture = Option<Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>>>;
 
 #[derive(Debug, ThisError)]
@@ -112,15 +117,13 @@ impl Download {
         debug!("Downloading and extracting then in-memory processing");
 
-        let ret = if let Some(cancellation_future) = cancellation_future {
-            tokio::select! {
-                res = extract_tar_based_stream_and_visit(stream, fmt, visitor) => res?,
-                res = cancellation_future => {
-                    Err(res.err().unwrap_or_else(|| io::Error::from(DownloadError::UserAbort)))?
-                }
-            }
-        } else {
-            extract_tar_based_stream_and_visit(stream, fmt, visitor).await?
+        let ret = tokio::select! {
+            biased;
+
+            res = await_on_option(cancellation_future) => {
+                Err(res.err().unwrap_or_else(|| io::Error::from(DownloadError::UserAbort)))?
+            }
+            res = extract_tar_based_stream_and_visit(stream, fmt, visitor) => res?,
         };
 
         debug!("Download, extraction and in-memory procession OK");
@@ -145,7 +148,11 @@
         path: &Path,
         cancellation_future: CancellationFuture,
     ) -> Result<(), DownloadError> {
-        let stream = this.client.get_stream(this.url).await?;
+        let stream = this
+            .client
+            .get_stream(this.url)
+            .await?
+            .map(|res| res.map_err(DownloadError::from));
 
         debug!("Downloading and extracting to: '{}'", path.display());
 
@@ -1,24 +1,25 @@
-use std::{fs, io::Seek, path::Path};
+use std::{fs, path::Path};
 
+use async_zip::read::stream::ZipFileReader;
 use bytes::Bytes;
 use futures_util::stream::Stream;
 use scopeguard::{guard, ScopeGuard};
-use tempfile::tempfile;
 use tokio::task::block_in_place;
+use tokio_util::io::StreamReader;
 use tracing::debug;
 
 use super::{
-    extracter::*, stream_readable::StreamReadable, CancellationFuture, DownloadError, TarBasedFmt,
+    await_on_option, extracter::*, stream_readable::StreamReadable,
+    zip_extraction::extract_zip_entry, CancellationFuture, DownloadError, TarBasedFmt, ZipError,
 };
 
-pub async fn extract_bin<S, E>(
+pub async fn extract_bin<S>(
     stream: S,
     path: &Path,
     cancellation_future: CancellationFuture,
 ) -> Result<(), DownloadError>
 where
-    S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
-    DownloadError: From<E>,
+    S: Stream<Item = Result<Bytes, DownloadError>> + Unpin + 'static,
 {
     let mut reader = StreamReadable::new(stream, cancellation_future).await;
     block_in_place(move || {
@@ -42,39 +43,45 @@ where
     })
 }
 
-pub async fn extract_zip<S, E>(
+pub async fn extract_zip<S>(
     stream: S,
     path: &Path,
     cancellation_future: CancellationFuture,
 ) -> Result<(), DownloadError>
 where
-    S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
-    DownloadError: From<E>,
+    S: Stream<Item = Result<Bytes, DownloadError>> + Unpin + Send + Sync + 'static,
 {
-    let mut reader = StreamReadable::new(stream, cancellation_future).await;
-    block_in_place(move || {
-        fs::create_dir_all(path.parent().unwrap())?;
+    debug!("Decompressing from zip archive to `{}`", path.display());
 
-        let mut file = tempfile()?;
+    let extract_future = Box::pin(async move {
+        let reader = StreamReader::new(stream);
+        let mut zip = ZipFileReader::new(reader);
 
-        reader.copy(&mut file)?;
+        while let Some(entry) = zip.entry_reader().await.map_err(ZipError::from_inner)? {
+            extract_zip_entry(entry, path).await?;
+        }
 
-        // rewind it so that we can pass it to unzip
-        file.rewind()?;
+        Ok(())
+    });
 
-        unzip(file, path)
-    })
+    tokio::select! {
+        biased;
+
+        res = await_on_option(cancellation_future) => {
+            Err(res.err().map(DownloadError::from).unwrap_or(DownloadError::UserAbort))
+        }
+        res = extract_future => res,
+    }
 }
 
-pub async fn extract_tar_based_stream<S, E>(
+pub async fn extract_tar_based_stream<S>(
     stream: S,
     path: &Path,
     fmt: TarBasedFmt,
     cancellation_future: CancellationFuture,
 ) -> Result<(), DownloadError>
 where
-    S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
-    DownloadError: From<E>,
+    S: Stream<Item = Result<Bytes, DownloadError>> + Unpin + 'static,
 {
     let reader = StreamReadable::new(stream, cancellation_future).await;
     block_in_place(move || {
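The key change in `extract_zip` above is that the response body is consumed as it arrives: `tokio_util::io::StreamReader` adapts the byte stream into an `AsyncRead` that the streaming zip reader pulls from directly, so nothing is spooled to a temporary file first. A minimal sketch of that adaptation follows; it assumes the bytes, futures-util, tokio, and tokio-util crates, and the chunk data plus `read_to_end` stand in for the real HTTP body and zip parsing.

use bytes::Bytes;
use futures_util::stream;
use tokio::io::AsyncReadExt;
use tokio_util::io::StreamReader;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Stand-in for chunks arriving from an HTTP response body.
    let chunks = vec![
        Ok::<_, std::io::Error>(Bytes::from_static(b"hello ")),
        Ok(Bytes::from_static(b"world")),
    ];

    // StreamReader adapts a Stream<Item = io::Result<Bytes>> into an AsyncRead,
    // so a streaming consumer (here just read_to_end) never touches a temp file.
    let mut reader = StreamReader::new(stream::iter(chunks));

    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?;
    assert_eq!(buf, b"hello world".to_vec());
    Ok(())
}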
@@ -1,18 +1,12 @@
-use std::{
-    fs::File,
-    io::{self, BufRead, Read},
-    path::Path,
-};
+use std::io::{self, BufRead, Read};
 
 use bzip2::bufread::BzDecoder;
 use flate2::bufread::GzDecoder;
 use tar::Archive;
-use tracing::debug;
 use xz2::bufread::XzDecoder;
-use zip::read::ZipArchive;
 use zstd::stream::Decoder as ZstdDecoder;
 
-use super::{DownloadError, TarBasedFmt};
+use super::TarBasedFmt;
 
 pub fn create_tar_decoder(
     dat: impl BufRead + 'static,
@@ -35,12 +29,3 @@ pub fn create_tar_decoder(
 
     Ok(Archive::new(r))
 }
-
-pub fn unzip(dat: File, dst: &Path) -> Result<(), DownloadError> {
-    debug!("Decompressing from zip archive to `{dst:?}`");
-
-    let mut zip = ZipArchive::new(dat)?;
-    zip.extract(dst)?;
-
-    Ok(())
-}
@@ -7,7 +7,7 @@ use bytes::{Buf, Bytes};
 use futures_util::stream::{Stream, StreamExt};
 use tokio::runtime::Handle;
 
-use super::{CancellationFuture, DownloadError};
+use super::{await_on_option, CancellationFuture, DownloadError};
 
 /// This wraps an AsyncIterator as a `Read`able.
 /// It must be used in non-async context only,
@@ -120,17 +120,14 @@
 
         if !bytes.has_remaining() {
             let option = self.handle.block_on(async {
-                if let Some(cancellation_future) = self.cancellation_future.as_mut() {
-                    tokio::select! {
-                        biased;
+                let cancellation_future = self.cancellation_future.as_mut();
+                tokio::select! {
+                    biased;
 
-                        res = cancellation_future => {
-                            Err(res.err().unwrap_or_else(|| io::Error::from(DownloadError::UserAbort)))
-                        },
-                        res = next_stream(&mut self.stream) => res,
-                    }
-                } else {
-                    next_stream(&mut self.stream).await
+                    res = await_on_option(cancellation_future) => {
+                        Err(res.err().unwrap_or_else(|| io::Error::from(DownloadError::UserAbort)))
+                    },
+                    res = next_stream(&mut self.stream) => res,
                 }
             })?;
 
crates/binstalk-downloader/src/download/utils.rs (new file, 16 additions)
@@ -0,0 +1,16 @@
+use std::future::{pending, Future};
+
+/// Await on `future` if it is not `None`, or call [`pending`]
+/// so that this branch would never get selected again.
+///
+/// Designed to use with [`tokio::select`].
+pub(super) async fn await_on_option<Fut, R>(future: Option<Fut>) -> R
+where
+    Fut: Future<Output = R>,
+{
+    if let Some(future) = future {
+        future.await
+    } else {
+        pending().await
+    }
+}
crates/binstalk-downloader/src/download/zip_extraction.rs (new file, 149 additions)
@@ -0,0 +1,149 @@
+use std::{
+    io,
+    path::{Component, Path, PathBuf},
+};
+
+use async_zip::{read::ZipEntryReader, ZipEntryExt};
+use thiserror::Error as ThisError;
+use tokio::{fs, io::AsyncRead, task::spawn_blocking};
+
+use super::DownloadError;
+
+#[derive(Debug, ThisError)]
+enum ZipErrorInner {
+    #[error(transparent)]
+    Inner(#[from] async_zip::error::ZipError),
+
+    #[error("Invalid file path: {0}")]
+    InvalidFilePath(Box<str>),
+}
+
+#[derive(Debug, ThisError)]
+#[error(transparent)]
+pub struct ZipError(#[from] ZipErrorInner);
+
+impl ZipError {
+    pub(super) fn from_inner(err: async_zip::error::ZipError) -> Self {
+        Self(ZipErrorInner::Inner(err))
+    }
+}
+
+pub(super) async fn extract_zip_entry<R>(
+    entry: ZipEntryReader<'_, R>,
+    path: &Path,
+) -> Result<(), DownloadError>
+where
+    R: AsyncRead + Unpin + Send + Sync,
+{
+    // Sanitize filename
+    let raw_filename = entry.entry().filename();
+    let filename = check_filename_and_normalize(raw_filename)
+        .ok_or_else(|| ZipError(ZipErrorInner::InvalidFilePath(raw_filename.into())))?;
+
+    // Calculates the outpath
+    let outpath = path.join(filename);
+
+    // Get permissions
+    let mut perms = None;
+
+    #[cfg(unix)]
+    {
+        use std::{fs::Permissions, os::unix::fs::PermissionsExt};
+
+        if let Some(mode) = entry.entry().unix_permissions() {
+            let mode: u16 = mode;
+            perms = Some(Permissions::from_mode(mode as u32));
+        }
+    }
+
+    if raw_filename.ends_with('/') {
+        // This entry is a dir.
+        asyncify(move || {
+            std::fs::create_dir_all(&outpath)?;
+            if let Some(perms) = perms {
+                std::fs::set_permissions(&outpath, perms)?;
+            }
+
+            Ok(())
+        })
+        .await?;
+    } else {
+        // This entry is a file.
+        let mut outfile = asyncify(move || {
+            if let Some(p) = outpath.parent() {
+                std::fs::create_dir_all(p)?;
+            }
+            let outfile = std::fs::File::create(&outpath)?;
+
+            if let Some(perms) = perms {
+                outfile.set_permissions(perms)?;
+            }
+
+            Ok(outfile)
+        })
+        .await
+        .map(fs::File::from_std)?;
+
+        entry
+            .copy_to_end_crc(&mut outfile, 64 * 1024)
+            .await
+            .map_err(ZipError::from_inner)?;
+    }
+
+    Ok(())
+}
+
+/// Ensure the file path is safe to use as a [`Path`].
+///
+/// - It can't contain NULL bytes
+/// - It can't resolve to a path outside the current directory
+///   > `foo/../bar` is fine, `foo/../../bar` is not.
+/// - It can't be an absolute path
+///
+/// It will then return a normalized path.
+///
+/// This will read well-formed ZIP files correctly, and is resistant
+/// to path-based exploits.
+///
+/// This function is adapted from `zip::ZipFile::enclosed_name`.
+fn check_filename_and_normalize(filename: &str) -> Option<PathBuf> {
+    if filename.contains('\0') {
+        return None;
+    }
+
+    let mut path = PathBuf::new();
+
+    // The following loop is adapted from
+    // `normalize_path::NormalizePath::normalize`.
+    for component in Path::new(filename).components() {
+        match component {
+            Component::Prefix(_) | Component::RootDir => return None,
+            Component::CurDir => (),
+            Component::ParentDir => {
+                if !path.pop() {
+                    // `PathBuf::pop` returns false if there is no parent.
+                    // which means the path is invalid.
+                    return None;
+                }
+            }
+            Component::Normal(c) => path.push(c),
+        }
+    }
+
+    Some(path)
+}
+
+/// Copied from tokio https://docs.rs/tokio/latest/src/tokio/fs/mod.rs.html#132
+async fn asyncify<F, T>(f: F) -> io::Result<T>
+where
+    F: FnOnce() -> io::Result<T> + Send + 'static,
+    T: Send + 'static,
+{
+    match spawn_blocking(f).await {
+        Ok(res) => res,
+        Err(_) => Err(io::Error::new(
+            io::ErrorKind::Other,
+            "background task failed",
+        )),
+    }
+}
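For intuition, here is a hypothetical test module (not part of the commit) exercising the `check_filename_and_normalize` behaviour documented above; it assumes it sits next to that function in the same file.

#[cfg(test)]
mod tests {
    use super::check_filename_and_normalize;
    use std::path::PathBuf;

    #[test]
    fn rejects_escaping_and_absolute_paths() {
        // `foo/../bar` stays inside the extraction directory and normalizes to `bar`.
        assert_eq!(
            check_filename_and_normalize("foo/../bar"),
            Some(PathBuf::from("bar"))
        );
        // `foo/../../bar` would escape the extraction directory.
        assert_eq!(check_filename_and_normalize("foo/../../bar"), None);
        // Absolute paths and NUL bytes are rejected outright.
        assert_eq!(check_filename_and_normalize("/etc/passwd"), None);
        assert_eq!(check_filename_and_normalize("evil\0name"), None);
    }
}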