diff --git a/src/helpers.rs b/src/helpers.rs index 9ec2bcb7..e9351147 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -26,6 +26,7 @@ mod auto_abort_join_handle; pub use auto_abort_join_handle::AutoAbortJoinHandle; mod extracter; +mod receiver_as_readable; /// Load binstall metadata from the crate `Cargo.toml` at the provided path pub fn load_manifest_path>( diff --git a/src/helpers/async_file_writer.rs b/src/helpers/async_file_writer.rs index 12f69133..a6bc1fc8 100644 --- a/src/helpers/async_file_writer.rs +++ b/src/helpers/async_file_writer.rs @@ -8,7 +8,7 @@ use tokio::{sync::mpsc, task::spawn_blocking}; use super::AutoAbortJoinHandle; -enum Content { +pub enum Content { /// Data to write to file Data(Bytes), diff --git a/src/helpers/receiver_as_readable.rs b/src/helpers/receiver_as_readable.rs new file mode 100644 index 00000000..02ccc344 --- /dev/null +++ b/src/helpers/receiver_as_readable.rs @@ -0,0 +1,49 @@ +use std::cmp::min; +use std::io::{self, Read}; + +use bytes::{Buf, Bytes}; +use tokio::sync::mpsc::Receiver; + +use super::async_file_writer::Content; + +#[derive(Debug)] +pub struct ReadableRx<'a> { + rx: &'a mut Receiver, + bytes: Bytes, +} + +impl<'a> ReadableRx<'a> { + pub fn new(rx: &'a mut Receiver) -> Self { + Self { + rx, + bytes: Bytes::new(), + } + } +} + +impl Read for ReadableRx<'_> { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if buf.is_empty() { + return Ok(0); + } + + let bytes = &mut self.bytes; + if !bytes.has_remaining() { + match self.rx.blocking_recv() { + Some(Content::Data(new_bytes)) => *bytes = new_bytes, + Some(Content::Abort) => { + return Err(io::Error::new(io::ErrorKind::Other, "Aborted")) + } + None => return Ok(0), + } + } + + // copy_to_slice requires the bytes to have enough remaining bytes + // to fill buf. + let n = min(buf.len(), bytes.remaining()); + + bytes.copy_to_slice(&mut buf[..n]); + + Ok(n) + } +}