package discord4j.connect.rsocket.router;

import discord4j.common.LogUtil;
import discord4j.common.ReactorResources;
import discord4j.common.retry.ReconnectOptions;
import discord4j.connect.common.Discord4JConnectException;
import discord4j.connect.rsocket.ConnectRSocket;
import discord4j.rest.http.client.ClientException;
import discord4j.rest.http.client.ClientRequest;
import discord4j.rest.http.client.ClientResponse;
import discord4j.rest.http.client.DiscordWebClient;
import discord4j.rest.request.BucketKey;
import discord4j.rest.request.DiscordWebRequest;
import discord4j.rest.request.DiscordWebResponse;
import discord4j.rest.request.GlobalRateLimiter;
import discord4j.rest.request.RateLimitRetryOperator;
import discord4j.rest.request.RateLimitStrategy;
import discord4j.rest.request.ResponseHeaderStrategy;
import discord4j.rest.request.Router;
import discord4j.rest.response.ResponseFunction;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.util.DefaultPayload;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.netty.http.client.HttpClientResponse;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;
import reactor.util.retry.Retry;

/* loaded from: input_file:discord4j/connect/rsocket/router/RSocketRouter.class */
public class RSocketRouter implements Router {
    private static final Logger log = Loggers.getLogger(RSocketRouter.class);
    private static final ResponseHeaderStrategy HEADER_STRATEGY = new ResponseHeaderStrategy();
    private static final String READY = "READY";
    private final ReactorResources reactorResources;
    private final List<ResponseFunction> responseFunctions;
    private final DiscordWebClient httpClient;
    private final GlobalRateLimiter globalRateLimiter;
    private final Function<DiscordWebRequest, InetSocketAddress> requestTransportMapper;
    private final Map<InetSocketAddress, ConnectRSocket> sockets = new ConcurrentHashMap();
    private final Map<BucketKey, BucketRequestExecutor> buckets = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:discord4j/connect/rsocket/router/RSocketRouter$BucketRequestExecutor.class */
    public class BucketRequestExecutor {
        private final BucketKey id;
        private final RateLimitRetryOperator rateLimitRetryOperator;
        private final Function<ClientResponse, Mono<ClientResponse>> rateLimitHandler;
        private volatile Duration sleepTime = Duration.ZERO;

        BucketRequestExecutor(BucketKey bucketKey, GlobalRateLimiter globalRateLimiter, RateLimitStrategy rateLimitStrategy, Scheduler scheduler) {
            this.id = bucketKey;
            this.rateLimitRetryOperator = new RateLimitRetryOperator(scheduler);
            this.rateLimitHandler = clientResponse -> {
                HttpClientResponse httpResponse = clientResponse.getHttpResponse();
                if (RSocketRouter.log.isDebugEnabled()) {
                    Duration between = Duration.between(Instant.ofEpochMilli(((Long) httpResponse.currentContext().get("discord4j.request.timestamp")).longValue()), Instant.now());
                    LogUtil.traceDebug(RSocketRouter.log, bool -> {
                        return LogUtil.format(httpResponse.currentContext(), "Read " + httpResponse.status() + " in " + between + (!bool.booleanValue() ? "" : " with headers: " + httpResponse.responseHeaders()));
                    });
                }
                Duration apply = rateLimitStrategy.apply(httpResponse);
                if (!apply.isZero()) {
                    if (RSocketRouter.log.isDebugEnabled()) {
                        RSocketRouter.log.debug(LogUtil.format(httpResponse.currentContext(), "Delaying next request by {}"), new Object[]{apply});
                    }
                    this.sleepTime = apply;
                }
                boolean parseBoolean = Boolean.parseBoolean(httpResponse.responseHeaders().get("X-RateLimit-Global"));
                Mono empty = Mono.empty();
                if (parseBoolean) {
                    Duration ofMillis = Duration.ofMillis(Long.parseLong(httpResponse.responseHeaders().get("Retry-After")));
                    empty = globalRateLimiter.rateLimitFor(ofMillis).doOnTerminate(() -> {
                        RSocketRouter.log.debug(LogUtil.format(httpResponse.currentContext(), "Globally rate limited for {}"), new Object[]{ofMillis});
                    });
                }
                return httpResponse.status().code() >= 400 ? empty.then(clientResponse.createException().flatMap((v0) -> {
                    return Mono.error(v0);
                })) : empty.thenReturn(clientResponse);
            };
        }

        public Mono<ClientResponse> exchange(ClientRequest clientRequest, RSocket rSocket, Context context) {
            Mono doOnEach = Mono.just(clientRequest).doOnEach(signal -> {
                RSocketRouter.log.trace(LogUtil.format(signal.getContext(), ">> {}"), new Object[]{signal});
            });
            DiscordWebClient discordWebClient = RSocketRouter.this.httpClient;
            discordWebClient.getClass();
            Mono subscriberContext = doOnEach.flatMap(discordWebClient::exchange).flatMap(this.rateLimitHandler).doOnEach(signal2 -> {
                RSocketRouter.log.trace(LogUtil.format(signal2.getContext(), "<< {}"), new Object[]{signal2});
            }).flatMap(clientResponse -> {
                return rSocket.requestResponse(RSocketRouter.limitPayload(this.id.toString(), this.sleepTime.toMillis())).thenReturn(clientResponse);
            }).subscriberContext(context2 -> {
                return context2.putAll(context).put("discord4j.request", clientRequest.getId()).put("discord4j.bucket", this.id.toString());
            });
            RateLimitRetryOperator rateLimitRetryOperator = this.rateLimitRetryOperator;
            rateLimitRetryOperator.getClass();
            return subscriberContext.retryWhen(Retry.withThrowable(rateLimitRetryOperator::apply)).transform(getResponseTransformers(clientRequest.getDiscordRequest())).retryWhen(Retry.withThrowable(serverErrorRetryFactory())).checkpoint("Request to " + clientRequest.getDescription() + " [RSocketRouter]");
        }

        private Function<Mono<ClientResponse>, Mono<ClientResponse>> getResponseTransformers(DiscordWebRequest discordWebRequest) {
            return (Function) RSocketRouter.this.responseFunctions.stream().map(responseFunction -> {
                return responseFunction.transform(discordWebRequest).andThen(mono -> {
                    return mono.checkpoint("Apply " + responseFunction + " to " + discordWebRequest.getDescription() + " [RSocketRouter]");
                });
            }).reduce((v0, v1) -> {
                return v0.andThen(v1);
            }).orElse(mono -> {
                return mono;
            });
        }

        private reactor.retry.Retry<?> serverErrorRetryFactory() {
            return reactor.retry.Retry.onlyIf(ClientException.isRetryContextStatusCode(new Integer[]{500, 502, 503, 504})).exponentialBackoffWithJitter(Duration.ofSeconds(2L), Duration.ofSeconds(30L)).doOnRetry(retryContext -> {
                if (RSocketRouter.log.isTraceEnabled()) {
                    RSocketRouter.log.trace("Retry {} in bucket {} due to {} for {}", new Object[]{Long.valueOf(retryContext.iteration()), this.id.toString(), retryContext.exception().toString(), retryContext.backoff()});
                }
            });
        }
    }

    public RSocketRouter(RSocketRouterOptions rSocketRouterOptions) {
        this.reactorResources = (ReactorResources) Objects.requireNonNull(rSocketRouterOptions.getReactorResources(), "reactorResources");
        this.responseFunctions = (List) Objects.requireNonNull(rSocketRouterOptions.getResponseTransformers(), "responseFunctions");
        this.httpClient = new DiscordWebClient(this.reactorResources.getHttpClient(), rSocketRouterOptions.getExchangeStrategies(), "Bot", rSocketRouterOptions.getToken(), this.responseFunctions);
        this.globalRateLimiter = (GlobalRateLimiter) Objects.requireNonNull(rSocketRouterOptions.getGlobalRateLimiter(), "globalRateLimiter");
        this.requestTransportMapper = (Function) Objects.requireNonNull(rSocketRouterOptions.getRequestTransportMapper(), "requestTransportMapper");
    }

    public DiscordWebResponse exchange(DiscordWebRequest discordWebRequest) {
        return new DiscordWebResponse(Mono.deferWithContext(context -> {
            ClientRequest clientRequest = new ClientRequest(discordWebRequest);
            String id = clientRequest.getId();
            String bucketKey = BucketKey.of(discordWebRequest).toString();
            BucketRequestExecutor executor = getExecutor(discordWebRequest);
            return getSocket(this.requestTransportMapper.apply(discordWebRequest)).withSocket(rSocket -> {
                UnicastProcessor create = UnicastProcessor.create();
                create.onNext(requestPayload(bucketKey, id));
                return rSocket.requestChannel(create).onErrorMap(Discord4JConnectException::new).flatMap(payload -> {
                    String dataUtf8 = payload.getDataUtf8();
                    if (dataUtf8.startsWith(READY)) {
                        return executor.exchange(clientRequest, rSocket, context).doOnTerminate(() -> {
                            log.debug("[B:{}, R:{}] Request completed", new Object[]{bucketKey, id});
                            create.onNext(donePayload(bucketKey, id));
                            create.onComplete();
                        });
                    }
                    log.warn("Unknown payload: {}", new Object[]{dataUtf8});
                    return Mono.empty();
                }).next();
            }).next();
        }), this.reactorResources);
    }

    private ConnectRSocket getSocket(InetSocketAddress inetSocketAddress) {
        return this.sockets.computeIfAbsent(inetSocketAddress, inetSocketAddress2 -> {
            return new ConnectRSocket("router", inetSocketAddress, th -> {
                return true;
            }, ReconnectOptions.create());
        });
    }

    private BucketRequestExecutor getExecutor(DiscordWebRequest discordWebRequest) {
        return this.buckets.computeIfAbsent(BucketKey.of(discordWebRequest), bucketKey -> {
            if (log.isTraceEnabled()) {
                log.trace("Creating RequestStream with key {} for request: {} -> {}", new Object[]{bucketKey, discordWebRequest.getRoute().getUriTemplate(), discordWebRequest.getCompleteUri()});
            }
            return new BucketRequestExecutor(bucketKey, this.globalRateLimiter, HEADER_STRATEGY, this.reactorResources.getTimerTaskScheduler());
        });
    }

    private static Payload requestPayload(String str, String str2) {
        return DefaultPayload.create("REQUEST:" + str + ":" + str2);
    }

    private static Payload donePayload(String str, String str2) {
        return DefaultPayload.create("DONE:" + str + ":" + str2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Payload limitPayload(String str, long j) {
        return DefaultPayload.create("LIMIT:" + str + ":" + j);
    }
}
