package discord4j.rest.request;

import discord4j.rest.http.client.ClientException;
import discord4j.rest.http.client.ClientRequest;
import discord4j.rest.http.client.DiscordWebClient;
import java.time.Duration;
import java.time.Instant;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.netty.http.client.HttpClientResponse;
import reactor.retry.BackoffDelay;
import reactor.retry.IterationContext;
import reactor.retry.Retry;
import reactor.retry.RetryContext;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:discord4j/rest/request/RequestStream.class */
public class RequestStream<T> {
    /** Pending requests for this stream; created by the router options' queue factory, fed by {@code push} and consumed in {@code start}. */
    private final RequestQueue<RequestCorrelation<T>> requestQueue;
    /** Bucket key this stream was created for; appended to the per-request logger names. */
    private final BucketKey id;
    /** Logger named {@code discord4j.rest.traces.<bucket key>}, used for rate limit and retry diagnostics. */
    private final Logger log;
    /** Client used to exchange each queued request. */
    private final DiscordWebClient httpClient;
    /** Limiter that wraps every exchange and is told to pause when a 429 response is flagged as global. */
    private final GlobalRateLimiter globalRateLimiter;
    /** Strategy handed to the subscriber created in {@code start}; it derives a delay from each HTTP response. */
    private final RateLimitStrategy rateLimitStrategy;
    /** Scheduler on which the delay before pulling the next request is run. */
    private final Scheduler rateLimitScheduler;
    /** Source of the request queue factory and of the response transformers applied to every exchange. */
    private final RouterOptions routerOptions;

    /**
     * Derives a {@link Duration} from an HTTP response.
     *
     * <p>{@code RequestSubscriber} applies it to every response it observes; a non-zero result is
     * stored and used as the delay before the next queued request is pulled.
     */
    @FunctionalInterface
    /* loaded from: input_file:discord4j/rest/request/RequestStream$RateLimitStrategy.class */
    interface RateLimitStrategy extends Function<HttpClientResponse, Duration> {
    }

    /* loaded from: input_file:discord4j/rest/request/RequestStream$RequestSubscriber.class */
    private class RequestSubscriber extends BaseSubscriber<RequestCorrelation<T>> {
        /**
         * Delay to wait before requesting the next queued item. Set by the rate limit handler when
         * the strategy returns a non-zero duration, and reset to zero once the delay has elapsed.
         * NOTE(review): presumably volatile because those two writes can happen on different
         * threads (HTTP response callback vs. rate limit scheduler) — confirm.
         */
        private volatile Duration sleepTime = Duration.ZERO;
        /** Callback passed to the HTTP client with every exchange; it inspects the raw response. */
        private final Consumer<HttpClientResponse> rateLimitHandler;

        /**
         * Creates the subscriber together with its response hook.
         *
         * <p>The hook optionally traces the response status, the time elapsed since the timestamp
         * stored in the response context and the response headers, then asks the strategy for a
         * delay. A non-zero delay is remembered in {@code sleepTime}; a zero delay leaves the
         * current value untouched.
         *
         * @param rateLimitStrategy maps each HTTP response to the delay to apply before the next request
         */
        public RequestSubscriber(RateLimitStrategy rateLimitStrategy) {
            this.rateLimitHandler = response -> {
                if (log.isTraceEnabled()) {
                    log.trace("Read {} in {} with headers: {}",
                            response.status(),
                            Duration.between(
                                    Instant.ofEpochMilli(
                                            (Long) response.currentContext().get(DiscordWebClient.REQUEST_TIMESTAMP_KEY)),
                                    Instant.now()),
                            response.responseHeaders());
                }
                Duration delay = rateLimitStrategy.apply(response);
                if (!delay.isZero()) {
                    if (log.isTraceEnabled()) {
                        log.trace("Delaying next request by {}", delay);
                    }
                    sleepTime = delay;
                }
            };
        }

