package cz.o2.proxima.direct.kafka;

import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.TopicPartition;
import cz.o2.proxima.kafka.shaded.org.apache.kafka.common.serialization.Serde;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.commitlog.Position;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/kafka/KafkaConsumerFactory.class */
public class KafkaConsumerFactory<K, V> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaConsumerFactory.class);
    private final URI uri;

    @Nullable
    private final String topic;

    @Nullable
    private final String topicPattern;
    private final Properties props;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaConsumerFactory(URI uri, Properties properties, Serde<K> serde, Serde<V> serde2) {
        this.uri = uri;
        this.props = properties;
        this.topic = Utils.topic(uri);
        this.topicPattern = Utils.topicPattern(uri);
        this.keySerde = serde;
        this.valueSerde = serde2;
    }

    public KafkaConsumer<K, V> create(String str, Position position, @Nullable ConsumerRebalanceListener consumerRebalanceListener) {
        log.debug("Creating named consumer with name {} and listener {}", str, consumerRebalanceListener);
        Properties clone = clone(this.props);
        clone.put("bootstrap.servers", this.uri.getAuthority());
        clone.put("group.id", str);
        clone.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 0);
        clone.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        updateAutoOffsetReset(position, clone, false);
        clone.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.keySerde.deserializer().getClass());
        clone.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.valueSerde.deserializer().getClass());
        KafkaConsumer<K, V> kafkaConsumer = new KafkaConsumer<>(clone);
        if (this.topic == null) {
            Pattern compile = Pattern.compile((String) Objects.requireNonNull(this.topicPattern));
            if (consumerRebalanceListener == null) {
                kafkaConsumer.subscribe(compile);
            } else {
                kafkaConsumer.subscribe(compile, consumerRebalanceListener);
            }
        } else if (consumerRebalanceListener == null) {
            kafkaConsumer.subscribe(Collections.singletonList(this.topic));
        } else {
            kafkaConsumer.subscribe(Collections.singletonList(this.topic), consumerRebalanceListener);
        }
        return kafkaConsumer;
    }

    public KafkaConsumer<K, V> create(String str) {
        return create(str, Position.NEWEST, null);
    }

    public KafkaConsumer<K, V> create(Position position, Collection<Partition> collection) {
        log.debug("Creating unnamed consumer for partitions {}", collection);
        Properties clone = clone(this.props);
        clone.put("bootstrap.servers", this.uri.getAuthority());
        clone.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        clone.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.keySerde.deserializer().getClass());
        clone.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.valueSerde.deserializer().getClass());
        updateAutoOffsetReset(position, clone, true);
        KafkaConsumer<K, V> kafkaConsumer = new KafkaConsumer<>(clone);
        kafkaConsumer.assign((List) collection.stream().map(partition -> {
            return new TopicPartition(this.topic, partition.getId());
        }).collect(Collectors.toList()));
        return kafkaConsumer;
    }

    public KafkaConsumer<K, V> create() {
        log.debug("Creating unnamed consumer for all partitions of topic {}", this.topic);
        Properties clone = clone(this.props);
        clone.put("bootstrap.servers", this.uri.getAuthority());
        clone.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        clone.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.keySerde.deserializer().getClass());
        clone.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.valueSerde.deserializer().getClass());
        KafkaConsumer<K, V> kafkaConsumer = new KafkaConsumer<>(clone);
        kafkaConsumer.assign((List) kafkaConsumer.partitionsFor(this.topic).stream().map(partitionInfo -> {
            return new TopicPartition(this.topic, partitionInfo.partition());
        }).collect(Collectors.toList()));
        return kafkaConsumer;
    }

    private Properties clone(Properties properties) {
        Properties properties2 = new Properties();
        properties2.getClass();
        properties.forEach(properties2::put);
        return properties2;
    }

    @VisibleForTesting
    static void updateAutoOffsetReset(Position position, Properties properties, boolean z) {
        if (properties.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
            return;
        }
        if (position == Position.OLDEST) {
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        } else if (z && position == Position.CURRENT) {
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        } else {
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        }
    }
}
