diff --git a/src/helpers/readable_rx.rs b/src/helpers/readable_rx.rs index 15aa1300..545bc176 100644 --- a/src/helpers/readable_rx.rs +++ b/src/helpers/readable_rx.rs @@ -1,5 +1,5 @@ use std::cmp::min; -use std::io::{self, Read}; +use std::io::{self, BufRead, Read}; use bytes::{Buf, Bytes}; use tokio::sync::mpsc::Receiver; @@ -27,17 +27,12 @@ impl Read for ReadableRx<'_> { return Ok(0); } - let bytes = &mut self.bytes; - if !bytes.has_remaining() { - match self.rx.blocking_recv() { - Some(Content::Data(new_bytes)) => *bytes = new_bytes, - Some(Content::Abort) => { - return Err(io::Error::new(io::ErrorKind::Other, "Aborted")) - } - None => return Ok(0), - } + if self.fill_buf()?.is_empty() { + return Ok(0); } + let bytes = &mut self.bytes; + // copy_to_slice requires the bytes to have enough remaining bytes // to fill buf. let n = min(buf.len(), bytes.remaining()); @@ -47,3 +42,23 @@ impl Read for ReadableRx<'_> { Ok(n) } } + +impl BufRead for ReadableRx<'_> { + fn fill_buf(&mut self) -> io::Result<&[u8]> { + let bytes = &mut self.bytes; + if !bytes.has_remaining() { + match self.rx.blocking_recv() { + Some(Content::Data(new_bytes)) => *bytes = new_bytes, + Some(Content::Abort) => { + return Err(io::Error::new(io::ErrorKind::Other, "Aborted")) + } + None => (), + } + } + Ok(&*bytes) + } + + fn consume(&mut self, amt: usize) { + self.bytes.advance(amt); + } +}