Rename lib to binstalk (#361)

Félix Saparelli 2022-09-10 18:44:18 +12:00 committed by GitHub
parent a94d83f0d5
commit e25aa50ec9
49 changed files with 25 additions and 25 deletions

@@ -0,0 +1,113 @@
use std::{fmt::Debug, marker::PhantomData, path::Path};
use digest::{Digest, FixedOutput, HashMarker, Output, OutputSizeUser, Update};
use log::debug;
use reqwest::{Client, Url};
use crate::{
errors::BinstallError,
helpers::remote::create_request,
manifests::cargo_toml_binstall::{PkgFmt, PkgFmtDecomposed, TarBasedFmt},
};
pub use async_extracter::TarEntriesVisitor;
use async_extracter::*;
mod async_extracter;
mod extracter;
mod stream_readable;
#[derive(Debug)]
pub struct Download<D: Digest = NoDigest> {
client: Client,
url: Url,
_digest: PhantomData<D>,
_checksum: Vec<u8>,
}
impl Download {
pub fn new(client: Client, url: Url) -> Self {
Self {
client,
url,
_digest: PhantomData::default(),
_checksum: Vec::new(),
}
}
/// Download a tar-based archive from the provided URL and process
/// its entries in memory with the provided visitor, without
/// extracting them to disk.
///
/// This does not support verifying a checksum since the archive is
/// only streamed, and any checksum provided via `new_with_checksum`
/// will be ignored.
pub async fn and_visit_tar<V: TarEntriesVisitor + Debug + Send + 'static>(
self,
fmt: TarBasedFmt,
visitor: V,
) -> Result<V::Target, BinstallError> {
let stream = create_request(self.client, self.url).await?;
debug!("Downloading and extracting then in-memory processing");
let ret = extract_tar_based_stream_and_visit(stream, fmt, visitor).await?;
debug!("Download, extraction and in-memory procession OK");
Ok(ret)
}
/// Download a file from the provided URL and extract it to the provided path.
pub async fn and_extract(
self,
fmt: PkgFmt,
path: impl AsRef<Path>,
) -> Result<(), BinstallError> {
let stream = create_request(self.client, self.url).await?;
let path = path.as_ref();
debug!("Downloading and extracting to: '{}'", path.display());
match fmt.decompose() {
PkgFmtDecomposed::Tar(fmt) => extract_tar_based_stream(stream, path, fmt).await?,
PkgFmtDecomposed::Bin => extract_bin(stream, path).await?,
PkgFmtDecomposed::Zip => extract_zip(stream, path).await?,
}
debug!("Download OK, extracted to: '{}'", path.display());
Ok(())
}
}
impl<D: Digest> Download<D> {
pub fn new_with_checksum(client: Client, url: Url, checksum: Vec<u8>) -> Self {
Self {
client,
url,
_digest: PhantomData::default(),
_checksum: checksum,
}
}
// TODO: implement checking the sum, may involve bringing (parts of) and_extract() back in here
}
#[derive(Clone, Copy, Debug, Default)]
pub struct NoDigest;
impl FixedOutput for NoDigest {
fn finalize_into(self, _out: &mut Output<Self>) {}
}
impl OutputSizeUser for NoDigest {
type OutputSize = generic_array::typenum::U0;
}
impl Update for NoDigest {
fn update(&mut self, _data: &[u8]) {}
}
impl HashMarker for NoDigest {}
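An editorial aside, not part of this commit: a minimal sketch of how a caller might drive `Download`, assuming a tokio runtime, a `Client` built via `create_reqwest_client`, and that `PkgFmt::Tgz` is one of the supported archive formats; the function name and destination path are purely illustrative.

async fn fetch_and_unpack(client: Client, url: Url) -> Result<(), BinstallError> {
    // Hypothetical usage sketch: streams the archive from `url` and unpacks
    // it under the illustrative destination path below.
    Download::new(client, url)
        .and_extract(PkgFmt::Tgz, "/tmp/extracted")
        .await
}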

@@ -0,0 +1,114 @@
use std::{
fmt::Debug,
fs,
io::{copy, Read, Seek},
path::Path,
};
use bytes::Bytes;
use futures_util::stream::Stream;
use log::debug;
use scopeguard::{guard, ScopeGuard};
use tar::Entries;
use tempfile::tempfile;
use tokio::task::block_in_place;
use super::{extracter::*, stream_readable::StreamReadable};
use crate::{errors::BinstallError, manifests::cargo_toml_binstall::TarBasedFmt};
pub async fn extract_bin<S, E>(stream: S, path: &Path) -> Result<(), BinstallError>
where
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
BinstallError: From<E>,
{
let mut reader = StreamReadable::new(stream).await;
block_in_place(move || {
fs::create_dir_all(path.parent().unwrap())?;
let mut file = fs::File::create(path)?;
// Remove the file on failure: the guard is disarmed below only
// once the copy completes and every write has succeeded.
let remove_guard = guard(&path, |path| {
fs::remove_file(path).ok();
});
copy(&mut reader, &mut file)?;
// Operation isn't aborted and all writes succeed,
// disarm the remove_guard.
ScopeGuard::into_inner(remove_guard);
Ok(())
})
}
pub async fn extract_zip<S, E>(stream: S, path: &Path) -> Result<(), BinstallError>
where
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
BinstallError: From<E>,
{
let mut reader = StreamReadable::new(stream).await;
block_in_place(move || {
fs::create_dir_all(path.parent().unwrap())?;
let mut file = tempfile()?;
copy(&mut reader, &mut file)?;
// rewind it so that we can pass it to unzip
file.rewind()?;
unzip(file, path)
})
}
pub async fn extract_tar_based_stream<S, E>(
stream: S,
path: &Path,
fmt: TarBasedFmt,
) -> Result<(), BinstallError>
where
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
BinstallError: From<E>,
{
let reader = StreamReadable::new(stream).await;
block_in_place(move || {
fs::create_dir_all(path.parent().unwrap())?;
debug!("Extracting from {fmt} archive to {path:#?}");
create_tar_decoder(reader, fmt)?.unpack(path)?;
Ok(())
})
}
/// The visitor must iterate over all entries.
/// Entries can be in arbitrary order.
pub trait TarEntriesVisitor {
type Target;
fn visit<R: Read>(&mut self, entries: Entries<'_, R>) -> Result<(), BinstallError>;
fn finish(self) -> Result<Self::Target, BinstallError>;
}
pub async fn extract_tar_based_stream_and_visit<S, V, E>(
stream: S,
fmt: TarBasedFmt,
mut visitor: V,
) -> Result<V::Target, BinstallError>
where
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
V: TarEntriesVisitor + Debug + Send + 'static,
BinstallError: From<E>,
{
let reader = StreamReadable::new(stream).await;
block_in_place(move || {
debug!("Extracting from {fmt} archive to process it in memory");
let mut tar = create_tar_decoder(reader, fmt)?;
visitor.visit(tar.entries()?)?;
visitor.finish()
})
}
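An editorial aside, not part of this commit: a sketch of what a `TarEntriesVisitor` implementation can look like, here a hypothetical visitor that records every path in the archive. It relies, as the surrounding code does, on `BinstallError` converting from `std::io::Error`.

#[derive(Debug, Default)]
struct CollectPaths(Vec<std::path::PathBuf>);

impl TarEntriesVisitor for CollectPaths {
    type Target = Vec<std::path::PathBuf>;

    fn visit<R: Read>(&mut self, entries: Entries<'_, R>) -> Result<(), BinstallError> {
        // Visit every entry, in whatever order the archive yields them.
        for entry in entries {
            let entry = entry?;
            self.0.push(entry.path()?.into_owned());
        }
        Ok(())
    }

    fn finish(self) -> Result<Self::Target, BinstallError> {
        Ok(self.0)
    }
}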

@@ -0,0 +1,46 @@
use std::{
fs::File,
io::{self, BufRead, Read},
path::Path,
};
use bzip2::bufread::BzDecoder;
use flate2::bufread::GzDecoder;
use log::debug;
use tar::Archive;
use xz2::bufread::XzDecoder;
use zip::read::ZipArchive;
use zstd::stream::Decoder as ZstdDecoder;
use crate::{errors::BinstallError, manifests::cargo_toml_binstall::TarBasedFmt};
pub fn create_tar_decoder(
dat: impl BufRead + 'static,
fmt: TarBasedFmt,
) -> io::Result<Archive<Box<dyn Read>>> {
use TarBasedFmt::*;
let r: Box<dyn Read> = match fmt {
Tar => Box::new(dat),
Tbz2 => Box::new(BzDecoder::new(dat)),
Tgz => Box::new(GzDecoder::new(dat)),
Txz => Box::new(XzDecoder::new(dat)),
Tzstd => {
// As of zstd 0.10.2 and 0.11.2, the only possible error comes from
// raw::Decoder::with_dictionary, and the dictionary is specified as `&[]`
// by `ZstdDecoder::with_buffer`, so this should not actually return an error.
Box::new(ZstdDecoder::with_buffer(dat)?)
}
};
Ok(Archive::new(r))
}
pub fn unzip(dat: File, dst: &Path) -> Result<(), BinstallError> {
debug!("Decompressing from zip archive to `{dst:?}`");
let mut zip = ZipArchive::new(dat)?;
zip.extract(dst)?;
Ok(())
}

@@ -0,0 +1,83 @@
use std::{
cmp::min,
io::{self, BufRead, Read},
};
use bytes::{Buf, Bytes};
use futures_util::stream::{Stream, StreamExt};
use tokio::runtime::Handle;
use crate::errors::BinstallError;
/// This wraps a [`Stream`] (an async iterator) so it can be used as a
/// blocking `Read`er.
/// It must only be used in a non-async context, i.e. inside
/// `tokio::task::{block_in_place, spawn_blocking}` or
/// `std::thread::spawn`.
#[derive(Debug)]
pub struct StreamReadable<S> {
stream: S,
handle: Handle,
bytes: Bytes,
}
impl<S> StreamReadable<S> {
pub(super) async fn new(stream: S) -> Self {
Self {
stream,
handle: Handle::current(),
bytes: Bytes::new(),
}
}
}
impl<S, E> Read for StreamReadable<S>
where
S: Stream<Item = Result<Bytes, E>> + Unpin,
BinstallError: From<E>,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}
if self.fill_buf()?.is_empty() {
return Ok(0);
}
let bytes = &mut self.bytes;
// copy_to_slice requires the bytes to have enough remaining bytes
// to fill buf.
let n = min(buf.len(), bytes.remaining());
bytes.copy_to_slice(&mut buf[..n]);
Ok(n)
}
}
impl<S, E> BufRead for StreamReadable<S>
where
S: Stream<Item = Result<Bytes, E>> + Unpin,
BinstallError: From<E>,
{
fn fill_buf(&mut self) -> io::Result<&[u8]> {
let bytes = &mut self.bytes;
if !bytes.has_remaining() {
match self.handle.block_on(async { self.stream.next().await }) {
Some(Ok(new_bytes)) => *bytes = new_bytes,
Some(Err(e)) => {
let e: BinstallError = e.into();
return Err(io::Error::new(io::ErrorKind::Other, e));
}
None => (),
}
}
Ok(&*bytes)
}
fn consume(&mut self, amt: usize) {
self.bytes.advance(amt);
}
}

@@ -0,0 +1,33 @@
use std::{num::NonZeroUsize, sync::Arc, thread::available_parallelism};
use jobslot::Client;
use tokio::sync::OnceCell;
use crate::errors::BinstallError;
#[derive(Clone)]
pub struct LazyJobserverClient(Arc<OnceCell<Client>>);
impl LazyJobserverClient {
/// This must be called at the start of the program since
/// `Client::from_env` requires that.
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
// Safety:
//
// Client::from_env is unsafe because from_raw_fd is unsafe.
// It doesn't do anything that is actually unsafe, like
// dereferencing a raw pointer.
let opt = unsafe { Client::from_env() };
Self(Arc::new(OnceCell::new_with(opt)))
}
pub async fn get(&self) -> Result<&Client, BinstallError> {
self.0
.get_or_try_init(|| async {
let ncore = available_parallelism().map(NonZeroUsize::get).unwrap_or(1);
Ok(Client::new(ncore)?)
})
.await
}
}
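An editorial aside, not part of this commit: the intended call pattern, sketched here under the assumption of a tokio main function, is to construct `LazyJobserverClient` at the very top of the program and initialise it lazily on first use.

#[tokio::main]
async fn main() -> Result<(), BinstallError> {
    // Must run at program start so that `Client::from_env` can pick up the
    // jobserver fds inherited from a parent cargo/make process, if any.
    let jobserver = LazyJobserverClient::new();

    // Later, possibly from many tasks holding clones of `jobserver`:
    let _client: &Client = jobserver.get().await?;
    Ok(())
}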

@@ -0,0 +1,69 @@
use std::env;
use bytes::Bytes;
use futures_util::stream::Stream;
use log::debug;
use reqwest::{tls, Client, ClientBuilder, Method, Response};
use url::Url;
use crate::errors::BinstallError;
pub fn create_reqwest_client(min_tls: Option<tls::Version>) -> Result<Client, BinstallError> {
const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
let mut builder = ClientBuilder::new()
.user_agent(USER_AGENT)
.https_only(true)
.min_tls_version(tls::Version::TLS_1_2);
if let Some(ver) = min_tls {
builder = builder.min_tls_version(ver);
}
Ok(builder.build()?)
}
pub async fn remote_exists(
client: Client,
url: Url,
method: Method,
) -> Result<bool, BinstallError> {
let req = client
.request(method.clone(), url.clone())
.send()
.await
.map_err(|err| BinstallError::Http { method, url, err })?;
Ok(req.status().is_success())
}
pub async fn get_redirected_final_url(client: &Client, url: Url) -> Result<Url, BinstallError> {
let method = Method::HEAD;
let req = client
.request(method.clone(), url.clone())
.send()
.await
.and_then(Response::error_for_status)
.map_err(|err| BinstallError::Http { method, url, err })?;
Ok(req.url().clone())
}
pub(crate) async fn create_request(
client: Client,
url: Url,
) -> Result<impl Stream<Item = reqwest::Result<Bytes>>, BinstallError> {
debug!("Downloading from: '{url}'");
client
.get(url.clone())
.send()
.await
.and_then(|r| r.error_for_status())
.map_err(|err| BinstallError::Http {
method: Method::GET,
url,
err,
})
.map(Response::bytes_stream)
}
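An editorial aside, not part of this commit: a sketch of how these helpers compose, building a client that requires at least TLS 1.3 and probing whether a URL exists; the function name is illustrative.

async fn probe(url: Url) -> Result<bool, BinstallError> {
    // Raises the minimum TLS version above the 1.2 default set in the builder.
    let client = create_reqwest_client(Some(tls::Version::TLS_1_3))?;
    remote_exists(client, url, Method::HEAD).await
}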

@@ -0,0 +1,82 @@
use std::io;
use futures_util::future::pending;
use tokio::signal;
use super::tasks::AutoAbortJoinHandle;
use crate::errors::BinstallError;
/// This function will poll the handle while listening for ctrl_c,
/// `SIGINT`, `SIGHUP`, `SIGTERM` and `SIGQUIT`.
///
/// When a signal is received, [`BinstallError::UserAbort`] will be returned.
///
/// It also ignores `SIGUSR1` and `SIGUSR2` on unix.
///
/// This function uses [`tokio::signal`] and, once it returns, does not reset
/// the default signal handlers, so be careful when using it.
pub async fn cancel_on_user_sig_term<T>(
handle: AutoAbortJoinHandle<T>,
) -> Result<T, BinstallError> {
#[cfg(unix)]
unix::ignore_signals_on_unix()?;
tokio::select! {
res = handle => res,
res = wait_on_cancellation_signal() => {
res.map_err(BinstallError::Io).and(Err(BinstallError::UserAbort))
}
}
}
async fn wait_on_cancellation_signal() -> Result<(), io::Error> {
#[cfg(unix)]
async fn inner() -> Result<(), io::Error> {
unix::wait_on_cancellation_signal_unix().await
}
#[cfg(not(unix))]
async fn inner() -> Result<(), io::Error> {
// Use pending() here so that tokio::select! will just skip this branch.
pending().await
}
tokio::select! {
res = signal::ctrl_c() => res,
res = inner() => res,
}
}
#[cfg(unix)]
mod unix {
use super::*;
use signal::unix::{signal, SignalKind};
/// Same as [`wait_on_cancellation_signal`] but is only available on unix.
pub async fn wait_on_cancellation_signal_unix() -> Result<(), io::Error> {
tokio::select! {
res = wait_for_signal_unix(SignalKind::interrupt()) => res,
res = wait_for_signal_unix(SignalKind::hangup()) => res,
res = wait_for_signal_unix(SignalKind::terminate()) => res,
res = wait_for_signal_unix(SignalKind::quit()) => res,
}
}
/// Wait for the first arrival of the signal.
pub async fn wait_for_signal_unix(kind: signal::unix::SignalKind) -> Result<(), io::Error> {
let mut sig_listener = signal::unix::signal(kind)?;
if sig_listener.recv().await.is_some() {
Ok(())
} else {
// Use pending() here for the same reason as above.
pending().await
}
}
pub fn ignore_signals_on_unix() -> Result<(), BinstallError> {
drop(signal(SignalKind::user_defined1())?);
drop(signal(SignalKind::user_defined2())?);
Ok(())
}
}
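An editorial aside, not part of this commit: a sketch of wrapping a background task so that Ctrl-C or a termination signal surfaces as `BinstallError::UserAbort` instead of the task's result; the workload shown is illustrative.

async fn run_cancellable() -> Result<u64, BinstallError> {
    // The job runs in the background; we race it against the cancellation signals.
    let handle = AutoAbortJoinHandle::spawn(async {
        // ... some long-running work, illustrative only ...
        42u64
    });
    cancel_on_user_sig_term(handle).await
}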

@@ -0,0 +1,23 @@
use std::{
io::Error,
ops::Deref,
path::{Path, PathBuf},
};
use once_cell::sync::{Lazy, OnceCell};
use url::Url;
pub fn cargo_home() -> Result<&'static Path, Error> {
static CARGO_HOME: OnceCell<PathBuf> = OnceCell::new();
CARGO_HOME
.get_or_try_init(home::cargo_home)
.map(Deref::deref)
}
pub fn cratesio_url() -> &'static Url {
static CRATESIO: Lazy<Url, fn() -> Url> =
Lazy::new(|| Url::parse("https://github.com/rust-lang/crates.io-index").unwrap());
&CRATESIO
}

@@ -0,0 +1,61 @@
use std::{
future::Future,
ops::{Deref, DerefMut},
pin::Pin,
task::{Context, Poll},
};
use tokio::task::JoinHandle;
use crate::errors::BinstallError;
#[derive(Debug)]
pub struct AutoAbortJoinHandle<T>(JoinHandle<T>);
impl<T> AutoAbortJoinHandle<T> {
pub fn new(handle: JoinHandle<T>) -> Self {
Self(handle)
}
}
impl<T> AutoAbortJoinHandle<T>
where
T: Send + 'static,
{
pub fn spawn<F>(future: F) -> Self
where
F: Future<Output = T> + Send + 'static,
{
Self(tokio::spawn(future))
}
}
impl<T> Drop for AutoAbortJoinHandle<T> {
fn drop(&mut self) {
self.0.abort();
}
}
impl<T> Deref for AutoAbortJoinHandle<T> {
type Target = JoinHandle<T>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for AutoAbortJoinHandle<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T> Future for AutoAbortJoinHandle<T> {
type Output = Result<T, BinstallError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut Pin::into_inner(self).0)
.poll(cx)
.map(|res| res.map_err(BinstallError::TaskJoinError))
}
}
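An editorial aside, not part of this commit: the point of the wrapper is that dropping the handle aborts the underlying task, so a background job cannot outlive its owner. A minimal sketch, assuming tokio's `time` feature is enabled:

async fn abort_on_drop_demo() {
    let handle = AutoAbortJoinHandle::spawn(async {
        tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
    });
    // Drop calls JoinHandle::abort, so the hour-long sleep is cancelled here.
    drop(handle);
}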