use std::{ collections::HashMap, future::Future, pin::Pin, sync::Mutex, task::{Context, Poll}, }; use compact_str::{CompactString, ToCompactString}; use reqwest::{Request, Url}; use tokio::{ sync::Mutex as AsyncMutex, time::{sleep_until, Instant}, }; use tower::{Service, ServiceExt}; #[derive(Debug)] pub(super) struct DelayRequest { inner: AsyncMutex, hosts_to_delay: Mutex>, } impl DelayRequest { pub(super) fn new(inner: S) -> Self { Self { inner: AsyncMutex::new(inner), hosts_to_delay: Default::default(), } } pub(super) fn add_urls_to_delay<'a, Urls>(&self, urls: Urls, deadline: Instant) where Urls: IntoIterator, { let mut hosts_to_delay = self.hosts_to_delay.lock().unwrap(); urls.into_iter().filter_map(Url::host_str).for_each(|host| { hosts_to_delay .entry(host.to_compact_string()) .and_modify(|old_dl| { *old_dl = deadline.max(*old_dl); }) .or_insert(deadline); }); } fn wait_until_available(&self, url: &Url) -> impl Future + Send + 'static { let mut hosts_to_delay = self.hosts_to_delay.lock().unwrap(); let sleep = url .host_str() .and_then(|host| hosts_to_delay.get(host).map(|deadline| (*deadline, host))) .and_then(|(deadline, host)| { if deadline.elapsed().is_zero() { Some(sleep_until(deadline)) } else { // We have already gone past the deadline, // so we should remove it instead. hosts_to_delay.remove(host); None } }); async move { if let Some(sleep) = sleep { sleep.await; } } } } impl<'this, S> Service for &'this DelayRequest where S: Service + Send, S::Future: Send, { type Response = S::Response; type Error = S::Error; // TODO: Replace this with `type_alias_impl_trait` once it stablises // https://github.com/rust-lang/rust/issues/63063 type Future = Pin> + Send + 'this>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: Request) -> Self::Future { let this = *self; Box::pin(async move { this.wait_until_available(req.url()).await; // Reduce critical section: // - Construct the request before locking // - Once it is ready, call it and obtain // the future, then release the lock before // polling the future, which performs network I/O that could // take really long. let future = this.inner.lock().await.ready().await?.call(req); future.await }) } }