feat: Support data verification in Download (#1248)

By accepting `&mut dyn DataVerifier` for users to pass any callback that
uses `digest::Digest`/`digest::Mac`, `sigstore` or whatever they want.

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2023-08-06 15:56:02 +10:00 committed by GitHub
parent 3181e16e36
commit e4c776f403
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 92 additions and 61 deletions

1
Cargo.lock generated
View file

@ -275,7 +275,6 @@ dependencies = [
"bytes",
"bzip2",
"compact_str",
"digest",
"flate2",
"futures-lite",
"futures-util",

View file

@ -17,7 +17,6 @@ binstalk-types = { version = "0.5.0", path = "../binstalk-types" }
bytes = "1.4.0"
bzip2 = "0.4.4"
compact_str = "0.7.0"
digest = "0.10.7"
flate2 = { version = "1.0.26", default-features = false }
futures-lite = { version = "1.13.0", default-features = false }
futures-util = "0.3.28"

View file

@ -1,8 +1,8 @@
use std::{fmt::Debug, io, marker::PhantomData, path::Path};
use std::{fmt, io, marker::PhantomData, path::Path};
use binstalk_types::cargo_toml_binstall::PkgFmtDecomposed;
use digest::{Digest, FixedOutput, HashMarker, Output, OutputSizeUser, Update};
use futures_lite::stream::StreamExt;
use bytes::Bytes;
use futures_lite::stream::{Stream, StreamExt};
use thiserror::Error as ThisError;
use tracing::{debug, instrument};
@ -70,24 +70,95 @@ impl From<DownloadError> for io::Error {
}
}
#[derive(Debug)]
pub struct Download<D: Digest = NoDigest> {
client: Client,
url: Url,
_digest: PhantomData<D>,
_checksum: Vec<u8>,
pub trait DataVerifier: Send + Sync {
/// Digest input data.
///
/// This method can be called repeatedly for use with streaming messages,
/// it will be called in the order of the message received.
fn update(&mut self, data: &Bytes);
}
impl Download {
impl<T> DataVerifier for T
where
T: FnMut(&Bytes) + Send + Sync,
{
fn update(&mut self, data: &Bytes) {
(*self)(data)
}
}
pub struct Download<'a> {
client: Client,
url: Url,
data_verifier: Option<&'a mut dyn DataVerifier>,
}
impl fmt::Debug for Download<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
#[allow(dead_code, clippy::type_complexity)]
#[derive(Debug)]
struct Download<'a> {
client: &'a Client,
url: &'a Url,
data_verifier: Option<PhantomData<&'a mut dyn DataVerifier>>,
}
fmt::Debug::fmt(
&Download {
client: &self.client,
url: &self.url,
data_verifier: self.data_verifier.as_ref().map(|_| PhantomData),
},
f,
)
}
}
impl Download<'static> {
pub fn new(client: Client, url: Url) -> Self {
Self {
client,
url,
_digest: PhantomData,
_checksum: Vec::new(),
data_verifier: None,
}
}
}
impl<'a> Download<'a> {
pub fn new_with_data_verifier(
client: Client,
url: Url,
data_verifier: &'a mut dyn DataVerifier,
) -> Self {
Self {
client,
url,
data_verifier: Some(data_verifier),
}
}
}
impl<'a> Download<'a> {
async fn get_stream(
self,
) -> Result<
impl Stream<Item = Result<Bytes, DownloadError>> + Send + Sync + Unpin + 'a,
DownloadError,
> {
let mut data_verifier = self.data_verifier;
Ok(self.client.get_stream(self.url).await?.map(move |res| {
let bytes = res?;
if let Some(data_verifier) = &mut data_verifier {
data_verifier.update(&bytes);
}
Ok(bytes)
}))
}
}
impl Download<'_> {
/// Download a file from the provided URL and process them in memory.
///
/// This does not support verifying a checksum due to the partial extraction
@ -101,11 +172,7 @@ impl Download {
fmt: TarBasedFmt,
visitor: &mut dyn TarEntriesVisitor,
) -> Result<(), DownloadError> {
let stream = self
.client
.get_stream(self.url)
.await?
.map(|res| res.map_err(DownloadError::from));
let stream = self.get_stream().await?;
debug!("Downloading and extracting then in-memory processing");
@ -126,15 +193,11 @@ impl Download {
path: impl AsRef<Path>,
) -> Result<ExtractedFiles, DownloadError> {
async fn inner(
this: Download,
this: Download<'_>,
fmt: PkgFmt,
path: &Path,
) -> Result<ExtractedFiles, DownloadError> {
let stream = this
.client
.get_stream(this.url)
.await?
.map(|res| res.map_err(DownloadError::from));
let stream = this.get_stream().await?;
debug!("Downloading and extracting to: '{}'", path.display());
@ -153,36 +216,6 @@ impl Download {
}
}
impl<D: Digest> Download<D> {
pub fn new_with_checksum(client: Client, url: Url, checksum: Vec<u8>) -> Self {
Self {
client,
url,
_digest: PhantomData,
_checksum: checksum,
}
}
// TODO: implement checking the sum, may involve bringing (parts of) and_extract() back in here
}
#[derive(Clone, Copy, Debug, Default)]
pub struct NoDigest;
impl FixedOutput for NoDigest {
fn finalize_into(self, _out: &mut Output<Self>) {}
}
impl OutputSizeUser for NoDigest {
type OutputSize = generic_array::typenum::U0;
}
impl Update for NoDigest {
fn update(&mut self, _data: &[u8]) {}
}
impl HashMarker for NoDigest {}
#[cfg(test)]
mod test {
use super::*;

View file

@ -21,7 +21,7 @@ use crate::utils::{extract_with_blocking_task, StreamReadable};
pub async fn extract_bin<S>(stream: S, path: &Path) -> Result<ExtractedFiles, DownloadError>
where
S: Stream<Item = Result<Bytes, DownloadError>> + Send + Sync + Unpin + 'static,
S: Stream<Item = Result<Bytes, DownloadError>> + Send + Sync + Unpin,
{
debug!("Writing to `{}`", path.display());
@ -45,7 +45,7 @@ where
pub async fn extract_zip<S>(stream: S, path: &Path) -> Result<ExtractedFiles, DownloadError>
where
S: Stream<Item = Result<Bytes, DownloadError>> + Unpin + Send + Sync + 'static,
S: Stream<Item = Result<Bytes, DownloadError>> + Unpin + Send + Sync,
{
debug!("Decompressing from zip archive to `{}`", path.display());
@ -79,7 +79,7 @@ pub async fn extract_tar_based_stream<S>(
fmt: TarBasedFmt,
) -> Result<ExtractedFiles, DownloadError>
where
S: Stream<Item = Result<Bytes, DownloadError>> + Send + Sync + Unpin + 'static,
S: Stream<Item = Result<Bytes, DownloadError>> + Send + Sync + Unpin,
{
debug!("Extracting from {fmt} archive to {}", dst.display());
@ -162,7 +162,7 @@ fn extract_with_blocking_decoder<S, F, T>(
f: F,
) -> impl Future<Output = Result<T, DownloadError>>
where
S: Stream<Item = Result<Bytes, DownloadError>> + Send + Sync + Unpin + 'static,
S: Stream<Item = Result<Bytes, DownloadError>> + Send + Sync + Unpin,
F: FnOnce(mpsc::Receiver<Bytes>, &Path) -> io::Result<T> + Send + Sync + 'static,
T: Send + 'static,
{

View file

@ -18,7 +18,7 @@ where
T: Send + 'static,
E: From<io::Error>,
E: From<StreamError>,
S: Stream<Item = Result<Bytes, StreamError>> + Send + Sync + Unpin + 'static,
S: Stream<Item = Result<Bytes, StreamError>> + Send + Sync + Unpin,
F: FnOnce(mpsc::Receiver<Bytes>) -> io::Result<T> + Send + Sync + 'static,
{
async fn inner<S, StreamError, Fut, T, E>(
@ -31,7 +31,7 @@ where
E: From<StreamError>,
// We do not use trait object for S since there will only be one
// S used with this function.
S: Stream<Item = Result<Bytes, StreamError>> + Send + Sync + Unpin + 'static,
S: Stream<Item = Result<Bytes, StreamError>> + Send + Sync + Unpin,
// asyncify would always return the same future, so no need to
// use trait object here.
Fut: Future<Output = io::Result<T>> + Send + Sync,