Refactor: Ret impl Stream in create_request

Since both `download*` function takes a `impl Stream` and the
`Response::bytes_stream` takes `Response` by value, thus there is no
lifetime issue and we can return `impl Stream` instead of `Response`

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2022-06-12 20:01:43 +10:00
parent baf9784b82
commit 225cf74cd9
No known key found for this signature in database
GPG key ID: 591C0B03040416D6

View file

@ -1,7 +1,9 @@
use std::fmt::Debug; use std::fmt::Debug;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use bytes::Bytes;
use cargo_toml::Manifest; use cargo_toml::Manifest;
use futures_util::stream::Stream;
use log::debug; use log::debug;
use reqwest::{Method, Response}; use reqwest::{Method, Response};
use serde::Serialize; use serde::Serialize;
@ -49,7 +51,9 @@ pub async fn remote_exists(url: Url, method: Method) -> Result<bool, BinstallErr
Ok(req.status().is_success()) Ok(req.status().is_success())
} }
async fn create_request(url: Url) -> Result<Response, BinstallError> { async fn create_request(
url: Url,
) -> Result<impl Stream<Item = reqwest::Result<Bytes>>, BinstallError> {
reqwest::get(url.clone()) reqwest::get(url.clone())
.await .await
.and_then(|r| r.error_for_status()) .and_then(|r| r.error_for_status())
@ -58,6 +62,7 @@ async fn create_request(url: Url) -> Result<Response, BinstallError> {
url, url,
err, err,
}) })
.map(Response::bytes_stream)
} }
/// Download a file from the provided URL and extract it to the provided path. /// Download a file from the provided URL and extract it to the provided path.
@ -68,13 +73,11 @@ pub async fn download_and_extract<P: AsRef<Path>>(
) -> Result<(), BinstallError> { ) -> Result<(), BinstallError> {
debug!("Downloading from: '{url}'"); debug!("Downloading from: '{url}'");
let resp = create_request(url).await?; let stream = create_request(url).await?;
let path = path.as_ref(); let path = path.as_ref();
debug!("Downloading and extracting to: '{}'", path.display()); debug!("Downloading and extracting to: '{}'", path.display());
let stream = resp.bytes_stream();
match fmt.decompose() { match fmt.decompose() {
PkgFmtDecomposed::Tar(fmt) => extract_tar_based_stream(stream, path, fmt).await?, PkgFmtDecomposed::Tar(fmt) => extract_tar_based_stream(stream, path, fmt).await?,
PkgFmtDecomposed::Bin => extract_bin(stream, path).await?, PkgFmtDecomposed::Bin => extract_bin(stream, path).await?,
@ -98,12 +101,10 @@ pub async fn download_tar_based_and_visit<V: TarEntriesVisitor + Debug + Send +
) -> Result<V, BinstallError> { ) -> Result<V, BinstallError> {
debug!("Downloading from: '{url}'"); debug!("Downloading from: '{url}'");
let resp = create_request(url).await?; let stream = create_request(url).await?;
debug!("Downloading and extracting then in-memory processing"); debug!("Downloading and extracting then in-memory processing");
let stream = resp.bytes_stream();
let visitor = extract_tar_based_stream_and_visit(stream, fmt, visitor).await?; let visitor = extract_tar_based_stream_and_visit(stream, fmt, visitor).await?;
debug!("Download, extraction and in-memory procession OK"); debug!("Download, extraction and in-memory procession OK");