Impl helpers::AsyncFileWriter

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2022-06-08 20:03:58 +10:00
parent 570febdaad
commit 5fdeea86ad
No known key found for this signature in database
GPG key ID: 591C0B03040416D6

View file

@ -1,9 +1,10 @@
use std::{
fs,
io::{stderr, stdin, Write},
io::{self, stderr, stdin, Write},
path::{Path, PathBuf},
};
use bytes::Bytes;
use cargo_toml::Manifest;
use flate2::read::GzDecoder;
use log::{debug, info};
@ -11,6 +12,7 @@ use reqwest::Method;
use serde::Serialize;
use tar::Archive;
use tinytemplate::TinyTemplate;
use tokio::{sync::mpsc, task};
use url::Url;
use xz2::read::XzDecoder;
use zip::read::ZipArchive;
@ -210,3 +212,49 @@ pub trait Template: Serialize {
Ok(tt.render("path", self)?)
}
}
#[derive(Debug)]
pub struct AsyncFileWriter {
handle: task::JoinHandle<io::Result<()>>,
tx: mpsc::Sender<Bytes>,
}
impl AsyncFileWriter {
pub fn new(path: &Path) -> io::Result<Self> {
fs::create_dir_all(path.parent().unwrap())?;
let mut file = fs::File::create(path)?;
let (tx, mut rx) = mpsc::channel::<Bytes>(100);
let handle = task::spawn_blocking(move || {
while let Some(bytes) = rx.blocking_recv() {
file.write_all(&*bytes)?;
}
rx.close();
file.flush()?;
Ok(())
});
Ok(Self { handle, tx })
}
pub async fn write(&self, bytes: Bytes) {
self.tx
.send(bytes)
.await
.expect("Implementation bug: rx is closed before tx is dropped")
}
pub async fn done(self) -> io::Result<()> {
// Drop tx as soon as possible so that the task would wrap up what it
// was doing and flush out all the pending data.
drop(self.tx);
match self.handle.await {
Ok(res) => res,
Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err)),
}
}
}