diff --git a/Cargo.lock b/Cargo.lock index d970cdfb..306b54da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -275,7 +275,6 @@ dependencies = [ "bytes", "bzip2", "compact_str", - "digest", "flate2", "futures-lite", "futures-util", diff --git a/crates/binstalk-downloader/Cargo.toml b/crates/binstalk-downloader/Cargo.toml index 25eb22e4..9fe0372d 100644 --- a/crates/binstalk-downloader/Cargo.toml +++ b/crates/binstalk-downloader/Cargo.toml @@ -17,7 +17,6 @@ binstalk-types = { version = "0.5.0", path = "../binstalk-types" } bytes = "1.4.0" bzip2 = "0.4.4" compact_str = "0.7.0" -digest = "0.10.7" flate2 = { version = "1.0.26", default-features = false } futures-lite = { version = "1.13.0", default-features = false } futures-util = "0.3.28" diff --git a/crates/binstalk-downloader/src/download.rs b/crates/binstalk-downloader/src/download.rs index ae3138d2..e70095b9 100644 --- a/crates/binstalk-downloader/src/download.rs +++ b/crates/binstalk-downloader/src/download.rs @@ -1,8 +1,8 @@ -use std::{fmt::Debug, io, marker::PhantomData, path::Path}; +use std::{fmt, io, marker::PhantomData, path::Path}; use binstalk_types::cargo_toml_binstall::PkgFmtDecomposed; -use digest::{Digest, FixedOutput, HashMarker, Output, OutputSizeUser, Update}; -use futures_lite::stream::StreamExt; +use bytes::Bytes; +use futures_lite::stream::{Stream, StreamExt}; use thiserror::Error as ThisError; use tracing::{debug, instrument}; @@ -70,24 +70,95 @@ impl From for io::Error { } } -#[derive(Debug)] -pub struct Download { - client: Client, - url: Url, - _digest: PhantomData, - _checksum: Vec, +pub trait DataVerifier: Send + Sync { + /// Digest input data. + /// + /// This method can be called repeatedly for use with streaming messages, + /// it will be called in the order of the message received. + fn update(&mut self, data: &Bytes); } -impl Download { +impl DataVerifier for T +where + T: FnMut(&Bytes) + Send + Sync, +{ + fn update(&mut self, data: &Bytes) { + (*self)(data) + } +} + +pub struct Download<'a> { + client: Client, + url: Url, + data_verifier: Option<&'a mut dyn DataVerifier>, +} + +impl fmt::Debug for Download<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + #[allow(dead_code, clippy::type_complexity)] + #[derive(Debug)] + struct Download<'a> { + client: &'a Client, + url: &'a Url, + data_verifier: Option>, + } + + fmt::Debug::fmt( + &Download { + client: &self.client, + url: &self.url, + data_verifier: self.data_verifier.as_ref().map(|_| PhantomData), + }, + f, + ) + } +} + +impl Download<'static> { pub fn new(client: Client, url: Url) -> Self { Self { client, url, - _digest: PhantomData, - _checksum: Vec::new(), + data_verifier: None, } } +} +impl<'a> Download<'a> { + pub fn new_with_data_verifier( + client: Client, + url: Url, + data_verifier: &'a mut dyn DataVerifier, + ) -> Self { + Self { + client, + url, + data_verifier: Some(data_verifier), + } + } +} + +impl<'a> Download<'a> { + async fn get_stream( + self, + ) -> Result< + impl Stream> + Send + Sync + Unpin + 'a, + DownloadError, + > { + let mut data_verifier = self.data_verifier; + Ok(self.client.get_stream(self.url).await?.map(move |res| { + let bytes = res?; + + if let Some(data_verifier) = &mut data_verifier { + data_verifier.update(&bytes); + } + + Ok(bytes) + })) + } +} + +impl Download<'_> { /// Download a file from the provided URL and process them in memory. /// /// This does not support verifying a checksum due to the partial extraction @@ -101,11 +172,7 @@ impl Download { fmt: TarBasedFmt, visitor: &mut dyn TarEntriesVisitor, ) -> Result<(), DownloadError> { - let stream = self - .client - .get_stream(self.url) - .await? - .map(|res| res.map_err(DownloadError::from)); + let stream = self.get_stream().await?; debug!("Downloading and extracting then in-memory processing"); @@ -126,15 +193,11 @@ impl Download { path: impl AsRef, ) -> Result { async fn inner( - this: Download, + this: Download<'_>, fmt: PkgFmt, path: &Path, ) -> Result { - let stream = this - .client - .get_stream(this.url) - .await? - .map(|res| res.map_err(DownloadError::from)); + let stream = this.get_stream().await?; debug!("Downloading and extracting to: '{}'", path.display()); @@ -153,36 +216,6 @@ impl Download { } } -impl Download { - pub fn new_with_checksum(client: Client, url: Url, checksum: Vec) -> Self { - Self { - client, - url, - _digest: PhantomData, - _checksum: checksum, - } - } - - // TODO: implement checking the sum, may involve bringing (parts of) and_extract() back in here -} - -#[derive(Clone, Copy, Debug, Default)] -pub struct NoDigest; - -impl FixedOutput for NoDigest { - fn finalize_into(self, _out: &mut Output) {} -} - -impl OutputSizeUser for NoDigest { - type OutputSize = generic_array::typenum::U0; -} - -impl Update for NoDigest { - fn update(&mut self, _data: &[u8]) {} -} - -impl HashMarker for NoDigest {} - #[cfg(test)] mod test { use super::*; diff --git a/crates/binstalk-downloader/src/download/async_extracter.rs b/crates/binstalk-downloader/src/download/async_extracter.rs index 09a42837..e7a95c63 100644 --- a/crates/binstalk-downloader/src/download/async_extracter.rs +++ b/crates/binstalk-downloader/src/download/async_extracter.rs @@ -21,7 +21,7 @@ use crate::utils::{extract_with_blocking_task, StreamReadable}; pub async fn extract_bin(stream: S, path: &Path) -> Result where - S: Stream> + Send + Sync + Unpin + 'static, + S: Stream> + Send + Sync + Unpin, { debug!("Writing to `{}`", path.display()); @@ -45,7 +45,7 @@ where pub async fn extract_zip(stream: S, path: &Path) -> Result where - S: Stream> + Unpin + Send + Sync + 'static, + S: Stream> + Unpin + Send + Sync, { debug!("Decompressing from zip archive to `{}`", path.display()); @@ -79,7 +79,7 @@ pub async fn extract_tar_based_stream( fmt: TarBasedFmt, ) -> Result where - S: Stream> + Send + Sync + Unpin + 'static, + S: Stream> + Send + Sync + Unpin, { debug!("Extracting from {fmt} archive to {}", dst.display()); @@ -162,7 +162,7 @@ fn extract_with_blocking_decoder( f: F, ) -> impl Future> where - S: Stream> + Send + Sync + Unpin + 'static, + S: Stream> + Send + Sync + Unpin, F: FnOnce(mpsc::Receiver, &Path) -> io::Result + Send + Sync + 'static, T: Send + 'static, { diff --git a/crates/binstalk-downloader/src/utils.rs b/crates/binstalk-downloader/src/utils.rs index 2b3c833b..2a47d858 100644 --- a/crates/binstalk-downloader/src/utils.rs +++ b/crates/binstalk-downloader/src/utils.rs @@ -18,7 +18,7 @@ where T: Send + 'static, E: From, E: From, - S: Stream> + Send + Sync + Unpin + 'static, + S: Stream> + Send + Sync + Unpin, F: FnOnce(mpsc::Receiver) -> io::Result + Send + Sync + 'static, { async fn inner( @@ -31,7 +31,7 @@ where E: From, // We do not use trait object for S since there will only be one // S used with this function. - S: Stream> + Send + Sync + Unpin + 'static, + S: Stream> + Send + Sync + Unpin, // asyncify would always return the same future, so no need to // use trait object here. Fut: Future> + Send + Sync,