Rm unused items in mod helpers

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2022-06-18 18:10:22 +10:00
parent c15d99c6f0
commit 621a641529
No known key found for this signature in database
GPG key ID: 591C0B03040416D6
3 changed files with 4 additions and 163 deletions

View file

@ -24,8 +24,6 @@ pub use ui_thread::UIThread;
mod extracter;
mod stream_readable;
mod readable_rx;
mod path_ext;
pub use path_ext::*;

View file

@ -16,97 +16,20 @@
use std::fmt::Debug;
use std::fs;
use std::io::{self, copy, Read, Seek, Write};
use std::io::{copy, Read, Seek};
use std::path::Path;
use bytes::Bytes;
use futures_util::stream::{Stream, StreamExt};
use futures_util::stream::Stream;
use log::debug;
use scopeguard::{guard, ScopeGuard};
use tar::Entries;
use tempfile::tempfile;
use tokio::{
sync::mpsc,
task::{block_in_place, spawn_blocking, JoinHandle},
};
use tokio::task::block_in_place;
use super::{extracter::*, readable_rx::*, stream_readable::StreamReadable};
use super::{extracter::*, stream_readable::StreamReadable};
use crate::{BinstallError, TarBasedFmt};
pub(crate) enum Content {
/// Data to write to file
Data(Bytes),
/// Abort the writing and remove the file.
Abort,
}
/// AsyncExtracter will pass the `Bytes` you give to another thread via
/// a `mpsc` and decompress and unpack it if needed.
///
/// After all write is done, you must call `AsyncExtracter::done`,
/// otherwise the extracted content will be removed on drop.
#[derive(Debug)]
struct AsyncExtracterInner<T> {
/// Use AutoAbortJoinHandle so that the task
/// will be cancelled on failure.
handle: JoinHandle<Result<T, BinstallError>>,
tx: mpsc::Sender<Content>,
}
impl<T: Debug + Send + 'static> AsyncExtracterInner<T> {
fn new<F: FnOnce(mpsc::Receiver<Content>) -> Result<T, BinstallError> + Send + 'static>(
f: F,
) -> Self {
let (tx, rx) = mpsc::channel::<Content>(100);
let handle = spawn_blocking(move || f(rx));
Self { handle, tx }
}
/// Upon error, this extracter shall not be reused.
/// Otherwise, `Self::done` would panic.
async fn feed(&mut self, bytes: Bytes) -> Result<(), BinstallError> {
if self.tx.send(Content::Data(bytes)).await.is_err() {
// task failed
Err(Self::wait(&mut self.handle).await.expect_err(
"Implementation bug: write task finished successfully before all writes are done",
))
} else {
Ok(())
}
}
async fn done(mut self) -> Result<T, BinstallError> {
// Drop tx as soon as possible so that the task would wrap up what it
// was doing and flush out all the pending data.
drop(self.tx);
Self::wait(&mut self.handle).await
}
async fn wait(handle: &mut JoinHandle<Result<T, BinstallError>>) -> Result<T, BinstallError> {
match handle.await {
Ok(res) => res,
Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err).into()),
}
}
fn abort(self) {
let tx = self.tx;
// If Self::write fail, then the task is already tear down,
// tx closed and no need to abort.
if !tx.is_closed() {
// Use send here because blocking_send would panic if used
// in async threads.
tokio::spawn(async move {
tx.send(Content::Abort).await.ok();
});
}
}
}
async fn extract_impl<T, S, F, E>(stream: S, f: F) -> Result<T, BinstallError>
where
T: Debug + Send + 'static,
@ -118,22 +41,6 @@ where
block_in_place(move || f(readable))
}
fn read_into_file(
file: &mut fs::File,
rx: &mut mpsc::Receiver<Content>,
) -> Result<(), BinstallError> {
while let Some(content) = rx.blocking_recv() {
match content {
Content::Data(bytes) => file.write_all(&*bytes)?,
Content::Abort => return Err(io::Error::new(io::ErrorKind::Other, "Aborted").into()),
}
}
file.flush()?;
Ok(())
}
pub async fn extract_bin<E>(
stream: impl Stream<Item = Result<Bytes, E>> + Unpin,
output: &Path,

View file

@ -1,64 +0,0 @@
use std::cmp::min;
use std::io::{self, BufRead, Read};
use bytes::{Buf, Bytes};
use tokio::sync::mpsc::Receiver;
use super::async_extracter::Content;
#[derive(Debug)]
pub(crate) struct ReadableRx {
rx: Receiver<Content>,
bytes: Bytes,
}
impl ReadableRx {
pub(crate) fn new(rx: Receiver<Content>) -> Self {
Self {
rx,
bytes: Bytes::new(),
}
}
}
impl Read for ReadableRx {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}
if self.fill_buf()?.is_empty() {
return Ok(0);
}
let bytes = &mut self.bytes;
// copy_to_slice requires the bytes to have enough remaining bytes
// to fill buf.
let n = min(buf.len(), bytes.remaining());
bytes.copy_to_slice(&mut buf[..n]);
Ok(n)
}
}
impl BufRead for ReadableRx {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
let bytes = &mut self.bytes;
if !bytes.has_remaining() {
match self.rx.blocking_recv() {
Some(Content::Data(new_bytes)) => *bytes = new_bytes,
Some(Content::Abort) => {
return Err(io::Error::new(io::ErrorKind::Other, "Aborted"))
}
None => (),
}
}
Ok(&*bytes)
}
fn consume(&mut self, amt: usize) {
self.bytes.advance(amt);
}
}