package discord4j.rest.request;

import discord4j.common.LogUtil;
import discord4j.rest.http.client.ClientException;
import discord4j.rest.http.client.ClientRequest;
import discord4j.rest.http.client.ClientResponse;
import discord4j.rest.http.client.DiscordWebClient;
import discord4j.rest.response.ResponseFunction;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.netty.http.client.HttpClientResponse;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.retry.Retry;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:discord4j/rest/request/RequestStream.class */
/**
 * Processes the requests queued for a single {@link BucketKey}, strictly one at a time.
 *
 * <p>Requests are pushed through {@link #push(RequestCorrelation)} and consumed by a
 * {@link RequestSubscriber} once {@link #start()} has been called. The subscriber asks the queue
 * for exactly one element, executes it through the {@link DiscordWebClient}, and only asks for the
 * following element after the current one has terminated and any delay computed by the
 * {@link RateLimitStrategy} has elapsed.
 *
 * <p>Decompiler markers in this file indicate that the class and several of its members were
 * originally package-private or private. The widened visibility is kept as found so that existing
 * callers keep compiling.
 */
public class RequestStream {
    private static final Logger log = Loggers.getLogger(RequestStream.class);

    private final BucketKey id;
    private final RequestQueue<RequestCorrelation<ClientResponse>> requestQueue;
    private final GlobalRateLimiter globalRateLimiter;
    private final Scheduler timedTaskScheduler;
    private final List<ResponseFunction> responseFunctions;
    private final DiscordWebClient httpClient;
    private final RateLimitStrategy rateLimitStrategy;
    private final RateLimitRetryOperator rateLimitRetryOperator;

    /**
     * Creates a stream for the given bucket.
     *
     * @param bucketKey identifier of the bucket this stream serves; used in log output and in the
     *                  subscriber context of every request
     * @param routerOptions source of the request queue factory, global rate limiter, timer scheduler
     *                      and response transformers
     * @param discordWebClient client used to exchange each request
     * @param rateLimitStrategy computes, from each HTTP response, how long to wait before the next
     *                          request of this bucket is processed
     */
    public RequestStream(BucketKey bucketKey, RouterOptions routerOptions, DiscordWebClient discordWebClient, RateLimitStrategy rateLimitStrategy) {
        this.id = bucketKey;
        this.requestQueue = routerOptions.getRequestQueueFactory().create();
        this.globalRateLimiter = routerOptions.getGlobalRateLimiter();
        this.timedTaskScheduler = routerOptions.getReactorResources().getTimerTaskScheduler();
        this.responseFunctions = routerOptions.getResponseTransformers();
        this.httpClient = discordWebClient;
        this.rateLimitStrategy = rateLimitStrategy;
        this.rateLimitRetryOperator = new RateLimitRetryOperator(this.timedTaskScheduler);
    }

    /**
     * Builds the retry policy applied when a request fails with one of the status codes
     * 500, 502, 503, 504 or 520. Backoff is exponential with jitter, configured with
     * durations of 2 and 30 seconds, and runs on the timer scheduler of this stream.
     *
     * @return a new retry policy instance
     */
    public reactor.retry.Retry<?> serverErrorRetryFactory() {
        return reactor.retry.Retry.onlyIf(ClientException.isRetryContextStatusCode(500, 502, 503, 504, 520))
                .withBackoffScheduler(timedTaskScheduler)
                .exponentialBackoffWithJitter(Duration.ofSeconds(2L), Duration.ofSeconds(30L))
                .doOnRetry(retryContext -> {
                    if (log.isDebugEnabled()) {
                        log.debug("Retry {} in bucket {} due to {} for {}",
                                retryContext.iteration(),
                                id.toString(),
                                retryContext.exception().toString(),
                                retryContext.backoff());
                    }
                });
    }

    /**
     * Enqueues a request for processing by this stream.
     *
     * @param requestCorrelation the request together with the processor that receives its response
     */
    public void push(RequestCorrelation<ClientResponse> requestCorrelation) {
        requestQueue.push(requestCorrelation);
    }

    /**
     * Subscribes a new {@link RequestSubscriber} to the request queue. Elements discarded by the
     * queue pipeline have their response processor failed with a {@link DiscardedRequestException}.
     */
    public void start() {
        requestQueue.requests()
                .doOnDiscard(RequestCorrelation.class, this::onDiscard)
                .subscribe(new RequestSubscriber(rateLimitStrategy));
    }

    /** Signals a {@link DiscardedRequestException} to the caller waiting on a discarded request. */
    private void onDiscard(RequestCorrelation<?> requestCorrelation) {
        requestCorrelation.getResponse().onError(new DiscardedRequestException(requestCorrelation.getRequest()));
    }

    /**
     * Subscriber that pulls one request at a time from the queue, executes it and schedules the
     * demand for the next one.
     */
    public class RequestSubscriber extends BaseSubscriber<RequestCorrelation<ClientResponse>> {
        /** Delay to apply before requesting the next element; reset to zero once consumed. */
        private volatile Duration sleepTime = Duration.ZERO;
        private final Function<ClientResponse, Mono<ClientResponse>> responseFunction;

        /**
         * @param rateLimitStrategy strategy queried with every HTTP response to obtain the delay
         *                          before the next request
         */
        public RequestSubscriber(RateLimitStrategy rateLimitStrategy) {
            this.responseFunction = clientResponse -> {
                HttpClientResponse httpResponse = clientResponse.getHttpResponse();
                logResponseTime(httpResponse);

                Duration nextDelay = rateLimitStrategy.apply(httpResponse);
                if (!nextDelay.isZero()) {
                    if (log.isDebugEnabled()) {
                        log.debug(LogUtil.format(httpResponse.currentContext(), "Delaying next request by {}"), nextDelay);
                    }
                    sleepTime = nextDelay;
                }

                Mono<Void> globalLimit = globalRateLimitAction(httpResponse);
                if (httpResponse.status().code() >= 400) {
                    // Error statuses are turned into an error signal built from the response.
                    return globalLimit.then(clientResponse.createException().flatMap(Mono::error));
                }
                return globalLimit.thenReturn(clientResponse);
            };
        }

