Simplify AsyncFileWriter::write by closing rx on err

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2022-06-08 21:38:05 +10:00
parent d9bcca8b78
commit e584b99240
No known key found for this signature in database
GPG key ID: 591C0B03040416D6

View file

@ -244,14 +244,18 @@ impl AsyncFileWriter {
fs::create_dir_all(path.parent().unwrap())?; fs::create_dir_all(path.parent().unwrap())?;
let mut file = fs::File::create(path)?; let mut file = fs::File::create(path)?;
let (tx, mut rx) = mpsc::channel::<Bytes>(100); let (tx, rx) = mpsc::channel::<Bytes>(100);
let handle = AutoAbortJoinHandle::new(task::spawn_blocking(move || { let handle = AutoAbortJoinHandle::new(task::spawn_blocking(move || {
// close rx on error so that tx.send will return an error
let mut rx = scopeguard::guard(rx, |mut rx| {
rx.close();
});
while let Some(bytes) = rx.blocking_recv() { while let Some(bytes) = rx.blocking_recv() {
file.write_all(&*bytes)?; file.write_all(&*bytes)?;
} }
rx.close();
file.flush()?; file.flush()?;
Ok(()) Ok(())
@ -263,31 +267,13 @@ impl AsyncFileWriter {
/// Upon error, this writer shall not be reused. /// Upon error, this writer shall not be reused.
/// Otherwise, `Self::done` would panic. /// Otherwise, `Self::done` would panic.
pub async fn write(&mut self, bytes: Bytes) -> io::Result<()> { pub async fn write(&mut self, bytes: Bytes) -> io::Result<()> {
let send_future = async { if self.tx.send(bytes).await.is_err() {
self.tx // task failed
.send(bytes) Err(Self::wait(&mut self.handle).await.expect_err(
.await "Implementation bug: write task finished successfully before all writes are done",
.expect("Implementation bug: rx is closed before tx is dropped") ))
}; } else {
tokio::pin!(send_future); Ok(())
let task_future = async {
Self::wait(&mut self.handle).await.map(|_| {
panic!("Implementation bug: write task finished before all writes are done")
})
};
tokio::pin!(task_future);
// Use select to run them in parallel, so that if the send blocks
// the current future and the task failed with some error, the future
// returned by this function would not block forever.
tokio::select! {
// It isn't completely safe to cancel the send_future as it would
// cause us to lose our place in the queue, but if the send_future
// is cancelled, it means that the task has failed and the mpsc
// won't matter anyway.
_ = send_future => Ok(()),
res = task_future => res,
} }
} }