Merge pull request #179 from NobodyXu/feature/improve-readable-rx

This commit is contained in:
Félix Saparelli 2022-06-11 20:11:33 +12:00 committed by GitHub
commit bd4cc85386
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 15 deletions

View file

@ -1,11 +1,11 @@
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::Read; use std::io::{BufRead, Read};
use std::path::Path; use std::path::Path;
use flate2::read::GzDecoder; use flate2::bufread::GzDecoder;
use log::debug; use log::debug;
use tar::Archive; use tar::Archive;
use xz2::read::XzDecoder; use xz2::bufread::XzDecoder;
use zip::read::ZipArchive; use zip::read::ZipArchive;
use zstd::stream::Decoder as ZstdDecoder; use zstd::stream::Decoder as ZstdDecoder;
@ -55,7 +55,7 @@ fn untar<Filter: FnMut(&Path) -> bool>(
/// Note that this is a best-effort and it only works when `fmt` /// Note that this is a best-effort and it only works when `fmt`
/// is not `PkgFmt::Bin` or `PkgFmt::Zip`. /// is not `PkgFmt::Bin` or `PkgFmt::Zip`.
pub(crate) fn extract_compressed_from_readable<Filter: FnMut(&Path) -> bool>( pub(crate) fn extract_compressed_from_readable<Filter: FnMut(&Path) -> bool>(
dat: impl Read, dat: impl BufRead,
fmt: PkgFmt, fmt: PkgFmt,
path: &Path, path: &Path,
filter: Option<Filter>, filter: Option<Filter>,
@ -89,7 +89,7 @@ pub(crate) fn extract_compressed_from_readable<Filter: FnMut(&Path) -> bool>(
// as of zstd 0.10.2 and 0.11.2, which is specified // as of zstd 0.10.2 and 0.11.2, which is specified
// as &[] by ZstdDecoder::new, thus ZstdDecoder::new // as &[] by ZstdDecoder::new, thus ZstdDecoder::new
// should not return any error. // should not return any error.
let tar = ZstdDecoder::new(dat)?; let tar = ZstdDecoder::with_buffer(dat)?;
untar(tar, path, filter)?; untar(tar, path, filter)?;
} }
PkgFmt::Zip => panic!("Unexpected PkgFmt::Zip!"), PkgFmt::Zip => panic!("Unexpected PkgFmt::Zip!"),

View file

@ -1,5 +1,5 @@
use std::cmp::min; use std::cmp::min;
use std::io::{self, Read}; use std::io::{self, BufRead, Read};
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Receiver;
@ -27,17 +27,12 @@ impl Read for ReadableRx<'_> {
return Ok(0); return Ok(0);
} }
let bytes = &mut self.bytes; if self.fill_buf()?.is_empty() {
if !bytes.has_remaining() { return Ok(0);
match self.rx.blocking_recv() {
Some(Content::Data(new_bytes)) => *bytes = new_bytes,
Some(Content::Abort) => {
return Err(io::Error::new(io::ErrorKind::Other, "Aborted"))
}
None => return Ok(0),
}
} }
let bytes = &mut self.bytes;
// copy_to_slice requires the bytes to have enough remaining bytes // copy_to_slice requires the bytes to have enough remaining bytes
// to fill buf. // to fill buf.
let n = min(buf.len(), bytes.remaining()); let n = min(buf.len(), bytes.remaining());
@ -47,3 +42,23 @@ impl Read for ReadableRx<'_> {
Ok(n) Ok(n)
} }
} }
impl BufRead for ReadableRx<'_> {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
let bytes = &mut self.bytes;
if !bytes.has_remaining() {
match self.rx.blocking_recv() {
Some(Content::Data(new_bytes)) => *bytes = new_bytes,
Some(Content::Abort) => {
return Err(io::Error::new(io::ErrorKind::Other, "Aborted"))
}
None => (),
}
}
Ok(&*bytes)
}
fn consume(&mut self, amt: usize) {
self.bytes.advance(amt);
}
}