package discord4j.connect.rsocket.gateway;

import discord4j.common.retry.ReconnectOptions;
import discord4j.connect.common.ConnectPayload;
import discord4j.connect.common.PayloadSource;
import discord4j.connect.common.SourceMapper;
import discord4j.connect.rsocket.CachedRSocket;
import io.rsocket.Payload;
import io.rsocket.util.DefaultPayload;
import java.net.InetSocketAddress;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:discord4j/connect/rsocket/gateway/RSocketPayloadSource.class */
public class RSocketPayloadSource implements PayloadSource {
    private static final Logger log = Loggers.getLogger(RSocketPayloadSource.class);
    private final SourceMapper<Payload> mapper;
    private final Flux<Payload> inbound;

    public RSocketPayloadSource(InetSocketAddress inetSocketAddress, String str, SourceMapper<Payload> sourceMapper) {
        CachedRSocket cachedRSocket = new CachedRSocket(inetSocketAddress, retryContext -> {
            return true;
        }, ReconnectOptions.create());
        this.mapper = sourceMapper;
        this.inbound = cachedRSocket.withSocket(rSocket -> {
            UnicastProcessor create = UnicastProcessor.create();
            create.onNext(DefaultPayload.create("START", "consume:" + str));
            return rSocket.requestChannel(create).doOnNext(payload -> {
                create.onNext(DefaultPayload.create("ACK"));
            });
        }).doOnSubscribe(subscription -> {
            log.info("Begin receiving from server");
        }).doFinally(signalType -> {
            log.info("Receiver completed after {}", new Object[]{signalType});
        }).share();
    }

    public Flux<?> receive(Function<ConnectPayload, Mono<Void>> function) {
        Flux<Payload> flux = this.inbound;
        SourceMapper<Payload> sourceMapper = this.mapper;
        sourceMapper.getClass();
        return flux.flatMap((v1) -> {
            return r1.apply(v1);
        }).flatMap(function);
    }
}
