cargo-binstall/crates/binstalk-downloader/src/download/zip_extraction.rs
Jiahao XU db45f2fb7f
Refactor binstalk-downloader APIs: Remove cancellation_future plus optimizations (#591)
- Refactor: Mv fn `utils::asyncify` into mod `utils`
 - Improve err msg for task failure in `utils::asyncify`
 - Make sure `asyncify` always returns the same anonymous type
   that implements `Future` if `T` is the same.
 - Rewrite `extract_bin` to avoid `block_in_place`
   support cancellation by dropping
 - Rm unused dep scopeguard from binstalk-downloader
 - Rewrite `extract_tar_based_stream` so that it is cancellable by dropping
 - Unbox `extract_future` in `async_extracter::extract_zip`
 - Refactor `Download` API: Remove `CancellationFuture` as param

   since none of the futures returned by `Download::and_*` call
   `block_in_place`, they can be cancelled by dropping them instead of
   using this cumbersome hack.
 - Fix exports from mod `async_tar_visitor`
 - Make `signal::{ignore_signals, wait_on_cancellation_signal}` private
 - Rm the global variable `CANCELLED` in `wait_on_cancellation_signal`
   and rm fn `wait_on_cancellation_signal_inner`
 - Optimize `wait_on_cancellation_signal`: Avoid `tokio::select!` on `not(unix)`
 - Rm unnecessary `tokio::select!` in `wait_on_cancellation_signal` on unix,
   since `unix::wait_on_cancellation_signal_unix` already waits for the ctrl + c signal.
 - Optimize `extract_bin`: Send `Bytes` to blocking thread for zero-copy
 - Optimize `extract_with_blocking_decoder`: Avoid dup monomorphization
 - Box fut of `fetch_crate_cratesio` in `PackageInfo::resolve`
 - Optimize `extract_zip_entry`: Spawn only one blocking task per fn call

   by using an mpsc queue for the data to be written to the `outfile`.

   This improves efficiency because using `tokio::fs::File` is expensive:
   on every loop iteration it spawns a new blocking task, which needs a heap
   allocation and a push onto an mpmc queue, and then waits for that task to finish.

   This also fixes a race condition where the unix permissions were set before
   the whole file was written, which could be exploited by attackers.
 - Optimize `extract_zip`: Use one `BytesMut` for the entire extraction process
   to avoid frequent allocation and deallocation.
 - Optimize `extract_zip_entry`: Inc prob of reusing alloc in `BytesMut`

   Performs the reserve before sending the buf over mpsc queue to
   increase the possibility of reusing the previous allocation.

   NOTE: `BytesMut` only reuses the previous allocation if it is the
   only one holding a reference to it, which is either on the first
   allocation or after all the `Bytes` in the mpsc queue have been consumed,
   written to the file and dropped.

   Since reading from entry would have to wait for external file I/O,
   this would give the blocking thread some time to flush `Bytes`
   out.
 - Disable unused feature fs of dep tokio

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
2022-12-12 03:15:30 +00:00

191 lines
5.5 KiB
Rust

use std::{
io::Write,
path::{Component, Path, PathBuf},
};
use async_zip::{read::ZipEntryReader, ZipEntryExt};
use bytes::{Bytes, BytesMut};
use futures_util::future::{try_join, TryFutureExt};
use thiserror::Error as ThisError;
use tokio::{
io::{AsyncRead, AsyncReadExt},
sync::mpsc,
};
use super::{utils::asyncify, DownloadError};
/// Private error kinds wrapped by [`ZipError`].
///
/// The enum itself is not `pub`, so its variants are not exposed outside
/// this module; callers only ever see the `ZipError` wrapper.
#[derive(Debug, ThisError)]
enum ZipErrorInner {
    /// An error reported by `async_zip`, e.g. while reading an entry or the
    /// `CRC32CheckError` produced when an entry's checksum does not match.
    /// Its `Display`/`source` are forwarded unchanged.
    #[error(transparent)]
    Inner(#[from] async_zip::error::ZipError),

    /// An entry name that `check_filename_and_normalize` rejected (NUL byte,
    /// absolute path, or `..` escaping the extraction directory).
    /// Holds the raw filename as stored in the archive.
    #[error("Invalid file path: {0}")]
    InvalidFilePath(Box<str>),
}
/// Error produced while extracting entries from a zip archive.
///
/// This is an opaque wrapper around a private error enum; `#[error(transparent)]`
/// forwards its `Display` and `source` to the wrapped error.
#[derive(Debug, ThisError)]
#[error(transparent)]
pub struct ZipError(#[from] ZipErrorInner);
impl ZipError {
    /// Wraps an error reported by `async_zip` into a [`ZipError`].
    ///
    /// Goes through the `From` impls derived by `#[from]`: first into the
    /// private inner enum, then into the public wrapper.
    pub(super) fn from_inner(err: async_zip::error::ZipError) -> Self {
        ZipErrorInner::from(err).into()
    }
}
/// Extracts a single zip `entry` underneath the directory `path`.
///
/// The entry name is validated and normalized with
/// `check_filename_and_normalize` before being joined onto `path`.
/// Names ending in `/` are treated as directories and created with
/// `create_dir_all`. Every other entry is written out as a regular file.
///
/// File data is read on the async side by `copy_file_to_mpsc` and handed
/// over a bounded mpsc channel to one closure run through `asyncify`. That
/// closure creates the parent directories, writes and flushes the file, and
/// only then applies the unix mode bits recorded in the archive (if any).
///
/// `buf` is the scratch buffer used to read the entry's data.
///
/// # Errors
///
/// - An invalid-file-path [`ZipError`] if the entry name is rejected.
/// - Any I/O error from creating directories, writing the file or setting
///   permissions.
/// - Any `async_zip` error from reading the entry, including a CRC mismatch.
pub(super) async fn extract_zip_entry<R>(
    entry: ZipEntryReader<'_, R>,
    path: &Path,
    buf: &mut BytesMut,
) -> Result<(), DownloadError>
where
    R: AsyncRead + Unpin + Send + Sync,
{
    let raw_filename = entry.entry().filename();

    // Refuse names that are absolute, contain NUL, or climb out of `path`.
    let relative = match check_filename_and_normalize(raw_filename) {
        Some(relative) => relative,
        None => {
            return Err(ZipError(ZipErrorInner::InvalidFilePath(raw_filename.into())).into());
        }
    };
    let outpath = path.join(relative);
    let is_dir = raw_filename.ends_with('/');

    // Mode bits recorded in the archive; only read (and applied) on unix.
    #[cfg(unix)]
    let perms = {
        use std::{fs::Permissions, os::unix::fs::PermissionsExt};

        entry
            .entry()
            .unix_permissions()
            .map(|mode: u16| Permissions::from_mode(u32::from(mode)))
    };
    #[cfg(not(unix))]
    let perms: Option<std::fs::Permissions> = None;

    if is_dir {
        asyncify(move || {
            std::fs::create_dir_all(&outpath)?;
            if let Some(perms) = perms {
                std::fs::set_permissions(&outpath, perms)?;
            }
            Ok(())
        })
        .await?;

        return Ok(());
    }

    // A small bounded queue lets the reader stay a few chunks ahead of the
    // writer without buffering the whole file in memory.
    let (tx, mut rx) = mpsc::channel::<Bytes>(5);

    // Blocking side: drain the queue into the output file.
    let write_to_disk = asyncify(move || {
        if let Some(parent) = outpath.parent() {
            std::fs::create_dir_all(parent)?;
        }

        let mut outfile = std::fs::File::create(&outpath)?;
        while let Some(chunk) = rx.blocking_recv() {
            outfile.write_all(&chunk)?;
        }
        outfile.flush()?;

        // Apply the archived mode only once the contents are fully written.
        if let Some(perms) = perms {
            outfile.set_permissions(perms)?;
        }

        Ok(())
    })
    .err_into::<DownloadError>();

    // Async side: decompress the entry and feed the queue.
    let read_from_archive = copy_file_to_mpsc(entry, tx, buf)
        .map_err(|err| DownloadError::from(ZipError::from_inner(err)));

    try_join(write_to_disk, read_from_archive).await?;

    Ok(())
}
/// Reads the decompressed data of `entry` and sends it through `tx` in
/// chunks, then verifies the entry's CRC32.
///
/// `buf` is the read buffer. Each chunk that is read is split off it and
/// frozen into a [`Bytes`] before being sent, so afterwards `buf` is empty
/// and only keeps its spare capacity for the next read.
///
/// # Errors
///
/// Returns the error from reading `entry`, or
/// [`async_zip::error::ZipError::CRC32CheckError`] if the CRC of the data
/// read does not match the one recorded for the entry.
///
/// If the receiving end of `tx` is closed before the whole entry has been
/// sent, this returns `Ok(())` early without checking the CRC. The consumer
/// is expected to report its own failure (see the comment at the send site).
async fn copy_file_to_mpsc<R>(
    mut entry: ZipEntryReader<'_, R>,
    tx: mpsc::Sender<Bytes>,
    buf: &mut BytesMut,
) -> Result<(), async_zip::error::ZipError>
where
    R: AsyncRead + Unpin + Send + Sync,
{
    // Make sure the very first read also has room for at least 4096B.
    // Without this, an empty `buf` handed in by the caller would only get
    // whatever small default growth `BytesMut` applies inside `read_buf`.
    // This is a no-op when `buf` already has enough spare capacity.
    buf.reserve(4096);

    // Since BytesMut does not have a max cap, if AsyncReadExt::read_buf returns
    // 0 then it means Eof.
    while entry.read_buf(buf).await? != 0 {
        // Ensure the next AsyncReadExt::read_buf can read at least 4096B to
        // avoid frequent expensive read syscalls.
        //
        // Performs this reserve before sending the buf over the mpsc queue to
        // increase the possibility of reusing the previous allocation.
        //
        // NOTE: `BytesMut` only reuses the previous allocation if it is the
        // only one holding a reference to it, which is either on the first
        // iteration or after all the `Bytes` in the mpsc queue have been
        // consumed, written to the file and dropped.
        //
        // Since reading from entry would have to wait for external file I/O,
        // this gives the consumer some time to flush the `Bytes` out.
        buf.reserve(4096);
        if tx.send(buf.split().freeze()).await.is_err() {
            // The receiver is gone while we still hold `tx`, so the consumer
            // stopped before seeing end-of-stream. The writer in
            // `extract_zip_entry` only finishes successfully after the channel
            // is closed by dropping every sender, so this means it bailed out
            // with an error, which its own future reports.
            //
            // The data read so far is truncated. Checking its CRC here would
            // only produce a spurious `CRC32CheckError` that `try_join` could
            // surface instead of the real cause, so defer to the consumer.
            return Ok(());
        }
    }

    if entry.compare_crc() {
        Ok(())
    } else {
        Err(async_zip::error::ZipError::CRC32CheckError)
    }
}
/// Validates a zip entry name and turns it into a relative, normalized
/// [`PathBuf`].
///
/// Returns `None` when the name:
///
/// - contains a NUL byte,
/// - is absolute (has a root, or on Windows a drive/UNC prefix), or
/// - uses `..` to climb above the directory it starts in
///   (`foo/../bar` is accepted, `foo/../../bar` is not).
///
/// Otherwise `.` components are dropped and each `..` cancels the component
/// before it, so the result never refers to anything above the directory it
/// is later joined onto.
///
/// Adapted from `zip::ZipFile::enclosed_name`; the normalization step
/// follows `normalize_path::NormalizePath::normalize`.
fn check_filename_and_normalize(filename: &str) -> Option<PathBuf> {
    // UTF-8 only ever encodes U+0000 as a 0 byte, so a byte scan is enough.
    if filename.as_bytes().contains(&0) {
        return None;
    }

    Path::new(filename)
        .components()
        .try_fold(PathBuf::new(), |mut normalized, component| {
            match component {
                // Rooted or prefixed names would escape the target directory.
                Component::Prefix(_) | Component::RootDir => return None,
                Component::CurDir => {}
                // `pop` fails once there is nothing left to cancel, i.e. the
                // name tries to leave its starting directory.
                Component::ParentDir => {
                    if !normalized.pop() {
                        return None;
                    }
                }
                Component::Normal(part) => normalized.push(part),
            }
            Some(normalized)
        })
}