FuturesResolver: Fallback to other future if one error

This commit is contained in:
Jiahao XU 2024-06-22 05:27:21 +00:00
parent 658a71bc3c
commit e089ecd6cf
2 changed files with 15 additions and 5 deletions

View file

@ -1,4 +1,6 @@
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin, fmt::Debug};
use tracing::warn;
use tokio::sync::mpsc; use tokio::sync::mpsc;
/// Given multiple futures with output = `Result<Option<T>, E>`, /// Given multiple futures with output = `Result<Option<T>, E>`,
@ -17,7 +19,7 @@ impl<T, E> Default for FuturesResolver<T, E> {
} }
} }
impl<T: Send + 'static, E: Send + 'static> FuturesResolver<T, E> { impl<T: Send + 'static, E: Send + Debug + 'static> FuturesResolver<T, E> {
/// Insert new future into this resolver, they will start running /// Insert new future into this resolver, they will start running
/// right away. /// right away.
pub fn push<Fut>(&self, fut: Fut) pub fn push<Fut>(&self, fut: Fut)
@ -67,10 +69,18 @@ impl<T: Send + 'static, E: Send + 'static> FuturesResolver<T, E> {
} }
/// Return the resolution. /// Return the resolution.
pub fn resolve(self) -> impl Future<Output = Result<Option<T>, E>> { pub fn resolve(self) -> impl Future<Output = Option<T>> {
let mut rx = self.rx; let mut rx = self.rx;
drop(self.tx); drop(self.tx);
async move { rx.recv().await.transpose() } async move {
while let Some(res) = rx.recv().await {
match res {
Ok(ret) => return Some(ret),
Err(err) => warn!(?err, "Faile to resolve the future"),
}
}
None
}
} }
} }

View file

@ -278,7 +278,7 @@ impl super::Fetcher for GhCrateMeta {
} }
} }
if let Some(resolved) = resolver.resolve().await? { if let Some(resolved) = resolver.resolve().await {
debug!(?resolved, "Winning URL found!"); debug!(?resolved, "Winning URL found!");
self.resolution self.resolution
.set(resolved) .set(resolved)