mirror of
https://github.com/cargo-bins/cargo-binstall.git
synced 2025-05-03 18:50:02 +00:00
Refactor binstalk-downloader
APIs: Remove cancellation_future
plus optimizations (#591)
- Refactor: Mv fn `utils::asyncify` into mod `utils` - Improve err msg for task failure in `utils::asyncify` - Make sure `asyncify` always returns the same anonymous type that implements `Future` if the `T` is the same. - Rewrite `extract_bin` to avoid `block_in_place` and support cancellation by dropping - Rm unused dep scopeguard from binstalk-downloader - Rewrite `extract_tar_based_stream` so that it is cancellable by dropping - Unbox `extract_future` in `async_extracter::extract_zip` - Refactor `Download` API: Remove `CancellationFuture` as param since all futures returned by `Download::and_*` do not call `block_in_place`, so they can be cancelled by drop instead of using this cumbersome hack. - Fix exports from mod `async_tar_visitor` - Make `signal::{ignore_signals, wait_on_cancellation_signal}` private - Rm the global variable `CANCELLED` in `wait_on_cancellation_signal` and rm fn `wait_on_cancellation_signal_inner` - Optimize `wait_on_cancellation_signal`: Avoid `tokio::select!` on `not(unix)` - Rm unnecessary `tokio::select!` in `wait_on_cancellation_signal` on unix, since `unix::wait_on_cancellation_signal_unix` already waits for ctrl + c signal. - Optimize `extract_bin`: Send `Bytes` to blocking thread for zero-copy - Optimize `extract_with_blocking_decoder`: Avoid dup monomorphization - Box fut of `fetch_crate_cratesio` in `PackageInfo::resolve` - Optimize `extract_zip_entry`: Spawn only one blocking task per fn call by using an mpsc queue for the data to be written to the `outfile`. This would improve efficiency as using `tokio::fs::File` is expensive: It spawns a new blocking task, which needs one heap allocation and is then pushed to an mpmc queue, and then waits for it to be done on every loop. This also fixes a race condition where the unix permission is set before the whole file is written, which might be used by attackers. - Optimize `extract_zip`: Use one `BytesMut` for entire extraction process to avoid frequent allocation and deallocation.
- Optimize `extract_zip_entry`: Inc prob of reusing alloc in `BytesMut` Perform the reserve before sending the buf over the mpsc queue to increase the possibility of reusing the previous allocation. NOTE: `BytesMut` only reuses the previous allocation if it is the only one holding a reference to it, which is either on the first allocation or when all the `Bytes` in the mpsc queue have been consumed, written to the file and dropped. Since reading from entry would have to wait for external file I/O, this would give the blocking thread some time to flush `Bytes` out. - Disable unused feature fs of dep tokio Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
parent
058208bae9
commit
db45f2fb7f
13 changed files with 234 additions and 263 deletions
|
@ -1,96 +1,137 @@
|
|||
use std::{fs, path::Path};
|
||||
use std::{
|
||||
fs,
|
||||
future::Future,
|
||||
io::{self, Write},
|
||||
path::Path,
|
||||
};
|
||||
|
||||
use async_zip::read::stream::ZipFileReader;
|
||||
use bytes::Bytes;
|
||||
use futures_util::stream::Stream;
|
||||
use scopeguard::{guard, ScopeGuard};
|
||||
use tokio::task::block_in_place;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures_util::{
|
||||
future::try_join,
|
||||
stream::{Stream, StreamExt},
|
||||
};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_util::io::StreamReader;
|
||||
use tracing::debug;
|
||||
|
||||
use super::{
|
||||
await_on_option, extracter::*, stream_readable::StreamReadable,
|
||||
zip_extraction::extract_zip_entry, CancellationFuture, DownloadError, TarBasedFmt, ZipError,
|
||||
extracter::*, stream_readable::StreamReadable, utils::asyncify,
|
||||
zip_extraction::extract_zip_entry, DownloadError, TarBasedFmt, ZipError,
|
||||
};
|
||||
|
||||
pub async fn extract_bin<S>(
|
||||
stream: S,
|
||||
path: &Path,
|
||||
cancellation_future: CancellationFuture,
|
||||
) -> Result<(), DownloadError>
|
||||
pub async fn extract_bin<S>(stream: S, path: &Path) -> Result<(), DownloadError>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, DownloadError>> + Unpin + 'static,
|
||||
S: Stream<Item = Result<Bytes, DownloadError>> + Send + Sync + Unpin + 'static,
|
||||
{
|
||||
let mut reader = StreamReadable::new(stream, cancellation_future).await;
|
||||
block_in_place(move || {
|
||||
fs::create_dir_all(path.parent().unwrap())?;
|
||||
debug!("Writing to `{}`", path.display());
|
||||
|
||||
extract_with_blocking_decoder(stream, path, |mut rx, path| {
|
||||
let mut file = fs::File::create(path)?;
|
||||
|
||||
// remove it unless the operation isn't aborted and no write
|
||||
// fails.
|
||||
let remove_guard = guard(&path, |path| {
|
||||
fs::remove_file(path).ok();
|
||||
});
|
||||
while let Some(bytes) = rx.blocking_recv() {
|
||||
file.write_all(&bytes)?;
|
||||
}
|
||||
|
||||
reader.copy(&mut file)?;
|
||||
|
||||
// Operation isn't aborted and all writes succeed,
|
||||
// disarm the remove_guard.
|
||||
ScopeGuard::into_inner(remove_guard);
|
||||
|
||||
Ok(())
|
||||
file.flush()
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn extract_zip<S>(
|
||||
stream: S,
|
||||
path: &Path,
|
||||
cancellation_future: CancellationFuture,
|
||||
) -> Result<(), DownloadError>
|
||||
pub async fn extract_zip<S>(stream: S, path: &Path) -> Result<(), DownloadError>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, DownloadError>> + Unpin + Send + Sync + 'static,
|
||||
{
|
||||
debug!("Decompressing from zip archive to `{}`", path.display());
|
||||
|
||||
let extract_future = Box::pin(async move {
|
||||
let reader = StreamReader::new(stream);
|
||||
let mut zip = ZipFileReader::new(reader);
|
||||
let reader = StreamReader::new(stream);
|
||||
let mut zip = ZipFileReader::new(reader);
|
||||
let mut buf = BytesMut::with_capacity(4 * 4096);
|
||||
|
||||
while let Some(entry) = zip.entry_reader().await.map_err(ZipError::from_inner)? {
|
||||
extract_zip_entry(entry, path).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
});
|
||||
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
res = await_on_option(cancellation_future) => {
|
||||
Err(res.err().map(DownloadError::from).unwrap_or(DownloadError::UserAbort))
|
||||
}
|
||||
res = extract_future => res,
|
||||
while let Some(entry) = zip.entry_reader().await.map_err(ZipError::from_inner)? {
|
||||
extract_zip_entry(entry, path, &mut buf).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn extract_tar_based_stream<S>(
|
||||
stream: S,
|
||||
path: &Path,
|
||||
fmt: TarBasedFmt,
|
||||
cancellation_future: CancellationFuture,
|
||||
) -> Result<(), DownloadError>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, DownloadError>> + Unpin + 'static,
|
||||
S: Stream<Item = Result<Bytes, DownloadError>> + Send + Sync + Unpin + 'static,
|
||||
{
|
||||
let reader = StreamReadable::new(stream, cancellation_future).await;
|
||||
block_in_place(move || {
|
||||
fs::create_dir_all(path.parent().unwrap())?;
|
||||
debug!("Extracting from {fmt} archive to {path:#?}");
|
||||
|
||||
debug!("Extracting from {fmt} archive to {path:#?}");
|
||||
extract_with_blocking_decoder(stream, path, move |rx, path| {
|
||||
create_tar_decoder(StreamReadable::new(rx), fmt)?.unpack(path)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
create_tar_decoder(reader, fmt)?.unpack(path)?;
|
||||
async fn extract_with_blocking_decoder<S, F>(
|
||||
stream: S,
|
||||
path: &Path,
|
||||
f: F,
|
||||
) -> Result<(), DownloadError>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, DownloadError>> + Send + Sync + Unpin + 'static,
|
||||
F: FnOnce(mpsc::Receiver<Bytes>, &Path) -> io::Result<()> + Send + Sync + 'static,
|
||||
{
|
||||
async fn inner<S, Fut>(
|
||||
mut stream: S,
|
||||
task: Fut,
|
||||
tx: mpsc::Sender<Bytes>,
|
||||
) -> Result<(), DownloadError>
|
||||
where
|
||||
// We do not use trait object for S since there will only be one
|
||||
// S used with this function.
|
||||
S: Stream<Item = Result<Bytes, DownloadError>> + Send + Sync + Unpin + 'static,
|
||||
// asyncify would always return the same future, so no need to
|
||||
// use trait object here.
|
||||
Fut: Future<Output = io::Result<()>> + Send + Sync,
|
||||
{
|
||||
try_join(
|
||||
async move {
|
||||
while let Some(bytes) = stream.next().await.transpose()? {
|
||||
if tx.send(bytes).await.is_err() {
|
||||
// The extraction task has returned, which could mean that:
|
||||
// - Extraction fails with an error
|
||||
// - Extraction succeeds without the rest of the data
|
||||
//
|
||||
//
|
||||
// It's hard to tell the difference here, so we assume
|
||||
// the first scenario occurs.
|
||||
//
|
||||
// Even if the second scenario occurs, it won't affect the
|
||||
// extraction process anyway, so we can just ignore it.
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
},
|
||||
task,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
// Use channel size = 5 to minimize the waiting time in the extraction task
|
||||
let (tx, rx) = mpsc::channel(5);
|
||||
|
||||
let path = path.to_owned();
|
||||
|
||||
let task = asyncify(move || {
|
||||
if let Some(parent) = path.parent() {
|
||||
fs::create_dir_all(parent)?;
|
||||
}
|
||||
|
||||
f(rx, &path)
|
||||
});
|
||||
|
||||
inner(stream, task, tx).await
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue