package discord4j.rest.request;

import discord4j.rest.http.client.ClientException;
import discord4j.rest.http.client.ClientRequest;
import discord4j.rest.http.client.DiscordWebClient;
import discord4j.rest.util.RouteUtils;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.logging.Level;
import org.reactivestreams.Publisher;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SignalType;
import reactor.netty.http.client.HttpClientResponse;
import reactor.retry.BackoffDelay;
import reactor.retry.IterationContext;
import reactor.retry.Retry;
import reactor.retry.RetryContext;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.function.Tuple2;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:discord4j/rest/request/RequestStream.class */
/**
 * Serializes the requests pushed for one {@link BucketKey}: a {@link Reader} takes one queued request at a
 * time, executes it through the {@link DiscordWebClient} once the {@link GlobalRateLimiter} completes,
 * forwards the outcome to the request's {@link MonoProcessor}, and only then reads the next request.
 *
 * @param <T> the response type of the requests handled by this stream
 */
public class RequestStream<T> {
    /**
     * Queue of pending requests; each element pairs a request with the processor that receives its outcome.
     * Created through {@code EmitterProcessor.create(false)}.
     */
    private final EmitterProcessor<Tuple2<MonoProcessor<T>, DiscordRequest<T>>> backing = EmitterProcessor.create(false);
    /** Bucket this stream belongs to; used to build the logger and signal-log category names. */
    private final BucketKey id;
    private final Logger log;
    private final DiscordWebClient httpClient;
    private final GlobalRateLimiter globalRateLimiter;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:discord4j/rest/request/RequestStream$Reader.class */
    /**
     * Consumer of a single queued request. After the request's outcome has been forwarded, it waits for
     * {@link #sleepTime} and then subscribes itself to the next element of the queue.
     */
    public class Reader implements Consumer<Tuple2<MonoProcessor<T>, DiscordRequest<T>>> {
        /**
         * Delay to apply before the next request is read. Written by {@link #rateLimitHandler} and reset to
         * zero once the delay has elapsed.
         */
        private volatile Duration sleepTime;
        /** Callback handed to the HTTP client so it can inspect the rate limit headers of each response. */
        private final Consumer<HttpClientResponse> rateLimitHandler;

        private Reader() {
            this.sleepTime = Duration.ZERO;
            this.rateLimitHandler = this::updateSleepTime;
        }

        /**
         * Reads the rate limit headers of a response and, when the bucket is exhausted
         * ({@code X-RateLimit-Remaining} is 0), stores the time until {@code X-RateLimit-Reset} (relative to
         * the response's {@code Date} header) as the delay before the next request.
         *
         * <p>If either header is missing or unparseable the delay is left unchanged instead of throwing from
         * within the response callback.
         *
         * @param response the HTTP response whose headers are inspected
         */
        private void updateSleepTime(HttpClientResponse response) {
            HttpHeaders headers = response.responseHeaders();
            if (log.isTraceEnabled()) {
                log.trace("Read {} with headers: {}", response.status(), headers);
            }
            if (headers.getInt("X-RateLimit-Remaining", -1) != 0) {
                return;
            }
            String reset = headers.get("X-RateLimit-Reset");
            // getTimeMillis returns null when the header is absent or is not a valid date.
            Long dateMillis = headers.getTimeMillis("Date");
            if (reset == null || dateMillis == null) {
                log.warn("Bucket exhausted but X-RateLimit-Reset or Date header is missing; not delaying");
                return;
            }
            long resetAt;
            try {
                resetAt = parseEpochSeconds(reset);
            } catch (NumberFormatException e) {
                log.warn("Ignoring unparseable X-RateLimit-Reset header: {}", reset);
                return;
            }
            Duration delay = Duration.ofSeconds(resetAt - (dateMillis / 1000));
            if (delay.isNegative()) {
                // The reset instant is already in the past according to the server clock.
                delay = Duration.ZERO;
            }
            if (log.isTraceEnabled()) {
                log.trace("Delaying next request by {}", delay);
            }
            this.sleepTime = delay;
        }

        /**
         * Executes one queued request and forwards its outcome to the paired processor.
         *
         * @param tuple2 the processor awaiting the response (T1) and the request to execute (T2)
         */
        @Override // java.util.function.Consumer
        public void accept(Tuple2<MonoProcessor<T>, DiscordRequest<T>> tuple2) {
            MonoProcessor<T> callback = tuple2.getT1();
            DiscordRequest<T> request = tuple2.getT2();
            if (log.isTraceEnabled()) {
                log.trace("Accepting request: {}", request);
            }
            ClientRequest clientRequest = new ClientRequest(
                    request.getRoute().getMethod(),
                    RouteUtils.expandQuery(request.getCompleteUri(), request.getQueryParams()),
                    toHttpHeaders(request));
            Class<T> responseType = request.getRoute().getResponseType();
            Mono.when(globalRateLimiter)
                    .log("discord4j.rest.request." + id, Level.FINEST)
                    .materialize()
                    .flatMap(ignored -> httpClient.exchange(clientRequest, request.getBody(), responseType, rateLimitHandler))
                    .retryWhen(rateLimitRetryFactory())
                    .retryWhen(serverErrorRetryFactory())
                    .log("discord4j.rest.response." + id, Level.FINEST)
                    // Materialized so that errors also arrive through the value consumer below.
                    .materialize()
                    .subscribe(signal -> {
                        if (signal.isOnSubscribe()) {
                            callback.onSubscribe(signal.getSubscription());
                        } else if (signal.isOnNext()) {
                            callback.onNext(signal.get());
                        } else if (signal.isOnError()) {
                            callback.onError(signal.getThrowable());
                        } else if (signal.isOnComplete()) {
                            callback.onComplete();
                        }
                        scheduleNextRead();
                    });
        }

        /**
         * Waits for the current {@link #sleepTime}, clears it, and subscribes this reader to the next queued
         * request. Errors from that subscription are logged the same way {@link RequestStream#start()} does.
         */
        private void scheduleNextRead() {
            Mono.delay(this.sleepTime).subscribe(tick -> {
                if (log.isTraceEnabled()) {
                    log.trace("Ready to consume next request");
                }
                this.sleepTime = Duration.ZERO;
                read().subscribe(this, RequestStream.this::logConsumeError);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a request stream for one bucket.
     *
     * @param bucketKey the bucket identifier, also used in logger and log category names
     * @param discordWebClient the client used to execute requests
     * @param globalRateLimiter the limiter awaited before every request and notified of global rate limits
     */
    public RequestStream(BucketKey bucketKey, DiscordWebClient discordWebClient, GlobalRateLimiter globalRateLimiter) {
        this.id = bucketKey;
        this.log = Loggers.getLogger("discord4j.rest.request." + bucketKey);
        this.httpClient = discordWebClient;
        this.globalRateLimiter = globalRateLimiter;
    }

    /**
     * Copies the optional per-request headers into a Netty header container.
     *
     * @param request the request whose headers (possibly {@code null}) are converted
     * @return a new {@link HttpHeaders} holding every name/value pair of the request, empty if it has none
     */
    private HttpHeaders toHttpHeaders(DiscordRequest<T> request) {
        HttpHeaders headers = new DefaultHttpHeaders();
        Optional.ofNullable(request.getHeaders())
                .ifPresent(map -> map.forEach(
                        (name, values) -> values.forEach(value -> headers.add(name, value))));
        return headers;
    }

    /**
     * Parses an epoch timestamp expressed in seconds, accepting both integral and fractional values.
     * Fractional values are rounded up so the resulting wait is never shorter than requested.
     *
     * @param value the raw header value, not {@code null}
     * @return the timestamp in whole seconds
     * @throws NumberFormatException if the value is not a finite number
     */
    private static long parseEpochSeconds(String value) {
        double seconds = Double.parseDouble(value);
        if (Double.isNaN(seconds) || Double.isInfinite(seconds)) {
            throw new NumberFormatException("Not a finite epoch timestamp: " + value);
        }
        return (long) Math.ceil(seconds);
    }

    /** Logs an error raised while consuming the request queue. */
    private void logConsumeError(Throwable error) {
        this.log.error("Error when consuming request", error);
    }

    /** Traces a retry attempt: iteration number, triggering exception and applied backoff. */
    private void traceRetry(RetryContext<?> context) {
        if (this.log.isTraceEnabled()) {
            this.log.trace("Retry {} due to {} for {}", context.iteration(), context.exception().toString(), context.backoff());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the retry policy for HTTP 429 responses: the request is retried after the number of
     * milliseconds given by the {@code Retry-After} header, and when {@code X-RateLimit-Global} is
     * {@code true} the global rate limiter is told to hold for the same duration.
     *
     * @return a retry that only applies to rate limit errors
     */
    public Retry<?> rateLimitRetryFactory() {
        return Retry.onlyIf(this::isRateLimitError)
                .backoff(context -> {
                    if (!isRateLimitError(context)) {
                        return new BackoffDelay(Duration.ZERO);
                    }
                    ClientException clientException = (ClientException) ((RetryContext<?>) context).exception();
                    HttpHeaders headers = clientException.getHeaders();
                    boolean global = Boolean.parseBoolean(headers.get("X-RateLimit-Global"));
                    Duration retryAfter = Duration.ofMillis(Long.parseLong(headers.get("Retry-After")));
                    if (global) {
                        this.globalRateLimiter.rateLimitFor(retryAfter);
                    }
                    return new BackoffDelay(retryAfter);
                })
                .doOnRetry(this::traceRetry);
    }

    /**
     * Tells whether the failure carried by the context is a {@link ClientException} with status 429.
     */
    private boolean isRateLimitError(IterationContext<?> iterationContext) {
        Throwable exception = ((RetryContext<?>) iterationContext).exception();
        return (exception instanceof ClientException) && ((ClientException) exception).getStatus().code() == 429;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the retry policy for HTTP 502, 503 and 504 responses, using exponential backoff with jitter
     * between 2 and 30 seconds.
     *
     * @return a retry that only applies to those server errors
     */
    public Retry<?> serverErrorRetryFactory() {
        return Retry.onlyIf(this::isServerError)
                .exponentialBackoffWithJitter(Duration.ofSeconds(2L), Duration.ofSeconds(30L))
                .doOnRetry(this::traceRetry);
    }

    /**
     * Tells whether the failure carried by the context is a {@link ClientException} with status 502, 503
     * or 504.
     */
    private boolean isServerError(IterationContext<?> iterationContext) {
        Throwable exception = ((RetryContext<?>) iterationContext).exception();
        if (!(exception instanceof ClientException)) {
            return false;
        }
        int code = ((ClientException) exception).getStatus().code();
        return code == 502 || code == 503 || code == 504;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Enqueues a request together with the processor that will receive its outcome.
     *
     * @param tuple2 the processor (T1) and the request (T2)
     */
    public void push(Tuple2<MonoProcessor<T>, DiscordRequest<T>> tuple2) {
        this.backing.onNext(tuple2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts consuming the queue with a new {@link Reader}; errors from the subscription are logged.
     */
    public void start() {
        read().subscribe(new Reader(), this::logConsumeError);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns a {@link Mono} that emits the next element pushed to the queue.
     */
    public Mono<Tuple2<MonoProcessor<T>, DiscordRequest<T>>> read() {
        return this.backing.next();
    }
}
