From 5fdeea86ad45477d7470a23520b96e02b4b6c679 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 8 Jun 2022 20:03:58 +1000 Subject: [PATCH] Impl `helpers::AsyncFileWriter` Signed-off-by: Jiahao XU --- src/helpers.rs | 50 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/src/helpers.rs b/src/helpers.rs index 27b22551..55e7a8df 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -1,9 +1,10 @@ use std::{ fs, - io::{stderr, stdin, Write}, + io::{self, stderr, stdin, Write}, path::{Path, PathBuf}, }; +use bytes::Bytes; use cargo_toml::Manifest; use flate2::read::GzDecoder; use log::{debug, info}; @@ -11,6 +12,7 @@ use reqwest::Method; use serde::Serialize; use tar::Archive; use tinytemplate::TinyTemplate; +use tokio::{sync::mpsc, task}; use url::Url; use xz2::read::XzDecoder; use zip::read::ZipArchive; @@ -210,3 +212,49 @@ pub trait Template: Serialize { Ok(tt.render("path", self)?) } } + +#[derive(Debug)] +pub struct AsyncFileWriter { + handle: task::JoinHandle>, + tx: mpsc::Sender, +} + +impl AsyncFileWriter { + pub fn new(path: &Path) -> io::Result { + fs::create_dir_all(path.parent().unwrap())?; + + let mut file = fs::File::create(path)?; + let (tx, mut rx) = mpsc::channel::(100); + + let handle = task::spawn_blocking(move || { + while let Some(bytes) = rx.blocking_recv() { + file.write_all(&*bytes)?; + } + + rx.close(); + file.flush()?; + + Ok(()) + }); + + Ok(Self { handle, tx }) + } + + pub async fn write(&self, bytes: Bytes) { + self.tx + .send(bytes) + .await + .expect("Implementation bug: rx is closed before tx is dropped") + } + + pub async fn done(self) -> io::Result<()> { + // Drop tx as soon as possible so that the task would wrap up what it + // was doing and flush out all the pending data. + drop(self.tx); + + match self.handle.await { + Ok(res) => res, + Err(join_err) => Err(io::Error::new(io::ErrorKind::Other, join_err)), + } + } +}