        /** Logs status and elapsed time of a response; headers are included on the trace variant. */
        private void logResponseTime(HttpClientResponse httpResponse) {
            if (!log.isDebugEnabled()) {
                return;
            }
            Long sentAtMillis = httpResponse.currentContext().get(DiscordWebClient.KEY_REQUEST_TIMESTAMP);
            Duration elapsed = Duration.between(Instant.ofEpochMilli(sentAtMillis), Instant.now());
            LogUtil.traceDebug(log, withHeaders -> LogUtil.format(httpResponse.currentContext(),
                    "Read " + httpResponse.status() + " in " + elapsed
                            + (withHeaders ? " with headers: " + httpResponse.responseHeaders() : "")));
        }

        /**
         * Returns the action that must complete before the response is propagated: empty unless the
         * response carries {@code X-RateLimit-Global: true}, in which case the global rate limiter
         * is engaged for the duration read from {@code Retry-After}, interpreted as milliseconds.
         */
        private Mono<Void> globalRateLimitAction(HttpClientResponse httpResponse) {
            boolean global = Boolean.parseBoolean(httpResponse.responseHeaders().get("X-RateLimit-Global"));
            if (!global) {
                return Mono.empty();
            }
            // NOTE(review): Long.parseLong throws NumberFormatException when the header is absent or
            // not an integer; confirm that the API always sends an integral Retry-After here.
            Duration retryAfter = Duration.ofMillis(Long.parseLong(httpResponse.responseHeaders().get("Retry-After")));
            return globalRateLimiter.rateLimitFor(retryAfter)
                    .doOnTerminate(() -> log.debug(
                            LogUtil.format(httpResponse.currentContext(), "Globally rate limited for {}"), retryAfter));
        }

        /** Requests the first element only; further demand is issued by {@link #next(SignalType)}. */
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(1L);
        }

        /**
         * Executes one request: exchanges it under the global rate limiter, applies the rate limit
         * retry, the configured response transformers and the server error retry, and forwards the
         * outcome to the response processor of the correlation. When the pipeline terminates for
         * any reason, the next element is scheduled.
         *
         * @param requestCorrelation the request to execute and the processor awaiting its response
         */
        @Override
        public void hookOnNext(RequestCorrelation<ClientResponse> requestCorrelation) {
            DiscordWebRequest request = requestCorrelation.getRequest();
            ClientRequest clientRequest = new ClientRequest(request);
            MonoProcessor<ClientResponse> callback = requestCorrelation.getResponse();

            if (log.isDebugEnabled()) {
                log.debug("[B:{}, R:{}] {}", id.toString(), clientRequest.getId(), clientRequest.getDescription());
            }

            Mono.just(clientRequest)
                    .doOnEach(signal -> log.trace(LogUtil.format(signal.getContext(), ">> {}"), signal))
                    .flatMap(outgoing -> globalRateLimiter
                            .withLimiter(httpClient.exchange(outgoing).flatMap(responseFunction))
                            .next())
                    .doOnEach(signal -> log.trace(LogUtil.format(signal.getContext(), "<< {}"), signal))
                    .subscriberContext(context -> context.putAll(requestCorrelation.getContext())
                            .put("discord4j.request", clientRequest.getId())
                            .put("discord4j.bucket", id.toString()))
                    .retryWhen(Retry.withThrowable(rateLimitRetryOperator::apply))
                    .transform(getResponseTransformers(request))
                    .retryWhen(Retry.withThrowable(serverErrorRetryFactory()))
                    .doFinally(this::next)
                    .checkpoint("Request to " + clientRequest.getDescription() + " [RequestStream]")
                    .subscribeWith(callback)
                    // No value consumer: the processor itself delivers the result to the caller.
                    .subscribe((Consumer<ClientResponse>) null,
                            error -> log.trace("Error while processing {}: {}", request, error));
        }

        /**
         * Composes all configured {@link ResponseFunction}s for the given request into a single
         * transformer, in list order, each followed by a checkpoint naming the function. Returns
         * the identity transformer when no response functions are configured.
         */
        private Function<Mono<ClientResponse>, Mono<ClientResponse>> getResponseTransformers(DiscordWebRequest discordWebRequest) {
            return responseFunctions.stream()
                    .map(responseFunction -> responseFunction.transform(discordWebRequest)
                            .andThen(mono -> mono.checkpoint("Apply " + responseFunction + " to "
                                    + discordWebRequest.getDescription() + " [RequestStream]")))
                    .reduce(Function::andThen)
                    .orElse(mono -> mono);
        }

        /**
         * Requests the next element from the queue, immediately when no delay is pending or after
         * {@code sleepTime} on the timer scheduler otherwise, then clears the pending delay.
         *
         * @param signalType how the previous request terminated; only used for logging
         */
        private void next(SignalType signalType) {
            Mono<Long> timer = sleepTime.isZero()
                    ? Mono.just(0L)
                    : Mono.delay(sleepTime, timedTaskScheduler);
            timer.subscribe(tick -> {
                if (log.isDebugEnabled()) {
                    log.debug("[B:{}] Ready to consume next request after {}", id.toString(), signalType);
                }
                sleepTime = Duration.ZERO;
                request(1L);
            }, error -> log.error("[B:{}] Error while scheduling next request", id.toString(), error));
        }
    }
}
