diff --git a/Cargo.lock b/Cargo.lock index ab2949c6..12580c12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -199,7 +199,6 @@ dependencies = [ "generic-array", "httpdate", "reqwest", - "scopeguard", "tempfile", "thiserror", "tokio", diff --git a/crates/binstalk-downloader/Cargo.toml b/crates/binstalk-downloader/Cargo.toml index 5f5aab2e..67afa703 100644 --- a/crates/binstalk-downloader/Cargo.toml +++ b/crates/binstalk-downloader/Cargo.toml @@ -22,7 +22,6 @@ futures-util = { version = "0.3.25", default-features = false, features = ["std" generic-array = "0.14.6" httpdate = "1.0.2" reqwest = { version = "0.11.13", features = ["stream", "gzip", "brotli", "deflate"], default-features = false } -scopeguard = "1.1.0" # Use a fork here since we need PAX support, but the upstream # does not hav the PR merged yet. # diff --git a/crates/binstalk-downloader/src/download.rs b/crates/binstalk-downloader/src/download.rs index a3a9469a..1636577f 100644 --- a/crates/binstalk-downloader/src/download.rs +++ b/crates/binstalk-downloader/src/download.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, future::Future, io, marker::PhantomData, path::Path, pin::Pin}; +use std::{fmt::Debug, io, marker::PhantomData, path::Path}; use binstalk_types::cargo_toml_binstall::{PkgFmtDecomposed, TarBasedFmt}; use digest::{Digest, FixedOutput, HashMarker, Output, OutputSizeUser, Update}; @@ -14,7 +14,8 @@ mod async_extracter; use async_extracter::*; mod async_tar_visitor; -pub use async_tar_visitor::*; +use async_tar_visitor::extract_tar_based_stream_and_visit; +pub use async_tar_visitor::{TarEntriesVisitor, TarEntry, TarEntryType}; mod extracter; mod stream_readable; @@ -23,9 +24,6 @@ mod zip_extraction; pub use zip_extraction::ZipError; mod utils; -use utils::await_on_option; - -pub type CancellationFuture = Option> + Send>>>; #[derive(Debug, ThisError)] pub enum DownloadError { @@ -102,12 +100,11 @@ impl Download { /// /// NOTE that this API does not support gnu extension sparse file unlike /// [`Download::and_extract`]. - #[instrument(skip(visitor, cancellation_future))] + #[instrument(skip(visitor))] pub async fn and_visit_tar( self, fmt: TarBasedFmt, visitor: V, - cancellation_future: CancellationFuture, ) -> Result { let stream = self .client @@ -117,14 +114,7 @@ impl Download { debug!("Downloading and extracting then in-memory processing"); - let ret = tokio::select! { - biased; - - res = await_on_option(cancellation_future) => { - Err(res.err().unwrap_or_else(|| io::Error::from(DownloadError::UserAbort)))? - } - res = extract_tar_based_stream_and_visit(stream, fmt, visitor) => res?, - }; + let ret = extract_tar_based_stream_and_visit(stream, fmt, visitor).await?; debug!("Download, extraction and in-memory procession OK"); @@ -135,19 +125,13 @@ impl Download { /// /// `cancellation_future` can be used to cancel the extraction and return /// [`DownloadError::UserAbort`] error. - #[instrument(skip(path, cancellation_future))] + #[instrument(skip(path))] pub async fn and_extract( self, fmt: PkgFmt, path: impl AsRef, - cancellation_future: CancellationFuture, ) -> Result<(), DownloadError> { - async fn inner( - this: Download, - fmt: PkgFmt, - path: &Path, - cancellation_future: CancellationFuture, - ) -> Result<(), DownloadError> { + async fn inner(this: Download, fmt: PkgFmt, path: &Path) -> Result<(), DownloadError> { let stream = this .client .get_stream(this.url) @@ -157,11 +141,9 @@ impl Download { debug!("Downloading and extracting to: '{}'", path.display()); match fmt.decompose() { - PkgFmtDecomposed::Tar(fmt) => { - extract_tar_based_stream(stream, path, fmt, cancellation_future).await? - } - PkgFmtDecomposed::Bin => extract_bin(stream, path, cancellation_future).await?, - PkgFmtDecomposed::Zip => extract_zip(stream, path, cancellation_future).await?, + PkgFmtDecomposed::Tar(fmt) => extract_tar_based_stream(stream, path, fmt).await?, + PkgFmtDecomposed::Bin => extract_bin(stream, path).await?, + PkgFmtDecomposed::Zip => extract_zip(stream, path).await?, } debug!("Download OK, extracted to: '{}'", path.display()); @@ -169,7 +151,7 @@ impl Download { Ok(()) } - inner(self, fmt, path.as_ref(), cancellation_future).await + inner(self, fmt, path.as_ref()).await } } diff --git a/crates/binstalk-downloader/src/download/async_extracter.rs b/crates/binstalk-downloader/src/download/async_extracter.rs index 5034374b..4850771e 100644 --- a/crates/binstalk-downloader/src/download/async_extracter.rs +++ b/crates/binstalk-downloader/src/download/async_extracter.rs @@ -1,96 +1,137 @@ -use std::{fs, path::Path}; +use std::{ + fs, + future::Future, + io::{self, Write}, + path::Path, +}; use async_zip::read::stream::ZipFileReader; -use bytes::Bytes; -use futures_util::stream::Stream; -use scopeguard::{guard, ScopeGuard}; -use tokio::task::block_in_place; +use bytes::{Bytes, BytesMut}; +use futures_util::{ + future::try_join, + stream::{Stream, StreamExt}, +}; +use tokio::sync::mpsc; use tokio_util::io::StreamReader; use tracing::debug; use super::{ - await_on_option, extracter::*, stream_readable::StreamReadable, - zip_extraction::extract_zip_entry, CancellationFuture, DownloadError, TarBasedFmt, ZipError, + extracter::*, stream_readable::StreamReadable, utils::asyncify, + zip_extraction::extract_zip_entry, DownloadError, TarBasedFmt, ZipError, }; -pub async fn extract_bin( - stream: S, - path: &Path, - cancellation_future: CancellationFuture, -) -> Result<(), DownloadError> +pub async fn extract_bin(stream: S, path: &Path) -> Result<(), DownloadError> where - S: Stream> + Unpin + 'static, + S: Stream> + Send + Sync + Unpin + 'static, { - let mut reader = StreamReadable::new(stream, cancellation_future).await; - block_in_place(move || { - fs::create_dir_all(path.parent().unwrap())?; + debug!("Writing to `{}`", path.display()); + extract_with_blocking_decoder(stream, path, |mut rx, path| { let mut file = fs::File::create(path)?; - // remove it unless the operation isn't aborted and no write - // fails. - let remove_guard = guard(&path, |path| { - fs::remove_file(path).ok(); - }); + while let Some(bytes) = rx.blocking_recv() { + file.write_all(&bytes)?; + } - reader.copy(&mut file)?; - - // Operation isn't aborted and all writes succeed, - // disarm the remove_guard. - ScopeGuard::into_inner(remove_guard); - - Ok(()) + file.flush() }) + .await } -pub async fn extract_zip( - stream: S, - path: &Path, - cancellation_future: CancellationFuture, -) -> Result<(), DownloadError> +pub async fn extract_zip(stream: S, path: &Path) -> Result<(), DownloadError> where S: Stream> + Unpin + Send + Sync + 'static, { debug!("Decompressing from zip archive to `{}`", path.display()); - let extract_future = Box::pin(async move { - let reader = StreamReader::new(stream); - let mut zip = ZipFileReader::new(reader); + let reader = StreamReader::new(stream); + let mut zip = ZipFileReader::new(reader); + let mut buf = BytesMut::with_capacity(4 * 4096); - while let Some(entry) = zip.entry_reader().await.map_err(ZipError::from_inner)? { - extract_zip_entry(entry, path).await?; - } - - Ok(()) - }); - - tokio::select! { - biased; - - res = await_on_option(cancellation_future) => { - Err(res.err().map(DownloadError::from).unwrap_or(DownloadError::UserAbort)) - } - res = extract_future => res, + while let Some(entry) = zip.entry_reader().await.map_err(ZipError::from_inner)? { + extract_zip_entry(entry, path, &mut buf).await?; } + + Ok(()) } pub async fn extract_tar_based_stream( stream: S, path: &Path, fmt: TarBasedFmt, - cancellation_future: CancellationFuture, ) -> Result<(), DownloadError> where - S: Stream> + Unpin + 'static, + S: Stream> + Send + Sync + Unpin + 'static, { - let reader = StreamReadable::new(stream, cancellation_future).await; - block_in_place(move || { - fs::create_dir_all(path.parent().unwrap())?; + debug!("Extracting from {fmt} archive to {path:#?}"); - debug!("Extracting from {fmt} archive to {path:#?}"); + extract_with_blocking_decoder(stream, path, move |rx, path| { + create_tar_decoder(StreamReadable::new(rx), fmt)?.unpack(path) + }) + .await +} - create_tar_decoder(reader, fmt)?.unpack(path)?; +async fn extract_with_blocking_decoder( + stream: S, + path: &Path, + f: F, +) -> Result<(), DownloadError> +where + S: Stream> + Send + Sync + Unpin + 'static, + F: FnOnce(mpsc::Receiver, &Path) -> io::Result<()> + Send + Sync + 'static, +{ + async fn inner( + mut stream: S, + task: Fut, + tx: mpsc::Sender, + ) -> Result<(), DownloadError> + where + // We do not use trait object for S since there will only be one + // S used with this function. + S: Stream> + Send + Sync + Unpin + 'static, + // asyncify would always return the same future, so no need to + // use trait object here. + Fut: Future> + Send + Sync, + { + try_join( + async move { + while let Some(bytes) = stream.next().await.transpose()? { + if tx.send(bytes).await.is_err() { + // The extract tar returns, which could be that: + // - Extraction fails with an error + // - Extraction success without the rest of the data + // + // + // It's hard to tell the difference here, so we assume + // the first scienario occurs. + // + // Even if the second scienario occurs, it won't affect the + // extraction process anyway, so we can jsut ignore it. + return Ok(()); + } + } + + Ok(()) + }, + task, + ) + .await?; Ok(()) - }) + } + + // Use channel size = 5 to minimize the waiting time in the extraction task + let (tx, rx) = mpsc::channel(5); + + let path = path.to_owned(); + + let task = asyncify(move || { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + + f(rx, &path) + }); + + inner(stream, task, tx).await } diff --git a/crates/binstalk-downloader/src/download/async_tar_visitor.rs b/crates/binstalk-downloader/src/download/async_tar_visitor.rs index d33ba2ef..bab6d0eb 100644 --- a/crates/binstalk-downloader/src/download/async_tar_visitor.rs +++ b/crates/binstalk-downloader/src/download/async_tar_visitor.rs @@ -91,7 +91,7 @@ pub trait TarEntriesVisitor: Send + Sync { fn finish(self) -> Result; } -pub async fn extract_tar_based_stream_and_visit( +pub(crate) async fn extract_tar_based_stream_and_visit( stream: S, fmt: TarBasedFmt, mut visitor: V, diff --git a/crates/binstalk-downloader/src/download/stream_readable.rs b/crates/binstalk-downloader/src/download/stream_readable.rs index dee2f379..f1ec5a27 100644 --- a/crates/binstalk-downloader/src/download/stream_readable.rs +++ b/crates/binstalk-downloader/src/download/stream_readable.rs @@ -1,74 +1,28 @@ -use std::{ - cmp::min, - io::{self, BufRead, Read, Write}, -}; +use std::io::{self, BufRead, Read}; use bytes::{Buf, Bytes}; -use futures_util::stream::{Stream, StreamExt}; -use tokio::runtime::Handle; - -use super::{await_on_option, CancellationFuture, DownloadError}; +use tokio::sync::mpsc; /// This wraps an AsyncIterator as a `Read`able. /// It must be used in non-async context only, /// meaning you have to use it with /// `tokio::task::{block_in_place, spawn_blocking}` or /// `std::thread::spawn`. -pub struct StreamReadable { - stream: S, - handle: Handle, +pub struct StreamReadable { + rx: mpsc::Receiver, bytes: Bytes, - cancellation_future: CancellationFuture, } -impl StreamReadable { - pub(super) async fn new(stream: S, cancellation_future: CancellationFuture) -> Self { +impl StreamReadable { + pub(super) fn new(rx: mpsc::Receiver) -> Self { Self { - stream, - handle: Handle::current(), + rx, bytes: Bytes::new(), - cancellation_future, } } } -impl StreamReadable -where - S: Stream> + Unpin, - DownloadError: From, -{ - /// Copies from `self` to `writer`. - /// - /// Same as `io::copy` but does not allocate any internal buffer - /// since `self` is buffered. - pub(super) fn copy(&mut self, mut writer: W) -> io::Result<()> - where - W: Write, - { - self.copy_inner(&mut writer) - } - - fn copy_inner(&mut self, writer: &mut dyn Write) -> io::Result<()> { - loop { - let buf = self.fill_buf()?; - if buf.is_empty() { - // Eof - break Ok(()); - } - - writer.write_all(buf)?; - - let n = buf.len(); - self.consume(n); - } - } -} - -impl Read for StreamReadable -where - S: Stream> + Unpin, - DownloadError: From, -{ +impl Read for StreamReadable { fn read(&mut self, buf: &mut [u8]) -> io::Result { if buf.is_empty() { return Ok(0); @@ -82,60 +36,26 @@ where // copy_to_slice requires the bytes to have enough remaining bytes // to fill buf. - let n = min(buf.len(), bytes.remaining()); + let n = buf.len().min(bytes.remaining()); + // ::copy_to_slice copies and consumes the bytes bytes.copy_to_slice(&mut buf[..n]); Ok(n) } } -/// If `Ok(Some(bytes))` if returned, then `bytes.is_empty() == false`. -async fn next_stream(stream: &mut S) -> io::Result> -where - S: Stream> + Unpin, - DownloadError: From, -{ - loop { - let option = stream - .next() - .await - .transpose() - .map_err(DownloadError::from)?; - - match option { - Some(bytes) if bytes.is_empty() => continue, - option => break Ok(option), - } - } -} - -impl BufRead for StreamReadable -where - S: Stream> + Unpin, - DownloadError: From, -{ +impl BufRead for StreamReadable { fn fill_buf(&mut self) -> io::Result<&[u8]> { let bytes = &mut self.bytes; if !bytes.has_remaining() { - let option = self.handle.block_on(async { - let cancellation_future = self.cancellation_future.as_mut(); - tokio::select! { - biased; - - res = await_on_option(cancellation_future) => { - Err(res.err().unwrap_or_else(|| io::Error::from(DownloadError::UserAbort))) - }, - res = next_stream(&mut self.stream) => res, - } - })?; - - if let Some(new_bytes) = option { + if let Some(new_bytes) = self.rx.blocking_recv() { // new_bytes are guaranteed to be non-empty. *bytes = new_bytes; } } + Ok(&*bytes) } diff --git a/crates/binstalk-downloader/src/download/utils.rs b/crates/binstalk-downloader/src/download/utils.rs index 94ad39cf..9fde5239 100644 --- a/crates/binstalk-downloader/src/download/utils.rs +++ b/crates/binstalk-downloader/src/download/utils.rs @@ -1,16 +1,22 @@ -use std::future::{pending, Future}; +use std::{future::Future, io}; -/// Await on `future` if it is not `None`, or call [`pending`] -/// so that this branch would never get selected again. -/// -/// Designed to use with [`tokio::select`]. -pub(super) async fn await_on_option(future: Option) -> R +use tokio::task; + +/// Copied from tokio https://docs.rs/tokio/latest/src/tokio/fs/mod.rs.html#132 +pub(super) fn asyncify(f: F) -> impl Future> + Send + Sync + 'static where - Fut: Future, + F: FnOnce() -> io::Result + Send + 'static, + T: Send + 'static, { - if let Some(future) = future { - future.await - } else { - pending().await + async fn inner(handle: task::JoinHandle>) -> io::Result { + match handle.await { + Ok(res) => res, + Err(err) => Err(io::Error::new( + io::ErrorKind::Other, + format!("background task failed: {err}"), + )), + } } + + inner(task::spawn_blocking(f)) } diff --git a/crates/binstalk-downloader/src/download/zip_extraction.rs b/crates/binstalk-downloader/src/download/zip_extraction.rs index ca3b89f3..89e4f2ab 100644 --- a/crates/binstalk-downloader/src/download/zip_extraction.rs +++ b/crates/binstalk-downloader/src/download/zip_extraction.rs @@ -1,13 +1,18 @@ use std::{ - io, + io::Write, path::{Component, Path, PathBuf}, }; use async_zip::{read::ZipEntryReader, ZipEntryExt}; +use bytes::{Bytes, BytesMut}; +use futures_util::future::{try_join, TryFutureExt}; use thiserror::Error as ThisError; -use tokio::{fs, io::AsyncRead, task::spawn_blocking}; +use tokio::{ + io::{AsyncRead, AsyncReadExt}, + sync::mpsc, +}; -use super::DownloadError; +use super::{utils::asyncify, DownloadError}; #[derive(Debug, ThisError)] enum ZipErrorInner { @@ -31,6 +36,7 @@ impl ZipError { pub(super) async fn extract_zip_entry( entry: ZipEntryReader<'_, R>, path: &Path, + buf: &mut BytesMut, ) -> Result<(), DownloadError> where R: AsyncRead + Unpin + Send + Sync, @@ -68,31 +74,82 @@ where }) .await?; } else { + // Use channel size = 5 to minimize the waiting time in the extraction task + let (tx, mut rx) = mpsc::channel::(5); + // This entry is a file. - let mut outfile = asyncify(move || { - if let Some(p) = outpath.parent() { - std::fs::create_dir_all(p)?; - } - let outfile = std::fs::File::create(&outpath)?; + try_join( + asyncify(move || { + if let Some(p) = outpath.parent() { + std::fs::create_dir_all(p)?; + } + let mut outfile = std::fs::File::create(&outpath)?; - if let Some(perms) = perms { - outfile.set_permissions(perms)?; - } + while let Some(bytes) = rx.blocking_recv() { + outfile.write_all(&bytes)?; + } - Ok(outfile) - }) - .await - .map(fs::File::from_std)?; + outfile.flush()?; - entry - .copy_to_end_crc(&mut outfile, 64 * 1024) - .await - .map_err(ZipError::from_inner)?; + if let Some(perms) = perms { + outfile.set_permissions(perms)?; + } + + Ok(()) + }) + .err_into(), + copy_file_to_mpsc(entry, tx, buf) + .map_err(ZipError::from_inner) + .map_err(DownloadError::from), + ) + .await?; } Ok(()) } +async fn copy_file_to_mpsc( + mut entry: ZipEntryReader<'_, R>, + tx: mpsc::Sender, + buf: &mut BytesMut, +) -> Result<(), async_zip::error::ZipError> +where + R: AsyncRead + Unpin + Send + Sync, +{ + // Since BytesMut does not have a max cap, if AsyncReadExt::read_buf returns + // 0 then it means Eof. + while entry.read_buf(buf).await? != 0 { + // Ensure AsyncReadExt::read_buf can read at least 4096B to avoid + // frequent expensive read syscalls. + // + // Performs this reserve before sending the buf over mpsc queue to + // increase the possibility of reusing the previous allocation. + // + // NOTE: `BytesMut` only reuses the previous allocation if it is the + // only one holds the reference to it, which is either on the first + // iteration or all the `Bytes` in the mpsc queue has been consumed, + // written to the file and dropped. + // + // Since reading from entry would have to wait for external file I/O, + // this would give the blocking thread some time to flush `Bytes` + // out. + // + // If all `Bytes` are flushed out, then we can reuse the allocation here. + buf.reserve(4096); + + if tx.send(buf.split().freeze()).await.is_err() { + // Same reason as extract_with_blocking_decoder + break; + } + } + + if entry.compare_crc() { + Ok(()) + } else { + Err(async_zip::error::ZipError::CRC32CheckError) + } +} + /// Ensure the file path is safe to use as a [`Path`]. /// /// - It can't contain NULL bytes @@ -132,18 +189,3 @@ fn check_filename_and_normalize(filename: &str) -> Option { Some(path) } - -/// Copied from tokio https://docs.rs/tokio/latest/src/tokio/fs/mod.rs.html#132 -async fn asyncify(f: F) -> io::Result -where - F: FnOnce() -> io::Result + Send + 'static, - T: Send + 'static, -{ - match spawn_blocking(f).await { - Ok(res) => res, - Err(_) => Err(io::Error::new( - io::ErrorKind::Other, - "background task failed", - )), - } -} diff --git a/crates/binstalk/src/drivers/crates_io.rs b/crates/binstalk/src/drivers/crates_io.rs index ec4179bd..402fca46 100644 --- a/crates/binstalk/src/drivers/crates_io.rs +++ b/crates/binstalk/src/drivers/crates_io.rs @@ -10,7 +10,6 @@ use crate::{ helpers::{ download::Download, remote::{Client, Url}, - signal::wait_on_cancellation_signal, }, manifests::cargo_toml_binstall::{Meta, TarBasedFmt}, }; @@ -54,10 +53,6 @@ pub async fn fetch_crate_cratesio( let manifest_dir_path: PathBuf = format!("{name}-{version_name}").into(); Ok(Download::new(client, Url::parse(&crate_url)?) - .and_visit_tar( - TarBasedFmt::Tgz, - ManifestVisitor::new(manifest_dir_path), - Some(Box::pin(wait_on_cancellation_signal())), - ) + .and_visit_tar(TarBasedFmt::Tgz, ManifestVisitor::new(manifest_dir_path)) .await?) } diff --git a/crates/binstalk/src/fetchers/gh_crate_meta.rs b/crates/binstalk/src/fetchers/gh_crate_meta.rs index 15defd2e..5e865df2 100644 --- a/crates/binstalk/src/fetchers/gh_crate_meta.rs +++ b/crates/binstalk/src/fetchers/gh_crate_meta.rs @@ -15,7 +15,6 @@ use crate::{ helpers::{ download::Download, remote::{Client, Method}, - signal::wait_on_cancellation_signal, tasks::AutoAbortJoinHandle, }, manifests::cargo_toml_binstall::{PkgFmt, PkgMeta}, @@ -167,7 +166,7 @@ impl super::Fetcher for GhCrateMeta { let (url, pkg_fmt) = self.resolution.get().unwrap(); // find() is called first debug!("Downloading package from: '{url}' dst:{dst:?} fmt:{pkg_fmt:?}"); Ok(Download::new(self.client.clone(), url.clone()) - .and_extract(*pkg_fmt, dst, Some(Box::pin(wait_on_cancellation_signal()))) + .and_extract(*pkg_fmt, dst) .await?) } diff --git a/crates/binstalk/src/fetchers/quickinstall.rs b/crates/binstalk/src/fetchers/quickinstall.rs index d49847e3..90daef53 100644 --- a/crates/binstalk/src/fetchers/quickinstall.rs +++ b/crates/binstalk/src/fetchers/quickinstall.rs @@ -9,7 +9,6 @@ use crate::{ helpers::{ download::Download, remote::{Client, Method}, - signal::wait_on_cancellation_signal, tasks::AutoAbortJoinHandle, }, manifests::cargo_toml_binstall::{PkgFmt, PkgMeta}, @@ -72,11 +71,7 @@ impl super::Fetcher for QuickInstall { let url = self.package_url(); debug!("Downloading package from: '{url}'"); Ok(Download::new(self.client.clone(), Url::parse(&url)?) - .and_extract( - self.pkg_fmt(), - dst, - Some(Box::pin(wait_on_cancellation_signal())), - ) + .and_extract(self.pkg_fmt(), dst) .await?) } diff --git a/crates/binstalk/src/helpers/signal.rs b/crates/binstalk/src/helpers/signal.rs index 5b6b7465..36a25f6e 100644 --- a/crates/binstalk/src/helpers/signal.rs +++ b/crates/binstalk/src/helpers/signal.rs @@ -3,7 +3,7 @@ use std::{future::pending, io}; use super::tasks::AutoAbortJoinHandle; use crate::errors::BinstallError; -use tokio::{signal, sync::OnceCell}; +use tokio::signal; /// This function will poll the handle while listening for ctrl_c, /// `SIGINT`, `SIGHUP`, `SIGTERM` and `SIGQUIT`. @@ -30,7 +30,7 @@ pub async fn cancel_on_user_sig_term( } } -pub fn ignore_signals() -> io::Result<()> { +fn ignore_signals() -> io::Result<()> { #[cfg(unix)] unix::ignore_signals_on_unix()?; @@ -39,16 +39,7 @@ pub fn ignore_signals() -> io::Result<()> { /// If call to it returns `Ok(())`, then all calls to this function after /// that also returns `Ok(())`. -pub async fn wait_on_cancellation_signal() -> Result<(), io::Error> { - static CANCELLED: OnceCell<()> = OnceCell::const_new(); - - CANCELLED - .get_or_try_init(wait_on_cancellation_signal_inner) - .await - .copied() -} - -async fn wait_on_cancellation_signal_inner() -> Result<(), io::Error> { +async fn wait_on_cancellation_signal() -> Result<(), io::Error> { #[cfg(unix)] async fn inner() -> Result<(), io::Error> { unix::wait_on_cancellation_signal_unix().await @@ -56,16 +47,10 @@ async fn wait_on_cancellation_signal_inner() -> Result<(), io::Error> { #[cfg(not(unix))] async fn inner() -> Result<(), io::Error> { - // Use pending here so that tokio::select! would just skip this branch. - pending().await + signal::ctrl_c().await } - tokio::select! { - biased; - - res = signal::ctrl_c() => res, - res = inner() => res, - } + inner().await } #[cfg(unix)] diff --git a/crates/binstalk/src/ops/resolve.rs b/crates/binstalk/src/ops/resolve.rs index b5153bf3..306a562a 100644 --- a/crates/binstalk/src/ops/resolve.rs +++ b/crates/binstalk/src/ops/resolve.rs @@ -398,7 +398,15 @@ impl PackageInfo { // Fetch crate via crates.io, git, or use a local manifest path let manifest = match opts.manifest_path.as_ref() { Some(manifest_path) => load_manifest_path(manifest_path)?, - None => fetch_crate_cratesio(client, crates_io_api_client, &name, &version_req).await?, + None => { + Box::pin(fetch_crate_cratesio( + client, + crates_io_api_client, + &name, + &version_req, + )) + .await? + } }; let Some(mut package) = manifest.package else {