use std::{ collections::HashMap, future::Future, iter::Peekable, pin::Pin, sync::Mutex, task::{Context, Poll}, }; use compact_str::{CompactString, ToCompactString}; use reqwest::{Request, Url}; use tokio::{ sync::Mutex as AsyncMutex, time::{sleep_until, Duration, Instant}, }; use tower::{Service, ServiceExt}; trait IterExt: Iterator { fn dedup(self) -> Dedup where Self: Sized, Self::Item: PartialEq, { Dedup(self.peekable()) } } impl IterExt for It {} struct Dedup(Peekable); impl Iterator for Dedup where It: Iterator, It::Item: PartialEq, { type Item = It::Item; fn next(&mut self) -> Option { let curr = self.0.next()?; // Drop all consecutive dup values while self.0.next_if_eq(&curr).is_some() {} Some(curr) } } #[derive(Debug)] pub(super) struct DelayRequest { inner: AsyncMutex, hosts_to_delay: Mutex>, } impl DelayRequest { pub(super) fn new(inner: S) -> Self { Self { inner: AsyncMutex::new(inner), hosts_to_delay: Default::default(), } } pub(super) fn add_urls_to_delay(&self, urls: &[&Url], delay_duration: Duration) { let deadline = Instant::now() + delay_duration; let mut hosts_to_delay = self.hosts_to_delay.lock().unwrap(); urls.iter() .filter_map(|url| url.host_str()) .dedup() .for_each(|host| { hosts_to_delay .entry(host.to_compact_string()) .and_modify(|old_dl| { *old_dl = deadline.max(*old_dl); }) .or_insert(deadline); }); } fn wait_until_available(&self, url: &Url) -> impl Future + Send + 'static { let mut hosts_to_delay = self.hosts_to_delay.lock().unwrap(); let deadline = url .host_str() .and_then(|host| hosts_to_delay.get(host).map(|deadline| (*deadline, host))) .and_then(|(deadline, host)| { if deadline.elapsed().is_zero() { Some(deadline) } else { // We have already gone past the deadline, // so we should remove it instead. hosts_to_delay.remove(host); None } }); async move { if let Some(deadline) = deadline { sleep_until(deadline).await; } } } } impl<'this, S> Service for &'this DelayRequest where S: Service + Send, S::Future: Send, { type Response = S::Response; type Error = S::Error; // TODO: Replace this with `type_alias_impl_trait` once it stablises // https://github.com/rust-lang/rust/issues/63063 type Future = Pin> + Send + 'this>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: Request) -> Self::Future { let this = *self; Box::pin(async move { this.wait_until_available(req.url()).await; // Reduce critical section: // - Construct the request before locking // - Once it is ready, call it and obtain // the future, then release the lock before // polling the future, which performs network I/O that could // take really long. let future = this.inner.lock().await.ready().await?.call(req); future.await }) } }