Use std::sync::mpsc for request_tx in UIThreadInner

since it can be used to `send` data in sync functions that are run in
async context.

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2022-06-19 01:27:33 +10:00
parent 670bfcc1bc
commit af6a307a13
No known key found for this signature in database
GPG key ID: 591C0B03040416D6

View file

@ -1,6 +1,7 @@
use std::io::{self, BufRead, Write}; use std::io::{self, BufRead, Write};
use bytes::Bytes; use bytes::Bytes;
use std::sync::mpsc as mpsc_sync;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
@ -21,7 +22,7 @@ pub(super) enum UIRequest {
#[derive(Debug)] #[derive(Debug)]
struct UIThreadInner { struct UIThreadInner {
/// Request for confirmation /// Request for confirmation
request_tx: mpsc::Sender<UIRequest>, request_tx: mpsc_sync::SyncSender<UIRequest>,
/// Confirmation /// Confirmation
confirm_rx: mpsc::Receiver<Result<(), BinstallError>>, confirm_rx: mpsc::Receiver<Result<(), BinstallError>>,
@ -29,7 +30,8 @@ struct UIThreadInner {
impl UIThreadInner { impl UIThreadInner {
fn new() -> Self { fn new() -> Self {
let (request_tx, mut request_rx) = mpsc::channel(1); // Set it to a large enough number so it will never block.
let (request_tx, mut request_rx) = mpsc_sync::sync_channel(50);
let (confirm_tx, confirm_rx) = mpsc::channel(10); let (confirm_tx, confirm_rx) = mpsc::channel(10);
spawn_blocking(move || { spawn_blocking(move || {
@ -41,8 +43,8 @@ impl UIThreadInner {
let mut input = String::with_capacity(16); let mut input = String::with_capacity(16);
loop { loop {
match request_rx.blocking_recv() { match request_rx.recv() {
Some(UIRequest::Confirm) => { Ok(UIRequest::Confirm) => {
let res = loop { let res = loop {
writeln!(&mut stdout, "Do you wish to continue? yes/[no]").unwrap(); writeln!(&mut stdout, "Do you wish to continue? yes/[no]").unwrap();
write!(&mut stdout, "? ").unwrap(); write!(&mut stdout, "? ").unwrap();
@ -64,10 +66,10 @@ impl UIThreadInner {
.blocking_send(res) .blocking_send(res)
.expect("entry exits when confirming request") .expect("entry exits when confirming request")
} }
Some(UIRequest::PrintToStdout(output)) => stdout.write_all(&output).unwrap(), Ok(UIRequest::PrintToStdout(output)) => stdout.write_all(&output).unwrap(),
Some(UIRequest::PrintToStderr(output)) => stderr.write_all(&output).unwrap(), Ok(UIRequest::PrintToStderr(output)) => stderr.write_all(&output).unwrap(),
Some(UIRequest::FlushStdout) => stdout.flush().unwrap(), Ok(UIRequest::FlushStdout) => stdout.flush().unwrap(),
None => break, Err(_) => break,
} }
} }
}); });
@ -81,7 +83,6 @@ impl UIThreadInner {
async fn confirm(&mut self) -> Result<(), BinstallError> { async fn confirm(&mut self) -> Result<(), BinstallError> {
self.request_tx self.request_tx
.send(UIRequest::Confirm) .send(UIRequest::Confirm)
.await
.map_err(|_| BinstallError::UserAbort)?; .map_err(|_| BinstallError::UserAbort)?;
self.confirm_rx self.confirm_rx