diff --git a/src/helpers.rs b/src/helpers.rs index 4fbc12c7..9e51f319 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -244,14 +244,18 @@ impl AsyncFileWriter { fs::create_dir_all(path.parent().unwrap())?; let mut file = fs::File::create(path)?; - let (tx, mut rx) = mpsc::channel::(100); + let (tx, rx) = mpsc::channel::(100); let handle = AutoAbortJoinHandle::new(task::spawn_blocking(move || { + // close rx on error so that tx.send will return an error + let mut rx = scopeguard::guard(rx, |mut rx| { + rx.close(); + }); + while let Some(bytes) = rx.blocking_recv() { file.write_all(&*bytes)?; } - rx.close(); file.flush()?; Ok(()) @@ -263,31 +267,13 @@ impl AsyncFileWriter { /// Upon error, this writer shall not be reused. /// Otherwise, `Self::done` would panic. pub async fn write(&mut self, bytes: Bytes) -> io::Result<()> { - let send_future = async { - self.tx - .send(bytes) - .await - .expect("Implementation bug: rx is closed before tx is dropped") - }; - tokio::pin!(send_future); - - let task_future = async { - Self::wait(&mut self.handle).await.map(|_| { - panic!("Implementation bug: write task finished before all writes are done") - }) - }; - tokio::pin!(task_future); - - // Use select to run them in parallel, so that if the send blocks - // the current future and the task failed with some error, the future - // returned by this function would not block forever. - tokio::select! { - // It isn't completely safe to cancel the send_future as it would - // cause us to lose our place in the queue, but if the send_future - // is cancelled, it means that the task has failed and the mpsc - // won't matter anyway. - _ = send_future => Ok(()), - res = task_future => res, + if self.tx.send(bytes).await.is_err() { + // task failed + Err(Self::wait(&mut self.handle).await.expect_err( + "Implementation bug: write task finished successfully before all writes are done", + )) + } else { + Ok(()) } }