        /**
         * Asks for exactly one request as soon as the subscription is established; further
         * demand is only signalled from {@code next}.
         *
         * @param subscription the upstream subscription (not used directly)
         */
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(1);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Executes one queued request and relays its outcome to the caller's processor.
         *
         * <p>The exchange is run through the global rate limiter. Inside it, the HTTP call is
         * retried on rate limit errors, passed through the configured response transformers and
         * then retried on server errors. When that inner sequence terminates for any reason,
         * {@code next} schedules the demand for the following request. The materialized signal is
         * forwarded to the response processor of the correlation.
         *
         * @param requestCorrelation the request to execute together with its response processor and shard id
         */
        @Override
        public void hookOnNext(RequestCorrelation<T> requestCorrelation) {
            DiscordRequest<T> discordRequest = requestCorrelation.getRequest();
            MonoProcessor<T> callback = requestCorrelation.getResponse();
            String shard = requestCorrelation.getShardId();

            Logger traceLog = getLogger("traces", shard);
            if (traceLog.isTraceEnabled()) {
                traceLog.trace("Accepting request: {}", discordRequest);
            }
            Logger requestLog = getLogger("request", shard);
            Logger responseLog = getLogger("response", shard);
            Class<T> bodyType = discordRequest.getRoute().getResponseType();

            globalRateLimiter
                    .withLimiter(Mono.fromCallable(() -> new ClientRequest(discordRequest))
                            .log(requestLog, Level.FINEST, false)
                            .flatMap(clientRequest -> httpClient.exchange(
                                    clientRequest, discordRequest.getBody(), bodyType, rateLimitHandler))
                            .retryWhen(rateLimitRetryFactory())
                            .transform(getResponseTransformers(discordRequest))
                            .retryWhen(serverErrorRetryFactory())
                            .log(responseLog, Level.FINEST, false)
                            // Whatever the outcome, arrange for the next queued request to be pulled.
                            .doFinally(terminal -> next(terminal, traceLog)))
                    .materialize()
                    .subscribe(signal -> {
                        if (signal.isOnSubscribe()) {
                            callback.onSubscribe(signal.getSubscription());
                        } else if (signal.isOnNext()) {
                            callback.onNext(signal.get());
                        } else if (signal.isOnError()) {
                            callback.onError(signal.getThrowable());
                        } else if (signal.isOnComplete()) {
                            callback.onComplete();
                        }
                    });
        }

        /**
         * Combines all response transformers from the router options into one function.
         *
         * <p>Each transformer is specialised for the given request and the results are chained
         * with {@code andThen} in list order. With no transformers configured the returned
         * function hands back its input unchanged.
         *
         * @param discordRequest the request the transformers are specialised for
         * @return the composed transformation to apply to the response {@link Mono}
         */
        private Function<Mono<T>, Mono<T>> getResponseTransformers(DiscordRequest<T> discordRequest) {
            return (Function) routerOptions.getResponseTransformers()
                    .stream()
                    .map(transformer -> transformer.transform(discordRequest))
                    .reduce((first, second) -> first.andThen(second))
                    .orElse(unchanged -> unchanged);
        }

        /**
         * Waits for the currently stored {@code sleepTime} on the rate limit scheduler, then
         * clears it and requests one more item from the queue. A failure of the delay itself is
         * only logged.
         *
         * @param signalType the signal that terminated the previous exchange, used for tracing
         * @param logger the logger to report to
         */
        private void next(SignalType signalType, Logger logger) {
            Mono.delay(sleepTime, rateLimitScheduler)
                    .subscribe(
                            tick -> {
                                if (logger.isTraceEnabled()) {
                                    logger.trace("Ready to consume next request after {}", signalType);
                                }
                                sleepTime = Duration.ZERO;
                                request(1);
                            },
                            failure -> logger.error("Error while scheduling next request", failure));
        }

