Refactor: Extract new crate binstalk-{signal, downloader} (#518)

* Refactor: Extract new crate binstalk-downloader
* Re-export `PkgFmt` from `binstalk_manifests`
* Update release-pr.yml
* Update dependabot

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2022-11-11 15:02:54 +11:00 committed by GitHub
parent 3841762a5b
commit 89fa5b1769
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 456 additions and 260 deletions

View file

@ -0,0 +1,125 @@
use std::{
fmt::Debug,
fs,
io::{Read, Seek},
path::Path,
};
use bytes::Bytes;
use futures_util::stream::Stream;
use log::debug;
use scopeguard::{guard, ScopeGuard};
use tar::Entries;
use tempfile::tempfile;
use tokio::task::block_in_place;
use super::{
extracter::*, stream_readable::StreamReadable, CancellationFuture, DownloadError, TarBasedFmt,
};
/// Write the downloaded stream to `path` as a single binary file.
///
/// Parent directories of `path` are created if missing.  If any write
/// fails (or the download is aborted), the partially written file is
/// removed before the error is returned.
pub async fn extract_bin<S, E>(
    stream: S,
    path: &Path,
    cancellation_future: CancellationFuture,
) -> Result<(), DownloadError>
where
    S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
    DownloadError: From<E>,
{
    let mut readable = StreamReadable::new(stream, cancellation_future).await;
    block_in_place(move || {
        fs::create_dir_all(path.parent().unwrap())?;

        let mut dest = fs::File::create(path)?;

        // Arm a guard that deletes the (possibly partial) file; it is
        // only disarmed below once every write has succeeded.
        let cleanup = guard(&path, |p| {
            fs::remove_file(p).ok();
        });

        readable.copy(&mut dest)?;

        // All writes succeeded and the operation was not aborted:
        // disarm the guard so the file is kept.
        ScopeGuard::into_inner(cleanup);

        Ok(())
    })
}
/// Download the stream into a temporary file, then unpack that zip
/// archive into `path`.
///
/// The archive is spooled to a `tempfile()` first because unzipping
/// needs a seekable source (see the `rewind` below).
pub async fn extract_zip<S, E>(
    stream: S,
    path: &Path,
    cancellation_future: CancellationFuture,
) -> Result<(), DownloadError>
where
    S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
    DownloadError: From<E>,
{
    let mut readable = StreamReadable::new(stream, cancellation_future).await;
    block_in_place(move || {
        fs::create_dir_all(path.parent().unwrap())?;

        let mut spool = tempfile()?;
        readable.copy(&mut spool)?;

        // Seek back to the start so that `unzip` reads the whole archive.
        spool.rewind()?;

        unzip(spool, path)
    })
}
/// Unpack a tar-based archive from the stream directly into `path`,
/// with `fmt` selecting the compression (plain tar, gz, bz2, xz, zstd).
pub async fn extract_tar_based_stream<S, E>(
    stream: S,
    path: &Path,
    fmt: TarBasedFmt,
    cancellation_future: CancellationFuture,
) -> Result<(), DownloadError>
where
    S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
    DownloadError: From<E>,
{
    let readable = StreamReadable::new(stream, cancellation_future).await;
    block_in_place(move || {
        fs::create_dir_all(path.parent().unwrap())?;

        debug!("Extracting from {fmt} archive to {path:#?}");

        create_tar_decoder(readable, fmt)?.unpack(path)?;

        Ok(())
    })
}
/// Visitor for processing tar entries in memory, without unpacking to disk.
///
/// Visitor must iterate over all entries.
/// Entries can be in arbitrary order.
pub trait TarEntriesVisitor {
    /// Value produced once all entries have been visited.
    type Target;

    /// Process the entries of one archive; must iterate over all of them.
    fn visit<R: Read>(&mut self, entries: Entries<'_, R>) -> Result<(), DownloadError>;

    /// Consume the visitor and produce the final result.
    fn finish(self) -> Result<Self::Target, DownloadError>;
}
/// Stream a tar-based archive and hand its entries to `visitor` for
/// in-memory processing instead of unpacking them to disk.
///
/// Returns whatever the visitor's `finish` produces.
pub async fn extract_tar_based_stream_and_visit<S, V, E>(
    stream: S,
    fmt: TarBasedFmt,
    mut visitor: V,
    cancellation_future: CancellationFuture,
) -> Result<V::Target, DownloadError>
where
    S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
    V: TarEntriesVisitor + Debug + Send + 'static,
    DownloadError: From<E>,
{
    let readable = StreamReadable::new(stream, cancellation_future).await;
    block_in_place(move || {
        debug!("Extracting from {fmt} archive to process it in memory");

        let mut archive = create_tar_decoder(readable, fmt)?;
        visitor.visit(archive.entries()?)?;
        visitor.finish()
    })
}

View file

@ -0,0 +1,46 @@
use std::{
fs::File,
io::{self, BufRead, Read},
path::Path,
};
use bzip2::bufread::BzDecoder;
use flate2::bufread::GzDecoder;
use log::debug;
use tar::Archive;
use xz2::bufread::XzDecoder;
use zip::read::ZipArchive;
use zstd::stream::Decoder as ZstdDecoder;
use super::{DownloadError, TarBasedFmt};
/// Wrap `dat` in the decompressor selected by `fmt` and return a tar
/// `Archive` reading the decompressed bytes.
pub fn create_tar_decoder(
    dat: impl BufRead + 'static,
    fmt: TarBasedFmt,
) -> io::Result<Archive<Box<dyn Read>>> {
    use TarBasedFmt::*;

    let decompressed: Box<dyn Read> = match fmt {
        Tar => Box::new(dat),
        Tbz2 => Box::new(BzDecoder::new(dat)),
        Tgz => Box::new(GzDecoder::new(dat)),
        Txz => Box::new(XzDecoder::new(dat)),
        // As of zstd 0.10.2 and 0.11.2 the only possible error comes from
        // raw::Decoder::with_dictionary, which is given `&[]` by the
        // constructor, so this `?` should never actually fire.
        Tzstd => Box::new(ZstdDecoder::with_buffer(dat)?),
    };

    Ok(Archive::new(decompressed))
}
/// Extract the zip archive stored in `dat` into the directory `dst`.
pub fn unzip(dat: File, dst: &Path) -> Result<(), DownloadError> {
    debug!("Decompressing from zip archive to `{dst:?}`");

    ZipArchive::new(dat)?.extract(dst)?;

    Ok(())
}

View file

@ -0,0 +1,146 @@
use std::{
cmp::min,
io::{self, BufRead, Read, Write},
};
use bytes::{Buf, Bytes};
use futures_util::stream::{Stream, StreamExt};
use tokio::runtime::Handle;
use super::{CancellationFuture, DownloadError};
/// This wraps an AsyncIterator as a `Read`able.
/// It must be used in non-async context only,
/// meaning you have to use it with
/// `tokio::task::{block_in_place, spawn_blocking}` or
/// `std::thread::spawn`.
pub struct StreamReadable<S> {
    // Source of the data being read.
    stream: S,
    // Runtime handle used to block on the stream from sync code.
    handle: Handle,
    // Chunk most recently pulled from `stream`; acts as the read buffer.
    bytes: Bytes,
    // When present, polled alongside the stream so a pending read can be
    // aborted; `None` disables cancellation.
    cancellation_future: CancellationFuture,
}
impl<S> StreamReadable<S> {
    /// Create a readable over `stream`.
    ///
    /// Captures the current tokio runtime handle, so this must be
    /// called from within a runtime.
    pub(super) async fn new(stream: S, cancellation_future: CancellationFuture) -> Self {
        let handle = Handle::current();

        Self {
            stream,
            handle,
            cancellation_future,
            // Start with an empty buffer; the first read will fill it.
            bytes: Bytes::new(),
        }
    }
}
impl<S, E> StreamReadable<S>
where
    S: Stream<Item = Result<Bytes, E>> + Unpin,
    DownloadError: From<E>,
{
    /// Copies from `self` to `writer`.
    ///
    /// Same as `io::copy` but does not allocate any internal buffer
    /// since `self` is buffered.
    pub(super) fn copy<W>(&mut self, mut writer: W) -> io::Result<()>
    where
        W: Write,
    {
        self.copy_inner(&mut writer)
    }

    // Non-generic inner loop working on `dyn Write`.
    fn copy_inner(&mut self, writer: &mut dyn Write) -> io::Result<()> {
        loop {
            let chunk = self.fill_buf()?;
            let len = chunk.len();

            if len == 0 {
                // An empty buffer from fill_buf means end of stream.
                return Ok(());
            }

            writer.write_all(chunk)?;
            self.consume(len);
        }
    }
}
impl<S, E> Read for StreamReadable<S>
where
    S: Stream<Item = Result<Bytes, E>> + Unpin,
    DownloadError: From<E>,
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }

        // Ensure the internal buffer holds data; an empty result here
        // means the stream has reached Eof.
        if self.fill_buf()?.is_empty() {
            return Ok(0);
        }

        // `copy_to_slice` requires the source to have at least as many
        // remaining bytes as the destination, so cap the count first.
        let n = buf.len().min(self.bytes.remaining());
        self.bytes.copy_to_slice(&mut buf[..n]);

        Ok(n)
    }
}
/// Pull the next non-empty chunk from `stream`.
///
/// If `Ok(Some(bytes))` is returned, then `bytes.is_empty() == false`;
/// `Ok(None)` signals end of stream.
async fn next_stream<S, E>(stream: &mut S) -> io::Result<Option<Bytes>>
where
    S: Stream<Item = Result<Bytes, E>> + Unpin,
    DownloadError: From<E>,
{
    loop {
        match stream.next().await.transpose().map_err(DownloadError::from)? {
            // Skip zero-length chunks so that callers can treat an
            // empty buffer as Eof.
            Some(bytes) if bytes.is_empty() => continue,
            other => return Ok(other),
        }
    }
}
impl<S, E> BufRead for StreamReadable<S>
where
    S: Stream<Item = Result<Bytes, E>> + Unpin,
    DownloadError: From<E>,
{
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        let bytes = &mut self.bytes;
        // Only pull from the stream once the current chunk is fully consumed.
        if !bytes.has_remaining() {
            // Block the current (non-async) thread on the runtime to fetch
            // the next chunk; per the struct-level docs this must only run
            // under block_in_place/spawn_blocking or a dedicated thread.
            let option = self.handle.block_on(async {
                if let Some(cancellation_future) = self.cancellation_future.as_mut() {
                    // Race the next chunk against cancellation.
                    tokio::select! {
                        res = next_stream(&mut self.stream) => res,
                        res = cancellation_future => {
                            // Cancelled: surface the future's own error, or a
                            // generic UserAbort if it completed without one.
                            Err(res.err().unwrap_or_else(|| io::Error::from(DownloadError::UserAbort)))
                        },
                    }
                } else {
                    // No cancellation configured; just await the next chunk.
                    next_stream(&mut self.stream).await
                }
            })?;
            // On `None` (end of stream) the buffer stays empty, which the
            // callers treat as Eof.
            if let Some(new_bytes) = option {
                // new_bytes are guaranteed to be non-empty.
                *bytes = new_bytes;
            }
        }
        Ok(&*bytes)
    }

    fn consume(&mut self, amt: usize) {
        self.bytes.advance(amt);
    }
}