package discord4j.connect.rsocket.shard;

import discord4j.common.operator.RateLimitOperator;
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.core.RSocketServer;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:discord4j/connect/rsocket/shard/RSocketShardCoordinatorServer.class */
public class RSocketShardCoordinatorServer {
    private static final Logger log = Loggers.getLogger(RSocketShardCoordinatorServer.class);
    private final TcpServerTransport serverTransport;
    private final AtomicInteger connected = new AtomicInteger();

    public RSocketShardCoordinatorServer(InetSocketAddress inetSocketAddress) {
        this.serverTransport = TcpServerTransport.create(inetSocketAddress);
    }

    public Mono<CloseableChannel> start() {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(1);
        return RSocketServer.create((connectionSetupPayload, rSocket) -> {
            return Mono.just(socketAcceptor(concurrentHashMap));
        }).bind(this.serverTransport);
    }

    private AbstractRSocket socketAcceptor(final Map<String, RateLimitOperator<Payload>> map) {
        return new AbstractRSocket() { // from class: discord4j.connect.rsocket.shard.RSocketShardCoordinatorServer.1
            public Mono<Payload> requestResponse(Payload payload) {
                String dataUtf8 = payload.getDataUtf8();
                RSocketShardCoordinatorServer.log.debug("[request_response] >: {}", new Object[]{dataUtf8});
                if (dataUtf8.startsWith("identify")) {
                    return Mono.just(DefaultPayload.create("identify.success")).transform((RateLimitOperator) map.computeIfAbsent(dataUtf8.split("\\.")[1], str -> {
                        return new RateLimitOperator(1, Duration.ofSeconds(6L), Schedulers.parallel());
                    }));
                }
                if (!dataUtf8.equals("request.connected")) {
                    return Mono.empty();
                }
                AtomicInteger atomicInteger = RSocketShardCoordinatorServer.this.connected;
                atomicInteger.getClass();
                return Mono.fromCallable(atomicInteger::get).map(num -> {
                    return DefaultPayload.create(String.valueOf(num));
                });
            }

            public Mono<Void> fireAndForget(Payload payload) {
                String dataUtf8 = payload.getDataUtf8();
                RSocketShardCoordinatorServer.log.debug("[request_fnf] >: {}", new Object[]{dataUtf8});
                if (dataUtf8.equals("notify.connected")) {
                    AtomicInteger atomicInteger = RSocketShardCoordinatorServer.this.connected;
                    atomicInteger.getClass();
                    return Mono.fromCallable(atomicInteger::incrementAndGet).then();
                }
                if (!dataUtf8.equals("notify.disconnected")) {
                    return Mono.empty();
                }
                AtomicInteger atomicInteger2 = RSocketShardCoordinatorServer.this.connected;
                atomicInteger2.getClass();
                return Mono.fromCallable(atomicInteger2::decrementAndGet).then();
            }
        };
    }
}
