package io.strimzi.kafka.bridge;

import io.strimzi.kafka.bridge.config.KafkaConfig;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/strimzi/kafka/bridge/KafkaBridgeProducer.class */
/**
 * Thin wrapper around a Kafka {@link Producer} used by the bridge to send records.
 *
 * <p>The underlying producer is not created by the constructor; {@link #create()} must be
 * called before {@link #send(ProducerRecord)} or {@link #sendIgnoreResult(ProducerRecord)},
 * otherwise those methods fail with a {@link NullPointerException}.
 *
 * @param <K> type of the record key
 * @param <V> type of the record value
 */
public class KafkaBridgeProducer<K, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaBridgeProducer.class);

    private final KafkaConfig kafkaConfig;
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;
    private Producer<K, V> producer;

    /**
     * Stores the configuration and serializers used later by {@link #create()}.
     *
     * @param kafkaConfig     Kafka related configuration (common and producer specific)
     * @param keySerializer   serializer handed to the Kafka producer for record keys
     * @param valueSerializer serializer handed to the Kafka producer for record values
     */
    public KafkaBridgeProducer(KafkaConfig kafkaConfig, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.kafkaConfig = kafkaConfig;
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
    }

    /**
     * Sends a record and reports the outcome through the returned stage.
     *
     * @param producerRecord record to send
     * @return a stage completed with the record metadata when the producer callback reports
     *         success, or completed exceptionally with the exception the callback reports
     */
    public CompletionStage<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
        CompletableFuture<RecordMetadata> promise = new CompletableFuture<>();
        LOGGER.trace("Send thread {}", Thread.currentThread());
        LOGGER.debug("Sending record {}", producerRecord);
        this.producer.send(producerRecord, (metadata, exception) -> {
            LOGGER.trace("Kafka client callback thread {}", Thread.currentThread());
            if (exception == null) {
                // Only touch the metadata on success: on failure it may be null or carry
                // placeholder values depending on the Kafka client version.
                LOGGER.debug("Sent record {} at offset {}", producerRecord, metadata.offset());
                promise.complete(metadata);
            } else {
                LOGGER.debug("Failed to send record {}", producerRecord, exception);
                promise.completeExceptionally(exception);
            }
        });
        return promise;
    }

    /**
     * Sends a record without registering a callback; the outcome is not reported to the caller.
     *
     * @param producerRecord record to send
     */
    public void sendIgnoreResult(ProducerRecord<K, V> producerRecord) {
        LOGGER.trace("Send ignore result thread {}", Thread.currentThread());
        LOGGER.debug("Sending record {}", producerRecord);
        this.producer.send(producerRecord);
    }

    /**
     * Creates the underlying Kafka producer.
     *
     * <p>The properties are built from the common Kafka configuration first and the producer
     * configuration second, so a producer setting replaces a common setting with the same key.
     * The tracing implementation is then given the chance to add its own properties.
     */
    public void create() {
        Properties props = new Properties();
        props.putAll(this.kafkaConfig.getConfig());
        props.putAll(this.kafkaConfig.getProducerConfig().getConfig());
        TracingUtil.getTracing().addTracingPropsToProducerConfig(props);
        this.producer = new KafkaProducer<>(props, this.keySerializer, this.valueSerializer);
    }

    /**
     * Closes the underlying Kafka producer; does nothing if {@link #create()} was never called.
     */
    public void close() {
        if (this.producer != null) {
            this.producer.close();
        }
    }
}
