package discord4j.connect.rsocket;

import discord4j.common.retry.ReconnectContext;
import discord4j.common.retry.ReconnectOptions;
import discord4j.connect.common.Discord4JConnectException;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.retry.Retry;
import reactor.retry.RetryContext;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:discord4j/connect/rsocket/CachedRSocket.class */
/**
 * Holder for a lazily created, cached {@link RSocket} connection to a single server address.
 *
 * <p>The inherited {@link AtomicReference} value is the {@code Mono} producing the connection. A
 * {@code null} value means no connection {@code Mono} is currently stored and a new one is built
 * on the next access through {@link #withSocket(Function)}.
 */
public class CachedRSocket extends AtomicReference<Mono<RSocket>> {
    private static final Logger log = Loggers.getLogger(CachedRSocket.class);
    private final InetSocketAddress serverAddress;
    private final Predicate<? super RetryContext<ReconnectContext>> retryPredicate;
    private final ReconnectOptions reconnectOptions;
    private final ReconnectContext reconnectContext;

    /**
     * Creates a holder targeting the given server.
     *
     * @param inetSocketAddress address of the RSocket server to connect to
     * @param predicate decides, per retry context, whether a failed attempt is retried
     * @param reconnectOptions source of the retry limit, backoff, jitter, backoff scheduler and the
     *     first/max backoff values used to build the shared {@link ReconnectContext}
     */
    public CachedRSocket(InetSocketAddress inetSocketAddress, Predicate<? super RetryContext<ReconnectContext>> predicate, ReconnectOptions reconnectOptions) {
        this.serverAddress = inetSocketAddress;
        this.retryPredicate = predicate;
        this.reconnectOptions = reconnectOptions;
        this.reconnectContext = new ReconnectContext(
                reconnectOptions.getFirstBackoff(),
                reconnectOptions.getMaxBackoffInterval());
    }

    /**
     * Returns the stored connection {@code Mono}, installing a freshly built one when the
     * reference currently holds {@code null}.
     */
    private Mono<RSocket> getSocket() {
        return updateAndGet(cached -> {
            if (cached == null) {
                return createSocket();
            }
            return cached;
        });
    }

    /**
     * Builds a {@code Mono} that connects over TCP to {@link #serverAddress}.
     *
     * <p>The result is cached with a time-to-live of one hour for a connected socket and
     * {@link Duration#ZERO} for both error and empty completions.
     */
    private Mono<RSocket> createSocket() {
        Mono<RSocket> connection = RSocketFactory.connect()
                .errorConsumer(error -> log.error("Client error: {}", error.toString()))
                .transport(TcpClientTransport.create(this.serverAddress))
                .start();
        return connection
                .doOnSubscribe(ignored -> log.debug("Connecting to RSocket server: {}", this.serverAddress))
                .cache(socket -> Duration.ofHours(1L), error -> Duration.ZERO, () -> Duration.ZERO);
    }

    /**
     * Passes a socket through unless it is already disposed; in that case the stored
     * {@code Mono} is cleared and a {@link Discord4JConnectException} is signalled instead.
     */
    private Mono<RSocket> requireLive(RSocket socket) {
        if (socket.isDisposed()) {
            set(null);
            return Mono.error(new Discord4JConnectException("Lost connection to server"));
        }
        return Mono.just(socket);
    }

    /**
     * Applies {@code function} to the cached socket and emits whatever the resulting publisher
     * produces.
     *
     * <p>Failures are retried when {@link #retryPredicate} allows it, up to
     * {@code reconnectOptions.getMaxRetries()} times, using the backoff, jitter and scheduler taken
     * from {@link #reconnectOptions}. On every retry the stored {@code Mono} is cleared first, so
     * the next attempt builds a new connection.
     *
     * @param function maps the socket to the publisher whose items are emitted
     * @param <T> type of the emitted items
     * @return a {@code Flux} of the items produced by {@code function}
     */
    public <T> Flux<T> withSocket(Function<? super RSocket, Publisher<? extends T>> function) {
        // Assemble the attempt before the retry policy, matching the original evaluation order.
        Flux<T> attempt = Mono.defer(this::getSocket)
                .flatMap(this::requireLive)
                .flatMapMany(function);
        Retry<ReconnectContext> reconnectPolicy = Retry.onlyIf(this.retryPredicate)
                .retryMax(this.reconnectOptions.getMaxRetries())
                .backoff(this.reconnectOptions.getBackoff())
                .jitter(this.reconnectOptions.getJitter())
                .withApplicationContext(this.reconnectContext)
                .withBackoffScheduler(this.reconnectOptions.getBackoffScheduler())
                .doOnRetry(ctx -> {
                    set(null);
                    log.info("Reconnecting to server: {}", ctx.exception().toString());
                });
        return attempt.retryWhen(reconnectPolicy);
    }
}
