package discord4j.connect.rsocket.gateway;

import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketServer;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.extra.processor.WorkQueueProcessor;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:discord4j/connect/rsocket/gateway/RSocketPayloadServer.class */
public class RSocketPayloadServer {
    private static final Logger log = Loggers.getLogger(RSocketPayloadServer.class);
    private final TcpServerTransport serverTransport;
    private final Function<String, FluxProcessor<Payload, Payload>> processorFactory;
    private final Map<String, FluxProcessor<Payload, Payload>> queues;
    private final Map<String, FluxSink<Payload>> sinks;

    public RSocketPayloadServer(InetSocketAddress inetSocketAddress) {
        this(inetSocketAddress, str -> {
            if (str.contains("outbound")) {
                log.info("Creating fanout queue: {}", new Object[]{str});
                return EmitterProcessor.create(1024, false);
            }
            log.info("Creating work queue: {}", new Object[]{str});
            return WorkQueueProcessor.builder().autoCancel(false).share(true).name(str).bufferSize(1024).build();
        });
    }

    public RSocketPayloadServer(InetSocketAddress inetSocketAddress, Function<String, FluxProcessor<Payload, Payload>> function) {
        this.queues = new ConcurrentHashMap();
        this.sinks = new ConcurrentHashMap();
        this.serverTransport = TcpServerTransport.create(inetSocketAddress);
        this.processorFactory = function;
    }

    public Mono<CloseableChannel> start() {
        return RSocketServer.create((connectionSetupPayload, rSocket) -> {
            return Mono.just(leaderAcceptor(rSocket));
        }).bind(this.serverTransport);
    }

    private RSocket leaderAcceptor(final RSocket rSocket) {
        return new AbstractRSocket() { // from class: discord4j.connect.rsocket.gateway.RSocketPayloadServer.1
            static final /* synthetic */ boolean $assertionsDisabled;

            public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
                String hexString = Integer.toHexString(System.identityHashCode(rSocket));
                Flux from = Flux.from(publisher);
                RSocket rSocket2 = rSocket;
                return from.switchOnFirst((signal, flux) -> {
                    if (signal.hasValue()) {
                        Payload payload = (Payload) signal.get();
                        if (!$assertionsDisabled && payload == null) {
                            throw new AssertionError();
                        }
                        String[] command = getCommand(payload);
                        String str = command[0];
                        String str2 = command[1];
                        if (str.equals("produce")) {
                            return flux.doOnSubscribe(subscription -> {
                                RSocketPayloadServer.log.debug("[{}] Producing to {}", new Object[]{hexString, str2});
                            }).map(payload2 -> {
                                RSocketPayloadServer.log.trace("[{}] Produce to {}: {}", new Object[]{hexString, str2, payload2.getDataUtf8()});
                                getSink(str2).next(payload2);
                                return DefaultPayload.create("OK", str2);
                            });
                        }
                        if (str.equals("consume")) {
                            return Flux.defer(() -> {
                                return getQueue(str2);
                            }).limitRate(1).zipWith(flux).map((v0) -> {
                                return v0.getT1();
                            }).doOnSubscribe(subscription2 -> {
                                RSocketPayloadServer.log.debug("[{}] Consuming from {}", new Object[]{hexString, str2});
                            }).doOnNext(payload3 -> {
                                if (rSocket2.availability() < 1.0d) {
                                    throw new IllegalStateException("Consumer is unavailable");
                                }
                            });
                        }
                    }
                    return Flux.error(new IllegalArgumentException("Invalid routing: must be produce, consume"));
                }).doFinally(signalType -> {
                    RSocketPayloadServer.log.info("[{}] Terminating channel after {}", new Object[]{hexString, signalType});
                });
            }

            private String[] getCommand(Payload payload) {
                if (!payload.hasMetadata()) {
                    throw new IllegalArgumentException("Missing topic metadata");
                }
                String[] split = payload.getMetadataUtf8().split(":", 2);
                if (split.length != 2 || split[1].isEmpty()) {
                    throw new IllegalArgumentException("Invalid topic metadata");
                }
                return split;
            }

            private FluxProcessor<Payload, Payload> getQueue(String str) {
                return (FluxProcessor) RSocketPayloadServer.this.queues.computeIfAbsent(str, RSocketPayloadServer.this.processorFactory);
            }

            private FluxSink<Payload> getSink(String str) {
                return (FluxSink) RSocketPayloadServer.this.sinks.computeIfAbsent(str, str2 -> {
                    return getQueue(str2).sink(FluxSink.OverflowStrategy.LATEST);
                });
            }

            static {
                $assertionsDisabled = !RSocketPayloadServer.class.desiredAssertionStatus();
            }
        };
    }
}
