package discord4j.connect.rsocket.gateway;

import discord4j.common.retry.ReconnectOptions;
import discord4j.connect.common.ConnectPayload;
import discord4j.connect.common.PayloadSink;
import discord4j.connect.common.SinkMapper;
import discord4j.connect.rsocket.CachedRSocket;
import io.rsocket.Payload;
import java.net.InetSocketAddress;
import reactor.core.publisher.Flux;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:discord4j/connect/rsocket/gateway/RSocketPayloadSink.class */
/**
 * A {@link PayloadSink} that forwards {@link ConnectPayload} items to a remote peer over an
 * RSocket request-channel interaction.
 *
 * <p>Each outgoing {@code ConnectPayload} is converted into RSocket {@link Payload} frames by the
 * supplied {@link SinkMapper} before being written to the channel.
 */
public class RSocketPayloadSink implements PayloadSink {
    private static final Logger log = Loggers.getLogger(RSocketPayloadSink.class);

    private final CachedRSocket socket;
    private final SinkMapper<Payload> mapper;

    /**
     * Creates a sink bound to the given server address.
     *
     * @param inetSocketAddress the address handed to the underlying {@link CachedRSocket}
     * @param sinkMapper converts each {@link ConnectPayload} into the RSocket payloads to send
     */
    public RSocketPayloadSink(InetSocketAddress inetSocketAddress, SinkMapper<Payload> sinkMapper) {
        // The predicate accepts every context it is given; presumably this means "always retry",
        // but the exact semantics are defined by CachedRSocket - confirm there.
        this.socket = new CachedRSocket(inetSocketAddress, retryContext -> true, ReconnectOptions.create());
        this.mapper = sinkMapper;
    }

    /**
     * Sends the given payloads through a request-channel opened on the cached socket.
     *
     * <p>Errors are logged at error level, and the subscription and the terminating signal are
     * logged at info level. Errors are only observed here, not swallowed.
     *
     * @param flux the payloads to send; each element is flat-mapped through the configured mapper
     * @return the sequence produced by the request-channel interaction
     */
    public Flux<?> send(Flux<ConnectPayload> flux) {
        // The bound method reference null-checks the mapper when it is evaluated inside the
        // lambda, so no separate explicit check is needed.
        return this.socket
                .withSocket(rSocket -> rSocket.requestChannel(flux.flatMap(this.mapper::apply)))
                .doOnError(error -> log.error("Send failed", error))
                .doOnSubscribe(subscription -> log.info("Begin sending to server"))
                .doFinally(signalType -> log.info("Sender completed after {}", signalType));
    }
}
