Run downloader & extracter on the same thread

This have the following advantage:
 - Remove the mpsc channel, which:
    - Remove synchronization required for mpsc.
    - Remove the internal buffering of the mpsc channel, which avoid potentially OOM situation.
 - Improve data locality since it no longer needs to be sent over thread.
 - It uses `block_in_place` to avoid creating an additional blocking
   thread.

The disadvantages would be that the downloader can no longer be run in parallel to the extracter.

If the bottleneck is the decompressor, then the downloader should also pause and wait
for the decompressor to consume the data.

But if the bottleneck is the network, then that might be an issue.

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2022-06-18 18:07:46 +10:00
parent aba1ba7b6d
commit c15d99c6f0
No known key found for this signature in database
GPG key ID: 591C0B03040416D6

View file

@ -16,7 +16,7 @@
use std::fmt::Debug;
use std::fs;
use std::io::{self, Read, Seek, Write};
use std::io::{self, copy, Read, Seek, Write};
use std::path::Path;
use bytes::Bytes;
@ -27,10 +27,10 @@ use tar::Entries;
use tempfile::tempfile;
use tokio::{
sync::mpsc,
task::{spawn_blocking, JoinHandle},
task::{block_in_place, spawn_blocking, JoinHandle},
};
use super::{extracter::*, readable_rx::*};
use super::{extracter::*, readable_rx::*, stream_readable::StreamReadable};
use crate::{BinstallError, TarBasedFmt};
pub(crate) enum Content {
@ -107,20 +107,15 @@ impl<T: Debug + Send + 'static> AsyncExtracterInner<T> {
}
}
async fn extract_impl<T: Debug + Send + 'static, S: Stream<Item = Result<Bytes, E>> + Unpin, E>(
mut stream: S,
f: Box<dyn FnOnce(mpsc::Receiver<Content>) -> Result<T, BinstallError> + Send>,
) -> Result<T, BinstallError>
async fn extract_impl<T, S, F, E>(stream: S, f: F) -> Result<T, BinstallError>
where
T: Debug + Send + 'static,
S: Stream<Item = Result<Bytes, E>> + Unpin,
F: FnOnce(StreamReadable<S>) -> Result<T, BinstallError>,
BinstallError: From<E>,
{
let mut extracter = guard(AsyncExtracterInner::new(f), AsyncExtracterInner::abort);
while let Some(res) = stream.next().await {
extracter.feed(res?).await?;
}
ScopeGuard::into_inner(extracter).done().await
let readable = StreamReadable::new(stream).await;
block_in_place(move || f(readable))
}
fn read_into_file(
@ -148,28 +143,25 @@ where
{
let path = output.to_owned();
extract_impl(
stream,
Box::new(move |mut rx| {
fs::create_dir_all(path.parent().unwrap())?;
extract_impl(stream, move |mut reader| {
fs::create_dir_all(path.parent().unwrap())?;
let mut file = fs::File::create(&path)?;
let mut file = fs::File::create(&path)?;
// remove it unless the operation isn't aborted and no write
// fails.
let remove_guard = guard(&path, |path| {
fs::remove_file(path).ok();
});
// remove it unless the operation isn't aborted and no write
// fails.
let remove_guard = guard(&path, |path| {
fs::remove_file(path).ok();
});
read_into_file(&mut file, &mut rx)?;
copy(&mut reader, &mut file)?;
// Operation isn't aborted and all writes succeed,
// disarm the remove_guard.
ScopeGuard::into_inner(remove_guard);
// Operation isn't aborted and all writes succeed,
// disarm the remove_guard.
ScopeGuard::into_inner(remove_guard);
Ok(())
}),
)
Ok(())
})
.await
}
@ -182,26 +174,23 @@ where
{
let path = output.to_owned();
extract_impl(
stream,
Box::new(move |mut rx| {
fs::create_dir_all(path.parent().unwrap())?;
extract_impl(stream, move |mut reader| {
fs::create_dir_all(path.parent().unwrap())?;
let mut file = tempfile()?;
let mut file = tempfile()?;
read_into_file(&mut file, &mut rx)?;
copy(&mut reader, &mut file)?;
// rewind it so that we can pass it to unzip
file.rewind()?;
// rewind it so that we can pass it to unzip
file.rewind()?;
unzip(file, &path)
}),
)
unzip(file, &path)
})
.await
}
pub async fn extract_tar_based_stream<E>(
stream: impl Stream<Item = Result<Bytes, E>> + Unpin,
stream: impl Stream<Item = Result<Bytes, E>> + Unpin + 'static,
output: &Path,
fmt: TarBasedFmt,
) -> Result<(), BinstallError>
@ -210,17 +199,15 @@ where
{
let path = output.to_owned();
extract_impl(
stream,
Box::new(move |rx| {
fs::create_dir_all(path.parent().unwrap())?;
extract_impl(stream, move |reader| {
fs::create_dir_all(path.parent().unwrap())?;
debug!("Extracting from {fmt} archive to {path:#?}");
create_tar_decoder(ReadableRx::new(rx), fmt)?.unpack(path)?;
debug!("Extracting from {fmt} archive to {path:#?}");
Ok(())
}),
)
create_tar_decoder(reader, fmt)?.unpack(path)?;
Ok(())
})
.await
}
@ -237,22 +224,19 @@ impl<V: TarEntriesVisitor> TarEntriesVisitor for &mut V {
}
pub async fn extract_tar_based_stream_and_visit<V: TarEntriesVisitor + Debug + Send + 'static, E>(
stream: impl Stream<Item = Result<Bytes, E>> + Unpin,
stream: impl Stream<Item = Result<Bytes, E>> + Unpin + 'static,
fmt: TarBasedFmt,
mut visitor: V,
) -> Result<V, BinstallError>
where
BinstallError: From<E>,
{
extract_impl(
stream,
Box::new(move |rx| {
debug!("Extracting from {fmt} archive to process it in memory");
extract_impl(stream, move |reader| {
debug!("Extracting from {fmt} archive to process it in memory");
let mut tar = create_tar_decoder(ReadableRx::new(rx), fmt)?;
visitor.visit(tar.entries()?)?;
Ok(visitor)
}),
)
let mut tar = create_tar_decoder(reader, fmt)?;
visitor.visit(tar.entries()?)?;
Ok(visitor)
})
.await
}