diff --git a/src/helpers/extracter.rs b/src/helpers/extracter.rs index fbb9d5c0..e1c91cef 100644 --- a/src/helpers/extracter.rs +++ b/src/helpers/extracter.rs @@ -1,11 +1,11 @@ use std::fs::{self, File}; -use std::io::Read; +use std::io::{BufRead, Read}; use std::path::Path; -use flate2::read::GzDecoder; +use flate2::bufread::GzDecoder; use log::debug; use tar::Archive; -use xz2::read::XzDecoder; +use xz2::bufread::XzDecoder; use zip::read::ZipArchive; use zstd::stream::Decoder as ZstdDecoder; @@ -55,7 +55,7 @@ fn untar bool>( /// Note that this is a best-effort and it only works when `fmt` /// is not `PkgFmt::Bin` or `PkgFmt::Zip`. pub(crate) fn extract_compressed_from_readable bool>( - dat: impl Read, + dat: impl BufRead, fmt: PkgFmt, path: &Path, filter: Option, @@ -89,7 +89,7 @@ pub(crate) fn extract_compressed_from_readable bool>( // as of zstd 0.10.2 and 0.11.2, which is specified // as &[] by ZstdDecoder::new, thus ZstdDecoder::new // should not return any error. - let tar = ZstdDecoder::new(dat)?; + let tar = ZstdDecoder::with_buffer(dat)?; untar(tar, path, filter)?; } PkgFmt::Zip => panic!("Unexpected PkgFmt::Zip!"), diff --git a/src/helpers/readable_rx.rs b/src/helpers/readable_rx.rs index 15aa1300..545bc176 100644 --- a/src/helpers/readable_rx.rs +++ b/src/helpers/readable_rx.rs @@ -1,5 +1,5 @@ use std::cmp::min; -use std::io::{self, Read}; +use std::io::{self, BufRead, Read}; use bytes::{Buf, Bytes}; use tokio::sync::mpsc::Receiver; @@ -27,17 +27,12 @@ impl Read for ReadableRx<'_> { return Ok(0); } - let bytes = &mut self.bytes; - if !bytes.has_remaining() { - match self.rx.blocking_recv() { - Some(Content::Data(new_bytes)) => *bytes = new_bytes, - Some(Content::Abort) => { - return Err(io::Error::new(io::ErrorKind::Other, "Aborted")) - } - None => return Ok(0), - } + if self.fill_buf()?.is_empty() { + return Ok(0); } + let bytes = &mut self.bytes; + // copy_to_slice requires the bytes to have enough remaining bytes // to fill buf. let n = min(buf.len(), bytes.remaining()); @@ -47,3 +42,23 @@ impl Read for ReadableRx<'_> { Ok(n) } } + +impl BufRead for ReadableRx<'_> { + fn fill_buf(&mut self) -> io::Result<&[u8]> { + let bytes = &mut self.bytes; + if !bytes.has_remaining() { + match self.rx.blocking_recv() { + Some(Content::Data(new_bytes)) => *bytes = new_bytes, + Some(Content::Abort) => { + return Err(io::Error::new(io::ErrorKind::Other, "Aborted")) + } + None => (), + } + } + Ok(&*bytes) + } + + fn consume(&mut self, amt: usize) { + self.bytes.advance(amt); + } +}