From e089ecd6cf3fb4493ca25fd08275973d2932d99b Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 22 Jun 2024 05:27:21 +0000 Subject: [PATCH] FuturesResolver: Fallback to other future if one error --- .../binstalk-fetchers/src/futures_resolver.rs | 18 ++++++++++++++---- crates/binstalk-fetchers/src/gh_crate_meta.rs | 2 +- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/crates/binstalk-fetchers/src/futures_resolver.rs b/crates/binstalk-fetchers/src/futures_resolver.rs index 0a550519..737a8fc1 100644 --- a/crates/binstalk-fetchers/src/futures_resolver.rs +++ b/crates/binstalk-fetchers/src/futures_resolver.rs @@ -1,4 +1,6 @@ -use std::{future::Future, pin::Pin}; +use std::{future::Future, pin::Pin, fmt::Debug}; + +use tracing::warn; use tokio::sync::mpsc; /// Given multiple futures with output = `Result, E>`, @@ -17,7 +19,7 @@ impl Default for FuturesResolver { } } -impl FuturesResolver { +impl FuturesResolver { /// Insert new future into this resolver, they will start running /// right away. pub fn push(&self, fut: Fut) @@ -67,10 +69,18 @@ impl FuturesResolver { } /// Return the resolution. - pub fn resolve(self) -> impl Future, E>> { + pub fn resolve(self) -> impl Future> { let mut rx = self.rx; drop(self.tx); - async move { rx.recv().await.transpose() } + async move { + while let Some(res) = rx.recv().await { + match res { + Ok(ret) => return Some(ret), + Err(err) => warn!(?err, "Faile to resolve the future"), + } + } + None + } } } diff --git a/crates/binstalk-fetchers/src/gh_crate_meta.rs b/crates/binstalk-fetchers/src/gh_crate_meta.rs index 976b6942..51cd012c 100644 --- a/crates/binstalk-fetchers/src/gh_crate_meta.rs +++ b/crates/binstalk-fetchers/src/gh_crate_meta.rs @@ -278,7 +278,7 @@ impl super::Fetcher for GhCrateMeta { } } - if let Some(resolved) = resolver.resolve().await? { + if let Some(resolved) = resolver.resolve().await { debug!(?resolved, "Winning URL found!"); self.resolution .set(resolved)