Fix too many 429 responses (#1231)

Fixed #1229

Signed-off-by: Jiahao XU <Jiahao_XU@outlook.com>
This commit is contained in:
Jiahao XU 2023-08-08 10:53:15 +10:00 committed by GitHub
parent e4c776f403
commit 435df675b3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 228 additions and 132 deletions

View file

@ -1,19 +1,14 @@
use std::{
collections::HashMap,
future::Future,
iter::Peekable,
pin::Pin,
collections::HashMap, future::Future, iter::Peekable, num::NonZeroU64, ops::ControlFlow,
sync::Mutex,
task::{Context, Poll},
};
use compact_str::{CompactString, ToCompactString};
use reqwest::{Request, Url};
use tokio::{
sync::Mutex as AsyncMutex,
time::{sleep_until, Duration, Instant},
};
use tower::{Service, ServiceExt};
use tokio::time::{sleep_until, Duration, Instant};
use tracing::debug;
pub(super) type RequestResult = Result<reqwest::Response, reqwest::Error>;
trait IterExt: Iterator {
fn dedup(self) -> Dedup<Self>
@ -47,15 +42,107 @@ where
}
#[derive(Debug)]
pub(super) struct DelayRequest<S> {
inner: AsyncMutex<S>,
/// Client-side rate limiter state bundled with the client that sends requests.
///
/// At most `num_request` requests are let through per window of length `per`.
struct Inner {
/// Client used to execute every request that passes the limiter.
client: reqwest::Client,
/// Number of requests granted at the start of each window; halved by
/// `inc_rate_limit` as long as the halved value is non-zero.
num_request: NonZeroU64,
/// Length of one window; `inc_rate_limit` grows it by a factor of 1.2
/// while it is below 700ms.
per: Duration,
/// Instant at which the current window ends.
until: Instant,
/// Whether requests may currently be sent, and how many remain.
state: State,
}
/// Whether the limiter currently lets requests through.
#[derive(Debug)]
enum State {
/// The quota of the current window is used up.
Limited,
/// `rem` more requests may be sent in the current window.
Ready { rem: NonZeroU64 },
}
impl Inner {
    /// Builds a limiter that grants `num_request` requests per `per`.
    ///
    /// The first window starts immediately and begins with a full quota.
    fn new(num_request: NonZeroU64, per: Duration, client: reqwest::Client) -> Self {
        let until = Instant::now() + per;

        Self {
            client,
            num_request,
            per,
            until,
            state: State::Ready { rem: num_request },
        }
    }

    /// Tightens the limiter: halves the per-window quota and stretches the
    /// window length.
    fn inc_rate_limit(&mut self) {
        // Halving is skipped once the quota is down to 1, since the result
        // would be zero.
        if let Some(halved) = NonZeroU64::new(self.num_request.get() / 2) {
            self.num_request = halved;

            // The current window must not keep more requests than the
            // reduced quota allows.
            if let State::Ready { rem } = &mut self.state {
                *rem = (*rem).min(halved);
            }
        }

        // Grow the window by 20% while it is still shorter than 700ms and
        // push the end of the current window back by the same amount.
        let old_per = self.per;
        if old_per < Duration::from_millis(700) {
            let new_per = old_per.mul_f32(1.2);
            self.until += new_per - old_per;
            self.per = new_per;
        }
    }

    /// Reports whether a request may be sent right now.
    ///
    /// When the limiter is exhausted but its window has already ended, a new
    /// window with a full quota is started and `Readiness::Ready` is returned.
    fn ready(&mut self) -> Readiness {
        if matches!(self.state, State::Ready { .. }) {
            return Readiness::Ready;
        }

        if self.until.elapsed().is_zero() {
            // The window has not ended yet; the caller has to wait for it.
            return Readiness::Limited(self.until);
        }

        // The window is over, so the quota can be refilled.
        self.until = Instant::now() + self.per;
        self.state = State::Ready {
            rem: self.num_request,
        };
        Readiness::Ready
    }

    /// Consumes one unit of quota and starts executing `req`.
    ///
    /// # Panics
    ///
    /// Panics if the limiter is in the `State::Limited` state.
    fn call(&mut self, req: Request) -> impl Future<Output = RequestResult> {
        let rem = match &mut self.state {
            State::Ready { rem } => rem,
            State::Limited => panic!("service not ready; poll_ready must be called first"),
        };

        // Start a fresh window if the previous one has run out.
        let now = Instant::now();
        if now >= self.until {
            self.until = now + self.per;
            *rem = self.num_request;
        }

        match NonZeroU64::new(rem.get() - 1) {
            Some(left) => *rem = left,
            // That was the last request of this window: no more requests
            // until `ready` refills the quota.
            None => self.state = State::Limited,
        }

        self.client.execute(req)
    }
}
/// Outcome of `Inner::ready`.
enum Readiness {
/// No request may be sent yet; carries the instant at which the current
/// window ends.
Limited(Instant),
/// A request may be sent now.
Ready,
}
/// Sends requests through a client-side rate limiter and additionally holds
/// back requests to hosts that have a delay deadline recorded.
#[derive(Debug)]
pub(super) struct DelayRequest {
/// Client-side rate limiter together with the client that sends requests.
inner: Mutex<Inner>,
/// Per-host instants before which requests to that host are held back.
hosts_to_delay: Mutex<HashMap<CompactString, Instant>>,
}
impl<S> DelayRequest<S> {
pub(super) fn new(inner: S) -> Self {
impl DelayRequest {
pub(super) fn new(num_request: NonZeroU64, per: Duration, client: reqwest::Client) -> Self {
Self {
inner: AsyncMutex::new(inner),
inner: Mutex::new(Inner::new(num_request, per, client)),
hosts_to_delay: Default::default(),
}
}
@ -78,61 +165,81 @@ impl<S> DelayRequest<S> {
});
}
fn wait_until_available(&self, url: &Url) -> impl Future<Output = ()> + Send + 'static {
fn get_delay_until(&self, host: &str) -> Option<Instant> {
let mut hosts_to_delay = self.hosts_to_delay.lock().unwrap();
let deadline = url
.host_str()
.and_then(|host| hosts_to_delay.get(host).map(|deadline| (*deadline, host)))
.and_then(|(deadline, host)| {
if deadline.elapsed().is_zero() {
Some(deadline)
} else {
// We have already gone past the deadline,
// so we should remove it instead.
hosts_to_delay.remove(host);
None
}
});
async move {
if let Some(deadline) = deadline {
sleep_until(deadline).await;
hosts_to_delay.get(host).copied().and_then(|until| {
if until.elapsed().is_zero() {
Some(until)
} else {
// We have already gone past the deadline,
// so we should remove it instead.
hosts_to_delay.remove(host);
None
}
}
}
}
impl<'this, S> Service<Request> for &'this DelayRequest<S>
where
S: Service<Request> + Send,
S::Future: Send,
{
type Response = S::Response;
type Error = S::Error;
// TODO: Replace this with `type_alias_impl_trait` once it stablises
// https://github.com/rust-lang/rust/issues/63063
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'this>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request) -> Self::Future {
let this = *self;
Box::pin(async move {
this.wait_until_available(req.url()).await;
// Reduce critical section:
// - Construct the request before locking
// - Once it is ready, call it and obtain
// the future, then release the lock before
// polling the future, which performs network I/O that could
// take really long.
let future = this.inner.lock().await.ready().await?.call(req);
future.await
})
}
/// Performs one attempt at sending `req`.
///
/// Returns `ControlFlow::Continue(instant)` when the caller has to sleep
/// until `instant` and try again, or `ControlFlow::Break(future)` with the
/// in-flight request once it has been handed to the client. `req` is only
/// taken in the `Break` case and must be `Some` on entry.
///
/// This is a separate, non-async function so that the mutex guard is
/// released as soon as possible and never becomes part of a future.
fn call_inner(
    &self,
    counter: &mut u32,
    req: &mut Option<Request>,
) -> ControlFlow<impl Future<Output = RequestResult>, Instant> {
    let mut guard = self.inner.lock().unwrap();

    // Client-side rate-limit throttler: wait until we are allowed to send
    // the next request.
    if let Readiness::Limited(until) = guard.ready() {
        return ControlFlow::Continue(until);
    }

    let server_deadline = req
        .as_ref()
        .unwrap()
        .url()
        .host_str()
        .and_then(|host| self.get_delay_until(host));

    match server_deadline {
        Some(until) => {
            // Server-side rate-limit throttler: the host rate-limited us, so
            // wait until its deadline and retry. The client-side limit is
            // tightened as well to avoid being rate-limited again.
            guard.inc_rate_limit();

            let additional_delay =
                Duration::from_millis(200) + Duration::from_millis(100) * (*counter).min(20);
            *counter += 1;

            debug!("server-side rate limit exceeded; sleeping.");
            ControlFlow::Continue(until + additional_delay)
        }
        None => ControlFlow::Break(guard.call(req.take().unwrap())),
    }
}
/// Sends `req`, sleeping and retrying for as long as `call_inner` asks to
/// wait, and returns the result of the request.
pub(super) async fn call(&self, req: Request) -> RequestResult {
    // Everything needed for the retry loop lives in this block, so it is
    // dropped before the future returned by reqwest is polled.
    let response_future = {
        // Kept in an `Option` so that `call_inner` can take the request by
        // `&mut` on every attempt without moving it or boxing it.
        //
        // It is only taken when `call_inner` returns `ControlFlow::Break`,
        // which also ends the loop, so `call_inner` never sees a `None`.
        let mut pending = Some(req);
        let mut attempts = 0;

        loop {
            let wake_at = match self.call_inner(&mut attempts, &mut pending) {
                ControlFlow::Break(in_flight) => break in_flight,
                ControlFlow::Continue(deadline) => deadline,
            };
            sleep_until(wake_at).await;
        }
    };

    response_future.await
}
}