Impl BufRead for ReadableRx

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2022-06-10 19:49:09 +10:00
parent f53680c497
commit e753c9ec30
No known key found for this signature in database
GPG key ID: 591C0B03040416D6

View file

@ -1,5 +1,5 @@
use std::cmp::min;
use std::io::{self, Read};
use std::io::{self, BufRead, Read};
use bytes::{Buf, Bytes};
use tokio::sync::mpsc::Receiver;
@ -27,17 +27,12 @@ impl Read for ReadableRx<'_> {
return Ok(0);
}
let bytes = &mut self.bytes;
if !bytes.has_remaining() {
match self.rx.blocking_recv() {
Some(Content::Data(new_bytes)) => *bytes = new_bytes,
Some(Content::Abort) => {
return Err(io::Error::new(io::ErrorKind::Other, "Aborted"))
}
None => return Ok(0),
}
if self.fill_buf()?.is_empty() {
return Ok(0);
}
let bytes = &mut self.bytes;
// copy_to_slice requires the bytes to have enough remaining bytes
// to fill buf.
let n = min(buf.len(), bytes.remaining());
@ -47,3 +42,23 @@ impl Read for ReadableRx<'_> {
Ok(n)
}
}
impl BufRead for ReadableRx<'_> {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
let bytes = &mut self.bytes;
if !bytes.has_remaining() {
match self.rx.blocking_recv() {
Some(Content::Data(new_bytes)) => *bytes = new_bytes,
Some(Content::Abort) => {
return Err(io::Error::new(io::ErrorKind::Other, "Aborted"))
}
None => (),
}
}
Ok(&*bytes)
}
fn consume(&mut self, amt: usize) {
self.bytes.advance(amt);
}
}