From 621a64152971061e39eadb9832d9bfd43e5b0043 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 18 Jun 2022 18:10:22 +1000 Subject: [PATCH] Rm unused items in mod `helpers` Signed-off-by: Jiahao XU --- src/helpers.rs | 2 - src/helpers/async_extracter.rs | 101 ++------------------------------- src/helpers/readable_rx.rs | 64 --------------------- 3 files changed, 4 insertions(+), 163 deletions(-) delete mode 100644 src/helpers/readable_rx.rs diff --git a/src/helpers.rs b/src/helpers.rs index 4a30d986..b2d252e9 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -24,8 +24,6 @@ pub use ui_thread::UIThread; mod extracter; mod stream_readable; -mod readable_rx; - mod path_ext; pub use path_ext::*; diff --git a/src/helpers/async_extracter.rs b/src/helpers/async_extracter.rs index d37d1db0..feec5d06 100644 --- a/src/helpers/async_extracter.rs +++ b/src/helpers/async_extracter.rs @@ -16,97 +16,20 @@ use std::fmt::Debug; use std::fs; -use std::io::{self, copy, Read, Seek, Write}; +use std::io::{copy, Read, Seek}; use std::path::Path; use bytes::Bytes; -use futures_util::stream::{Stream, StreamExt}; +use futures_util::stream::Stream; use log::debug; use scopeguard::{guard, ScopeGuard}; use tar::Entries; use tempfile::tempfile; -use tokio::{ - sync::mpsc, - task::{block_in_place, spawn_blocking, JoinHandle}, -}; +use tokio::task::block_in_place; -use super::{extracter::*, readable_rx::*, stream_readable::StreamReadable}; +use super::{extracter::*, stream_readable::StreamReadable}; use crate::{BinstallError, TarBasedFmt}; -pub(crate) enum Content { - /// Data to write to file - Data(Bytes), - - /// Abort the writing and remove the file. - Abort, -} - -/// AsyncExtracter will pass the `Bytes` you give to another thread via -/// a `mpsc` and decompress and unpack it if needed. -/// -/// After all write is done, you must call `AsyncExtracter::done`, -/// otherwise the extracted content will be removed on drop. -#[derive(Debug)] -struct AsyncExtracterInner { - /// Use AutoAbortJoinHandle so that the task - /// will be cancelled on failure. - handle: JoinHandle>, - tx: mpsc::Sender, -} - -impl AsyncExtracterInner { - fn new) -> Result + Send + 'static>( - f: F, - ) -> Self { - let (tx, rx) = mpsc::channel::(100); - - let handle = spawn_blocking(move || f(rx)); - - Self { handle, tx } - } - - /// Upon error, this extracter shall not be reused. - /// Otherwise, `Self::done` would panic. - async fn feed(&mut self, bytes: Bytes) -> Result<(), BinstallError> { - if self.tx.send(Content::Data(bytes)).await.is_err() { - // task failed - Err(Self::wait(&mut self.handle).await.expect_err( - "Implementation bug: write task finished successfully before all writes are done", - )) - } else { - Ok(()) - } - } - - async fn done(mut self) -> Result { - // Drop tx as soon as possible so that the task would wrap up what it - // was doing and flush out all the pending data. - drop(self.tx); - - Self::wait(&mut self.handle).await - } - - async fn wait(handle: &mut JoinHandle>) -> Result { - match handle.await { - Ok(res) => res, - Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err).into()), - } - } - - fn abort(self) { - let tx = self.tx; - // If Self::write fail, then the task is already tear down, - // tx closed and no need to abort. - if !tx.is_closed() { - // Use send here because blocking_send would panic if used - // in async threads. - tokio::spawn(async move { - tx.send(Content::Abort).await.ok(); - }); - } - } -} - async fn extract_impl(stream: S, f: F) -> Result where T: Debug + Send + 'static, @@ -118,22 +41,6 @@ where block_in_place(move || f(readable)) } -fn read_into_file( - file: &mut fs::File, - rx: &mut mpsc::Receiver, -) -> Result<(), BinstallError> { - while let Some(content) = rx.blocking_recv() { - match content { - Content::Data(bytes) => file.write_all(&*bytes)?, - Content::Abort => return Err(io::Error::new(io::ErrorKind::Other, "Aborted").into()), - } - } - - file.flush()?; - - Ok(()) -} - pub async fn extract_bin( stream: impl Stream> + Unpin, output: &Path, diff --git a/src/helpers/readable_rx.rs b/src/helpers/readable_rx.rs deleted file mode 100644 index e2597aae..00000000 --- a/src/helpers/readable_rx.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::cmp::min; -use std::io::{self, BufRead, Read}; - -use bytes::{Buf, Bytes}; -use tokio::sync::mpsc::Receiver; - -use super::async_extracter::Content; - -#[derive(Debug)] -pub(crate) struct ReadableRx { - rx: Receiver, - bytes: Bytes, -} - -impl ReadableRx { - pub(crate) fn new(rx: Receiver) -> Self { - Self { - rx, - bytes: Bytes::new(), - } - } -} - -impl Read for ReadableRx { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - if buf.is_empty() { - return Ok(0); - } - - if self.fill_buf()?.is_empty() { - return Ok(0); - } - - let bytes = &mut self.bytes; - - // copy_to_slice requires the bytes to have enough remaining bytes - // to fill buf. - let n = min(buf.len(), bytes.remaining()); - - bytes.copy_to_slice(&mut buf[..n]); - - Ok(n) - } -} - -impl BufRead for ReadableRx { - fn fill_buf(&mut self) -> io::Result<&[u8]> { - let bytes = &mut self.bytes; - if !bytes.has_remaining() { - match self.rx.blocking_recv() { - Some(Content::Data(new_bytes)) => *bytes = new_bytes, - Some(Content::Abort) => { - return Err(io::Error::new(io::ErrorKind::Other, "Aborted")) - } - None => (), - } - } - Ok(&*bytes) - } - - fn consume(&mut self, amt: usize) { - self.bytes.advance(amt); - } -}