package de.otto.synapse.endpoint.sender.kafka;

import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.endpoint.sender.AbstractMessageSenderEndpoint;
import de.otto.synapse.message.Message;
import de.otto.synapse.message.TextMessage;
import de.otto.synapse.translator.MessageFormat;
import de.otto.synapse.translator.MessageTranslator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;

/* loaded from: input_file:de/otto/synapse/endpoint/sender/kafka/KafkaMessageSender.class */
/**
 * Message sender endpoint that publishes text messages through a Spring {@link KafkaTemplate}.
 *
 * <p>Every outgoing message is converted by a {@code KafkaEncoder} that is built from the channel
 * name and the number of partitions the template reports for that channel. The encoder is created
 * lazily on the first send and is rebuilt periodically by {@link #updatePartitions()}, so a changed
 * partition count is picked up without restarting the sender.
 */
public class KafkaMessageSender extends AbstractMessageSenderEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSender.class);

    /** Initial and fixed delay used to schedule {@link #updatePartitions()} (milliseconds by {@code @Scheduled} default). */
    public static final long UPDATE_PARTITION_DELAY = 10_000L;

    private final KafkaTemplate<String, String> kafkaTemplate;

    /** Encoder currently in use; holds {@code null} until the first send or the first scheduled refresh. */
    private final AtomicReference<KafkaEncoder> encoder = new AtomicReference<>();

    /**
     * Creates a sender that publishes via the given template.
     *
     * @param str                        handed to the superclass constructor (presumably the channel name
     *                                   later returned by {@code getChannelName()} - confirm against the superclass)
     * @param messageInterceptorRegistry handed to the superclass constructor
     * @param messageTranslator          handed to the superclass constructor
     * @param kafkaTemplate              template used for partition lookup and for sending records
     */
    public KafkaMessageSender(String str, MessageInterceptorRegistry messageInterceptorRegistry, MessageTranslator<TextMessage> messageTranslator, KafkaTemplate<String, String> kafkaTemplate) {
        super(str, messageInterceptorRegistry, messageTranslator);
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Rebuilds the encoder from the partition count currently reported by the template and
     * unconditionally replaces the one in use.
     */
    @Scheduled(initialDelay = UPDATE_PARTITION_DELAY, fixedDelay = UPDATE_PARTITION_DELAY)
    public void updatePartitions() {
        this.encoder.set(createEncoder());
    }

    /**
     * Encodes a single message and hands the resulting record to the Kafka template.
     *
     * @param textMessage the message to send
     * @return a future that completes when the template's send future completes
     */
    protected CompletableFuture<Void> doSend(@Nonnull TextMessage textMessage) {
        final KafkaEncoder activeEncoder = currentEncoder();
        // allOf(...) is used only to adapt the template's typed send result to CompletableFuture<Void>.
        return CompletableFuture.allOf(this.kafkaTemplate.send(activeEncoder.apply((Message<String>) textMessage)).completable());
    }

    /**
     * Sends every message of the stream via {@link #doSend(TextMessage)}.
     *
     * @param stream the messages to send
     * @return a future that completes once all individual send futures have completed
     */
    protected CompletableFuture<Void> doSendBatch(@Nonnull Stream<TextMessage> stream) {
        return CompletableFuture.allOf(stream.map(this::doSend).toArray(CompletableFuture[]::new));
    }

    /**
     * @return the message format used by this sender, always {@link MessageFormat#V1}
     */
    public MessageFormat getMessageFormat() {
        return MessageFormat.V1;
    }

    /**
     * Returns the encoder to use for the next send, creating and installing one if none exists yet.
     *
     * <p>The encoder is created outside of any atomic update function: building it performs a
     * partition lookup on the template, and functions passed to
     * {@code AtomicReference.updateAndGet} may be re-applied under contention.
     */
    private KafkaEncoder currentEncoder() {
        final KafkaEncoder existing = this.encoder.get();
        if (existing != null) {
            return existing;
        }
        final KafkaEncoder created = createEncoder();
        // If the CAS loses, another sender or the scheduled refresh has installed an encoder in the
        // meantime; the reference is never reset to null, so the follow-up read is non-null.
        return this.encoder.compareAndSet(null, created) ? created : this.encoder.get();
    }

    /** Builds an encoder from the channel name and the partition count the template reports for it. */
    private KafkaEncoder createEncoder() {
        final String channelName = getChannelName();
        final int partitionCount = this.kafkaTemplate.partitionsFor(channelName).size();
        return new KafkaEncoder(channelName, partitionCount);
    }
}
