Impl ReadableRx in mod receiver_as_readable

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2022-06-08 23:04:14 +10:00
parent 6bc04340b6
commit f211788052
No known key found for this signature in database
GPG key ID: 591C0B03040416D6
3 changed files with 51 additions and 1 deletions

View file

@ -26,6 +26,7 @@ mod auto_abort_join_handle;
pub use auto_abort_join_handle::AutoAbortJoinHandle; pub use auto_abort_join_handle::AutoAbortJoinHandle;
mod extracter; mod extracter;
mod receiver_as_readable;
/// Load binstall metadata from the crate `Cargo.toml` at the provided path /// Load binstall metadata from the crate `Cargo.toml` at the provided path
pub fn load_manifest_path<P: AsRef<Path>>( pub fn load_manifest_path<P: AsRef<Path>>(

View file

@ -8,7 +8,7 @@ use tokio::{sync::mpsc, task::spawn_blocking};
use super::AutoAbortJoinHandle; use super::AutoAbortJoinHandle;
enum Content { pub enum Content {
/// Data to write to file /// Data to write to file
Data(Bytes), Data(Bytes),

View file

@ -0,0 +1,49 @@
use std::cmp::min;
use std::io::{self, Read};
use bytes::{Buf, Bytes};
use tokio::sync::mpsc::Receiver;
use super::async_file_writer::Content;
#[derive(Debug)]
pub struct ReadableRx<'a> {
rx: &'a mut Receiver<Content>,
bytes: Bytes,
}
impl<'a> ReadableRx<'a> {
pub fn new(rx: &'a mut Receiver<Content>) -> Self {
Self {
rx,
bytes: Bytes::new(),
}
}
}
impl Read for ReadableRx<'_> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}
let bytes = &mut self.bytes;
if !bytes.has_remaining() {
match self.rx.blocking_recv() {
Some(Content::Data(new_bytes)) => *bytes = new_bytes,
Some(Content::Abort) => {
return Err(io::Error::new(io::ErrorKind::Other, "Aborted"))
}
None => return Ok(0),
}
}
// copy_to_slice requires the bytes to have enough remaining bytes
// to fill buf.
let n = min(buf.len(), bytes.remaining());
bytes.copy_to_slice(&mut buf[..n]);
Ok(n)
}
}