mirror of
https://github.com/cargo-bins/cargo-binstall.git
synced 2025-04-24 14:28:42 +00:00
Use rc-zip-sync for zip extraction (#1942)
* Use rc-zip-sync for zip extraction Fixed #1080 In this commit, binstalk-downloader is updated to - first download the zip into a temporary file, since there is no correct way to extract zip from a stream. - then use rc-zip-sync to read from the zip and extract it to filesystem. Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com> * Fix returned `ExtractedFiles` in `do_extract_zip` Signed-off-by: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> * Fix clippy in zip_extraction.rs Signed-off-by: Jiahao XU <30436523+NobodyXu@users.noreply.github.com> --------- Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com> Signed-off-by: Jiahao XU <30436523+NobodyXu@users.noreply.github.com>
This commit is contained in:
parent
c16790a16f
commit
e704abe7ac
5 changed files with 406 additions and 327 deletions
|
@ -7,6 +7,7 @@ use thiserror::Error as ThisError;
|
|||
use tracing::{debug, error, instrument};
|
||||
|
||||
pub use binstalk_types::cargo_toml_binstall::{PkgFmt, TarBasedFmt};
|
||||
pub use rc_zip_sync::rc_zip::error::Error as ZipError;
|
||||
|
||||
use crate::remote::{Client, Error as RemoteError, Response, Url};
|
||||
|
||||
|
@ -23,7 +24,6 @@ mod extracted_files;
|
|||
pub use extracted_files::{ExtractedFiles, ExtractedFilesEntry};
|
||||
|
||||
mod zip_extraction;
|
||||
pub use zip_extraction::ZipError;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[non_exhaustive]
|
||||
|
|
|
@ -6,18 +6,17 @@ use std::{
|
|||
path::{Component, Path, PathBuf},
|
||||
};
|
||||
|
||||
use async_zip::base::read::stream::ZipFileReader;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use bytes::Bytes;
|
||||
use futures_util::Stream;
|
||||
use tempfile::tempfile as create_tmpfile;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_util::io::StreamReader;
|
||||
use tracing::debug;
|
||||
|
||||
use super::{
|
||||
extracter::*, zip_extraction::extract_zip_entry, DownloadError, ExtractedFiles, TarBasedFmt,
|
||||
ZipError,
|
||||
use super::{extracter::*, DownloadError, ExtractedFiles, TarBasedFmt};
|
||||
use crate::{
|
||||
download::zip_extraction::do_extract_zip,
|
||||
utils::{extract_with_blocking_task, StreamReadable},
|
||||
};
|
||||
use crate::utils::{extract_with_blocking_task, StreamReadable};
|
||||
|
||||
pub async fn extract_bin<S>(stream: S, path: &Path) -> Result<ExtractedFiles, DownloadError>
|
||||
where
|
||||
|
@ -25,52 +24,30 @@ where
|
|||
{
|
||||
debug!("Writing to `{}`", path.display());
|
||||
|
||||
extract_with_blocking_decoder(stream, path, |mut rx, path| {
|
||||
let mut file = fs::File::create(path)?;
|
||||
extract_with_blocking_decoder(stream, path, |rx, path| {
|
||||
let mut extracted_files = ExtractedFiles::new();
|
||||
|
||||
while let Some(bytes) = rx.blocking_recv() {
|
||||
file.write_all(&bytes)?;
|
||||
}
|
||||
extracted_files.add_file(Path::new(path.file_name().unwrap()));
|
||||
|
||||
file.flush()
|
||||
write_stream_to_file(rx, fs::File::create(path)?)?;
|
||||
|
||||
Ok(extracted_files)
|
||||
})
|
||||
.await?;
|
||||
|
||||
let mut extracted_files = ExtractedFiles::new();
|
||||
|
||||
extracted_files.add_file(Path::new(path.file_name().unwrap()));
|
||||
|
||||
Ok(extracted_files)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn extract_zip<S>(stream: S, path: &Path) -> Result<ExtractedFiles, DownloadError>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, DownloadError>> + Unpin + Send + Sync,
|
||||
{
|
||||
debug!("Decompressing from zip archive to `{}`", path.display());
|
||||
debug!("Downloading from zip archive to tempfile");
|
||||
|
||||
let reader = StreamReader::new(stream);
|
||||
let mut zip = ZipFileReader::with_tokio(reader);
|
||||
let mut buf = BytesMut::with_capacity(4 * 4096);
|
||||
let mut extracted_files = ExtractedFiles::new();
|
||||
extract_with_blocking_decoder(stream, path, |rx, path| {
|
||||
debug!("Decompressing from zip archive to `{}`", path.display());
|
||||
|
||||
while let Some(mut zip_reader) = zip.next_with_entry().await.map_err(ZipError::from_inner)? {
|
||||
extract_zip_entry(
|
||||
zip_reader.reader_mut(),
|
||||
path,
|
||||
&mut buf,
|
||||
&mut extracted_files,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// extract_zip_entry would read the zip_reader until read the file until
|
||||
// eof unless extract_zip itself is cancelled or an error is raised.
|
||||
//
|
||||
// So calling done here should not raise any error.
|
||||
zip = zip_reader.done().await.map_err(ZipError::from_inner)?;
|
||||
}
|
||||
|
||||
Ok(extracted_files)
|
||||
do_extract_zip(write_stream_to_file(rx, create_tmpfile()?)?, path).map_err(io::Error::from)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn extract_tar_based_stream<S>(
|
||||
|
@ -176,3 +153,15 @@ where
|
|||
f(rx, &path)
|
||||
})
|
||||
}
|
||||
|
||||
fn write_stream_to_file(mut rx: mpsc::Receiver<Bytes>, f: fs::File) -> io::Result<fs::File> {
|
||||
let mut f = io::BufWriter::new(f);
|
||||
|
||||
while let Some(bytes) = rx.blocking_recv() {
|
||||
f.write_all(&bytes)?;
|
||||
}
|
||||
|
||||
f.flush()?;
|
||||
|
||||
f.into_inner().map_err(io::IntoInnerError::into_error)
|
||||
}
|
||||
|
|
|
@ -1,232 +1,66 @@
|
|||
use std::{
|
||||
borrow::Cow,
|
||||
io::Write,
|
||||
path::{Component, Path, PathBuf},
|
||||
fs::{self, create_dir_all, File},
|
||||
io::{self, Read},
|
||||
path::Path,
|
||||
};
|
||||
|
||||
use async_zip::{
|
||||
base::{read::WithEntry, read::ZipEntryReader},
|
||||
ZipString,
|
||||
};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures_util::future::try_join;
|
||||
use thiserror::Error as ThisError;
|
||||
use tokio::{
|
||||
io::{AsyncRead, AsyncReadExt},
|
||||
sync::mpsc,
|
||||
};
|
||||
use tokio_util::compat::FuturesAsyncReadCompatExt;
|
||||
use cfg_if::cfg_if;
|
||||
use rc_zip_sync::{rc_zip::parse::EntryKind, ReadZip};
|
||||
|
||||
use super::{DownloadError, ExtractedFiles};
|
||||
use crate::utils::asyncify;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
enum ZipErrorInner {
|
||||
#[error(transparent)]
|
||||
Inner(#[from] async_zip::error::ZipError),
|
||||
pub(super) fn do_extract_zip(f: File, dir: &Path) -> Result<ExtractedFiles, DownloadError> {
|
||||
let mut extracted_files = ExtractedFiles::new();
|
||||
|
||||
#[error("Invalid file path: {0}")]
|
||||
InvalidFilePath(Box<str>),
|
||||
}
|
||||
for entry in f.read_zip()?.entries() {
|
||||
let Some(name) = entry.sanitized_name().map(Path::new) else {
|
||||
continue;
|
||||
};
|
||||
let path = dir.join(name);
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
#[error(transparent)]
|
||||
pub struct ZipError(#[from] ZipErrorInner);
|
||||
let do_extract_file = || {
|
||||
let mut entry_writer = File::create(&path)?;
|
||||
let mut entry_reader = entry.reader();
|
||||
io::copy(&mut entry_reader, &mut entry_writer)?;
|
||||
|
||||
impl ZipError {
|
||||
pub(super) fn from_inner(err: async_zip::error::ZipError) -> Self {
|
||||
Self(ZipErrorInner::Inner(err))
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn extract_zip_entry<R>(
|
||||
zip_reader: &mut ZipEntryReader<'_, R, WithEntry<'_>>,
|
||||
path: &Path,
|
||||
buf: &mut BytesMut,
|
||||
extracted_files: &mut ExtractedFiles,
|
||||
) -> Result<(), DownloadError>
|
||||
where
|
||||
R: futures_io::AsyncBufRead + Unpin + Send + Sync,
|
||||
{
|
||||
// Sanitize filename
|
||||
let raw_filename = zip_reader.entry().filename();
|
||||
let (filename, is_dir) = check_filename_and_normalize(raw_filename)?;
|
||||
|
||||
// Calculates the outpath
|
||||
let outpath = path.join(&filename);
|
||||
|
||||
// Get permissions
|
||||
#[cfg_attr(not(unix), allow(unused_mut))]
|
||||
let mut perms = None;
|
||||
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::{fs::Permissions, os::unix::fs::PermissionsExt};
|
||||
|
||||
if let Some(mode) = zip_reader.entry().unix_permissions() {
|
||||
// If it is a dir, then it needs to be at least rwx for the current
|
||||
// user so that we can create new files, search for existing files
|
||||
// and list its contents.
|
||||
//
|
||||
// If it is a file, then it needs to be at least readable for the
|
||||
// current user.
|
||||
let mode: u16 = mode | if is_dir { 0o700 } else { 0o400 };
|
||||
perms = Some(Permissions::from_mode(mode as u32));
|
||||
}
|
||||
}
|
||||
|
||||
if is_dir {
|
||||
extracted_files.add_dir(&filename);
|
||||
|
||||
// This entry is a dir.
|
||||
asyncify(move || {
|
||||
std::fs::create_dir_all(&outpath)?;
|
||||
if let Some(perms) = perms {
|
||||
std::fs::set_permissions(&outpath, perms)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
} else {
|
||||
extracted_files.add_file(&filename);
|
||||
|
||||
// Use channel size = 5 to minimize the waiting time in the extraction task
|
||||
let (tx, mut rx) = mpsc::channel::<Bytes>(5);
|
||||
|
||||
// This entry is a file.
|
||||
|
||||
let write_task = asyncify(move || {
|
||||
if let Some(p) = outpath.parent() {
|
||||
std::fs::create_dir_all(p)?;
|
||||
}
|
||||
let mut outfile = std::fs::File::create(&outpath)?;
|
||||
|
||||
while let Some(bytes) = rx.blocking_recv() {
|
||||
outfile.write_all(&bytes)?;
|
||||
}
|
||||
|
||||
outfile.flush()?;
|
||||
|
||||
if let Some(perms) = perms {
|
||||
outfile.set_permissions(perms)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
});
|
||||
|
||||
let read_task = async move {
|
||||
// Read everything into `tx`
|
||||
copy_file_to_mpsc(zip_reader.compat(), tx, buf).await?;
|
||||
// Check crc32 checksum.
|
||||
//
|
||||
// NOTE that since everything is alread read into the channel,
|
||||
// this function should not read any byte into the `Vec` and
|
||||
// should return `0`.
|
||||
assert_eq!(zip_reader.read_to_end_checked(&mut Vec::new()).await?, 0);
|
||||
Ok(())
|
||||
Ok::<_, io::Error>(())
|
||||
};
|
||||
|
||||
try_join(
|
||||
async move { write_task.await.map_err(From::from) },
|
||||
async move {
|
||||
read_task
|
||||
.await
|
||||
.map_err(ZipError::from_inner)
|
||||
.map_err(DownloadError::from)
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
let parent = path
|
||||
.parent()
|
||||
.expect("all full entry paths should have parent paths");
|
||||
create_dir_all(parent)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
match entry.kind() {
|
||||
EntryKind::Symlink => {
|
||||
extracted_files.add_file(name);
|
||||
cfg_if! {
|
||||
if #[cfg(windows)] {
|
||||
do_extract_file()?;
|
||||
} else {
|
||||
match fs::symlink_metadata(&path) {
|
||||
Ok(metadata) if metadata.is_file() => fs::remove_file(&path)?,
|
||||
_ => (),
|
||||
}
|
||||
|
||||
async fn copy_file_to_mpsc<R>(
|
||||
mut entry_reader: R,
|
||||
tx: mpsc::Sender<Bytes>,
|
||||
buf: &mut BytesMut,
|
||||
) -> Result<(), async_zip::error::ZipError>
|
||||
where
|
||||
R: AsyncRead + Unpin + Send + Sync,
|
||||
{
|
||||
// Since BytesMut does not have a max cap, if AsyncReadExt::read_buf returns
|
||||
// 0 then it means Eof.
|
||||
while entry_reader.read_buf(buf).await? != 0 {
|
||||
// Ensure AsyncReadExt::read_buf can read at least 4096B to avoid
|
||||
// frequent expensive read syscalls.
|
||||
//
|
||||
// Performs this reserve before sending the buf over mpsc queue to
|
||||
// increase the possibility of reusing the previous allocation.
|
||||
//
|
||||
// NOTE: `BytesMut` only reuses the previous allocation if it is the
|
||||
// only one holds the reference to it, which is either on the first
|
||||
// iteration or all the `Bytes` in the mpsc queue has been consumed,
|
||||
// written to the file and dropped.
|
||||
//
|
||||
// Since reading from entry would have to wait for external file I/O,
|
||||
// this would give the blocking thread some time to flush `Bytes`
|
||||
// out.
|
||||
//
|
||||
// If all `Bytes` are flushed out, then we can reuse the allocation here.
|
||||
buf.reserve(4096);
|
||||
let mut src = String::new();
|
||||
entry.reader().read_to_string(&mut src)?;
|
||||
|
||||
if tx.send(buf.split().freeze()).await.is_err() {
|
||||
// Same reason as extract_with_blocking_decoder
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Ensure the file path is safe to use as a [`Path`].
|
||||
///
|
||||
/// - It can't contain NULL bytes
|
||||
/// - It can't resolve to a path outside the current directory
|
||||
/// > `foo/../bar` is fine, `foo/../../bar` is not.
|
||||
/// - It can't be an absolute path
|
||||
///
|
||||
/// It will then return a normalized path.
|
||||
///
|
||||
/// This will read well-formed ZIP files correctly, and is resistant
|
||||
/// to path-based exploits.
|
||||
///
|
||||
/// This function is adapted from `zip::ZipFile::enclosed_name`.
|
||||
fn check_filename_and_normalize(filename: &ZipString) -> Result<(PathBuf, bool), DownloadError> {
|
||||
let filename = filename
|
||||
.as_str()
|
||||
.map(Cow::Borrowed)
|
||||
.unwrap_or_else(|_| String::from_utf8_lossy(filename.as_bytes()));
|
||||
|
||||
let bail = |filename: Cow<'_, str>| {
|
||||
Err(DownloadError::from(ZipError(
|
||||
ZipErrorInner::InvalidFilePath(filename.into_owned().into()),
|
||||
)))
|
||||
};
|
||||
|
||||
if filename.contains('\0') {
|
||||
return bail(filename);
|
||||
}
|
||||
|
||||
let mut path = PathBuf::new();
|
||||
|
||||
// The following loop is adapted from
|
||||
// `normalize_path::NormalizePath::normalize`.
|
||||
for component in Path::new(&*filename).components() {
|
||||
match component {
|
||||
Component::Prefix(_) | Component::RootDir => return bail(filename),
|
||||
Component::CurDir => (),
|
||||
Component::ParentDir => {
|
||||
if !path.pop() {
|
||||
// `PathBuf::pop` returns false if there is no parent.
|
||||
// which means the path is invalid.
|
||||
return bail(filename);
|
||||
// validate pointing path before creating a symbolic link
|
||||
if src.contains("..") {
|
||||
continue;
|
||||
}
|
||||
std::os::unix::fs::symlink(src, &path)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Component::Normal(c) => path.push(c),
|
||||
EntryKind::Directory => (),
|
||||
EntryKind::File => {
|
||||
extracted_files.add_file(name);
|
||||
do_extract_file()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok((path, filename.ends_with('/')))
|
||||
Ok(extracted_files)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue