Use BinstallError in AsyncFileWriter

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2022-06-09 01:01:37 +10:00
parent f211788052
commit 59544e8b55
No known key found for this signature in database
GPG key ID: 591C0B03040416D6

View file

@ -7,6 +7,7 @@ use scopeguard::{guard, Always, ScopeGuard};
use tokio::{sync::mpsc, task::spawn_blocking}; use tokio::{sync::mpsc, task::spawn_blocking};
use super::AutoAbortJoinHandle; use super::AutoAbortJoinHandle;
use crate::{BinstallError, PkgFmt};
pub enum Content { pub enum Content {
/// Data to write to file /// Data to write to file
@ -20,7 +21,7 @@ pub enum Content {
struct AsyncFileWriterInner { struct AsyncFileWriterInner {
/// Use AutoAbortJoinHandle so that the task /// Use AutoAbortJoinHandle so that the task
/// will be cancelled on failure. /// will be cancelled on failure.
handle: AutoAbortJoinHandle<io::Result<()>>, handle: AutoAbortJoinHandle<Result<(), BinstallError>>,
tx: mpsc::Sender<Content>, tx: mpsc::Sender<Content>,
} }
@ -47,7 +48,9 @@ impl AsyncFileWriterInner {
while let Some(content) = rx.blocking_recv() { while let Some(content) = rx.blocking_recv() {
match content { match content {
Content::Data(bytes) => file.write_all(&*bytes)?, Content::Data(bytes) => file.write_all(&*bytes)?,
Content::Abort => return Err(io::Error::new(io::ErrorKind::Other, "Aborted")), Content::Abort => {
return Err(io::Error::new(io::ErrorKind::Other, "Aborted").into())
}
} }
} }
@ -65,7 +68,7 @@ impl AsyncFileWriterInner {
/// Upon error, this writer shall not be reused. /// Upon error, this writer shall not be reused.
/// Otherwise, `Self::done` would panic. /// Otherwise, `Self::done` would panic.
async fn write(&mut self, bytes: Bytes) -> io::Result<()> { async fn write(&mut self, bytes: Bytes) -> Result<(), BinstallError> {
if self.tx.send(Content::Data(bytes)).await.is_err() { if self.tx.send(Content::Data(bytes)).await.is_err() {
// task failed // task failed
Err(Self::wait(&mut self.handle).await.expect_err( Err(Self::wait(&mut self.handle).await.expect_err(
@ -76,7 +79,7 @@ impl AsyncFileWriterInner {
} }
} }
async fn done(mut self) -> io::Result<()> { async fn done(mut self) -> Result<(), BinstallError> {
// Drop tx as soon as possible so that the task would wrap up what it // Drop tx as soon as possible so that the task would wrap up what it
// was doing and flush out all the pending data. // was doing and flush out all the pending data.
drop(self.tx); drop(self.tx);
@ -84,10 +87,12 @@ impl AsyncFileWriterInner {
Self::wait(&mut self.handle).await Self::wait(&mut self.handle).await
} }
async fn wait(handle: &mut AutoAbortJoinHandle<io::Result<()>>) -> io::Result<()> { async fn wait(
handle: &mut AutoAbortJoinHandle<Result<(), BinstallError>>,
) -> Result<(), BinstallError> {
match handle.await { match handle.await {
Ok(res) => res, Ok(res) => res,
Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err)), Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err).into()),
} }
} }
@ -117,11 +122,11 @@ impl AsyncFileWriter {
/// Upon error, this writer shall not be reused. /// Upon error, this writer shall not be reused.
/// Otherwise, `Self::done` would panic. /// Otherwise, `Self::done` would panic.
pub async fn write(&mut self, bytes: Bytes) -> io::Result<()> { pub async fn write(&mut self, bytes: Bytes) -> Result<(), BinstallError> {
self.0.write(bytes).await self.0.write(bytes).await
} }
pub async fn done(self) -> io::Result<()> { pub async fn done(self) -> Result<(), BinstallError> {
ScopeGuard::into_inner(self.0).done().await ScopeGuard::into_inner(self.0).done().await
} }
} }