        /**
         * Looks up the logger named {@code discord4j.rest.<category>.<bucket key>}, with
         * {@code .<shard id>} appended when a shard id is given.
         *
         * @param category the logger category segment
         * @param shardId the shard id to append, or {@code null} for none
         * @return the matching logger
         */
        private Logger getLogger(String category, @Nullable String shardId) {
            String shardSuffix = (shardId != null) ? "." + shardId : "";
            return Loggers.getLogger("discord4j.rest." + category + "." + id + shardSuffix);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a request stream for one bucket. The request queue is obtained from the queue
     * factory of the given router options; nothing is consumed until {@link #start()} is called.
     *
     * @param bucketKey the bucket this stream serves, also used in logger names
     * @param discordWebClient the client used to exchange requests
     * @param globalRateLimiter the limiter wrapped around every exchange
     * @param rateLimitStrategy maps HTTP responses to the delay before the next request
     * @param scheduler the scheduler that delay runs on
     * @param routerOptions supplies the request queue factory and the response transformers
     */
    public RequestStream(BucketKey bucketKey, DiscordWebClient discordWebClient, GlobalRateLimiter globalRateLimiter, RateLimitStrategy rateLimitStrategy, Scheduler scheduler, RouterOptions routerOptions) {
        this.requestQueue = routerOptions.getRequestQueueFactory().create();
        this.routerOptions = routerOptions;

        id = bucketKey;
        log = Loggers.getLogger("discord4j.rest.traces." + bucketKey);

        httpClient = discordWebClient;
        rateLimitScheduler = scheduler;
        this.globalRateLimiter = globalRateLimiter;
        this.rateLimitStrategy = rateLimitStrategy;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the retry policy for rate limited (HTTP 429) responses.
     *
     * <p>Only errors accepted by {@code isRateLimitError} are retried. The backoff is the
     * {@code Retry-After} header interpreted as milliseconds. When {@code X-RateLimit-Global}
     * parses as {@code true}: if the global rate limiter still reports a positive remaining
     * time, that remaining time is used as the backoff instead; otherwise the global rate limiter
     * is told to pause for the {@code Retry-After} duration.
     *
     * @return the retry policy to pass to {@code retryWhen}
     */
    public Retry<?> rateLimitRetryFactory() {
        return Retry.onlyIf(this::isRateLimitError)
                .backoff(context -> {
                    if (isRateLimitError(context)) {
                        ClientException rateLimited = (ClientException) ((RetryContext<?>) context).exception();
                        // Both headers are parsed up front, before the global flag is examined.
                        boolean global = Boolean.parseBoolean(rateLimited.getHeaders().get("X-RateLimit-Global"));
                        Duration retryAfter = Duration.ofMillis(Long.parseLong(rateLimited.getHeaders().get("Retry-After")));
                        if (global) {
                            Duration remaining = globalRateLimiter.getRemaining();
                            boolean alreadyLimited = !(remaining.isNegative() || remaining.isZero());
                            if (alreadyLimited) {
                                return new BackoffDelay(remaining);
                            }
                            log.debug("Globally rate limited for {}", retryAfter);
                            globalRateLimiter.rateLimitFor(retryAfter);
                        }
                        return new BackoffDelay(retryAfter);
                    }
                    return new BackoffDelay(Duration.ZERO);
                })
                .doOnRetry(attempt -> {
                    if (!log.isTraceEnabled()) {
                        return;
                    }
                    log.trace("Retry {} due to {} for {}",
                            attempt.iteration(), attempt.exception().toString(), attempt.backoff());
                });
    }

    /**
     * Tells whether the failure carried by a retry context is a {@link ClientException} with
     * HTTP status 429.
     *
     * @param iterationContext the context of the current retry attempt; it is cast to {@link RetryContext}
     * @return {@code true} for a rate limit error, {@code false} otherwise
     */
    private boolean isRateLimitError(IterationContext<?> iterationContext) {
        Throwable failure = ((RetryContext<?>) iterationContext).exception();
        if (!(failure instanceof ClientException)) {
            return false;
        }
        return ((ClientException) failure).getStatus().code() == 429;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the retry policy for server side failures: errors matching status 500, 502, 503 or
     * 504 are retried using exponential backoff with jitter, configured with 2 seconds and
     * 30 seconds. Each retry is traced when trace logging is enabled.
     *
     * @return the retry policy to pass to {@code retryWhen}
     */
    public Retry<?> serverErrorRetryFactory() {
        return Retry.onlyIf(ClientException.isRetryContextStatusCode(500, 502, 503, 504))
                .exponentialBackoffWithJitter(Duration.ofSeconds(2), Duration.ofSeconds(30))
                .doOnRetry(attempt -> {
                    if (!log.isTraceEnabled()) {
                        return;
                    }
                    log.trace("Retry {} due to {} for {}",
                            attempt.iteration(), attempt.exception().toString(), attempt.backoff());
                });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Enqueues a request on this stream's request queue.
     *
     * @param requestCorrelation the request together with the processor that will receive its response
     */
    public void push(RequestCorrelation<T> requestCorrelation) {
        requestQueue.push(requestCorrelation);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts consuming the request queue with a new {@code RequestSubscriber}. Correlations
     * discarded by the queue sequence are handed to {@code onDiscard}.
     */
    public void start() {
        requestQueue.requests()
                .doOnDiscard(RequestCorrelation.class, this::onDiscard)
                .subscribe(new RequestSubscriber(rateLimitStrategy));
    }

    /**
     * Fails the response processor of a discarded correlation with a
     * {@code DiscardedRequestException} built from its request.
     *
     * @param requestCorrelation the correlation that was dropped before being executed
     */
    private void onDiscard(RequestCorrelation<?> requestCorrelation) {
        MonoProcessor<?> callback = requestCorrelation.getResponse();
        callback.onError(new DiscardedRequestException(requestCorrelation.getRequest()));
    }
}
