package org.apache.flink.streaming.connectors.kafka;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.Kafka08PartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;

/**
 * Kafka consumer source built on the 0.8-specific {@link Kafka08Fetcher} and
 * {@link Kafka08PartitionDiscoverer}.
 *
 * <p>All public constructors funnel into one private constructor that forwards the
 * subscription (either a fixed list or a {@link Pattern}) and the deserializer to
 * {@link FlinkKafkaConsumerBase}, and then validates the supplied {@link Properties}:
 * {@code zookeeper.connect} and {@code group.id} must be present, the two ZooKeeper timeout
 * settings must be integers when present, and {@code auto.offset.reset} must be one of the
 * accepted values.
 *
 * @param <T> type of the records produced by the deserialization schema
 */
@PublicEvolving
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
    private static final long serialVersionUID = -6272159445203409112L;

    /**
     * Configuration key for the number of partition-lookup retries. Not read in this class;
     * presumably consumed by the partition discoverer -- verify against its implementation.
     */
    public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";

    /** Default value associated with {@link #GET_PARTITIONS_RETRIES_KEY}. */
    public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;

    /** User-supplied properties, handed to the fetcher and partition discoverer. */
    private final Properties kafkaProperties;

    /**
     * Creates a consumer subscribed to a single entry, which is wrapped in a one-element list.
     *
     * @param str the single subscription entry (presumably a topic name)
     * @param deserializationSchema value-only schema; wrapped into a keyed schema
     * @param properties consumer configuration; must not be null and must pass validation
     */
    public FlinkKafkaConsumer08(String str, DeserializationSchema<T> deserializationSchema, Properties properties) {
        this(Collections.singletonList(str), deserializationSchema, properties);
    }

    /**
     * Creates a consumer subscribed to a single entry, which is wrapped in a one-element list.
     *
     * @param str the single subscription entry (presumably a topic name)
     * @param keyedDeserializationSchema schema used to deserialize records
     * @param properties consumer configuration; must not be null and must pass validation
     */
    public FlinkKafkaConsumer08(String str, KeyedDeserializationSchema<T> keyedDeserializationSchema, Properties properties) {
        this(Collections.singletonList(str), keyedDeserializationSchema, properties);
    }

    /**
     * Creates a consumer subscribed to a fixed list of entries.
     *
     * @param list the subscription entries (presumably topic names)
     * @param deserializationSchema value-only schema; wrapped into a keyed schema
     * @param properties consumer configuration; must not be null and must pass validation
     */
    public FlinkKafkaConsumer08(List<String> list, DeserializationSchema<T> deserializationSchema, Properties properties) {
        this(list, new KeyedDeserializationSchemaWrapper<>(deserializationSchema), properties);
    }

    /**
     * Creates a consumer subscribed to a fixed list of entries.
     *
     * @param list the subscription entries (presumably topic names)
     * @param keyedDeserializationSchema schema used to deserialize records
     * @param properties consumer configuration; must not be null and must pass validation
     */
    public FlinkKafkaConsumer08(List<String> list, KeyedDeserializationSchema<T> keyedDeserializationSchema, Properties properties) {
        this(list, null, keyedDeserializationSchema, properties);
    }

    /**
     * Creates a consumer whose subscription is described by a regular expression.
     *
     * @param pattern the subscription pattern
     * @param deserializationSchema value-only schema; wrapped into a keyed schema
     * @param properties consumer configuration; must not be null and must pass validation
     */
    @PublicEvolving
    public FlinkKafkaConsumer08(Pattern pattern, DeserializationSchema<T> deserializationSchema, Properties properties) {
        this(pattern, new KeyedDeserializationSchemaWrapper<>(deserializationSchema), properties);
    }

    /**
     * Creates a consumer whose subscription is described by a regular expression.
     *
     * @param pattern the subscription pattern
     * @param keyedDeserializationSchema schema used to deserialize records
     * @param properties consumer configuration; must not be null and must pass validation
     */
    @PublicEvolving
    public FlinkKafkaConsumer08(Pattern pattern, KeyedDeserializationSchema<T> keyedDeserializationSchema, Properties properties) {
        this(null, pattern, keyedDeserializationSchema, properties);
    }

    /**
     * Common constructor: exactly one of {@code list} and {@code pattern} is supplied by the
     * public constructors, the other is {@code null}.
     *
     * <p>The discovery interval is read from {@code flink.partition-discovery.interval-millis}
     * (falling back to {@link Long#MIN_VALUE}) and metrics are enabled unless
     * {@code flink.disable-metrics} is set to true.
     *
     * @throws IllegalArgumentException if the properties fail validation
     */
    private FlinkKafkaConsumer08(List<String> list, Pattern pattern, KeyedDeserializationSchema<T> keyedDeserializationSchema, Properties properties) {
        super(
                list,
                pattern,
                keyedDeserializationSchema,
                // The null check must happen inside the super(...) call, before properties is first read.
                PropertiesUtil.getLong(
                        Preconditions.checkNotNull(properties, "props"),
                        "flink.partition-discovery.interval-millis",
                        Long.MIN_VALUE),
                !PropertiesUtil.getBoolean(properties, "flink.disable-metrics", false));
        this.kafkaProperties = properties;
        validateZooKeeperConfig(properties);
        validateAutoOffsetResetValue(properties);
    }

    /**
     * Builds the 0.8 fetcher. The auto-commit interval passed to the fetcher is taken from
     * {@code auto.commit.interval.ms} (default 60000) only when the commit mode is
     * {@link OffsetCommitMode#KAFKA_PERIODIC}; otherwise {@code -1} is passed.
     */
    @Override
    protected AbstractFetcher<T, ?> createFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> map, SerializedValue<AssignerWithPeriodicWatermarks<T>> serializedValue, SerializedValue<AssignerWithPunctuatedWatermarks<T>> serializedValue2, StreamingRuntimeContext streamingRuntimeContext, OffsetCommitMode offsetCommitMode, MetricGroup metricGroup, boolean z) throws Exception {
        final long autoCommitInterval;
        if (offsetCommitMode == OffsetCommitMode.KAFKA_PERIODIC) {
            autoCommitInterval = PropertiesUtil.getLong(this.kafkaProperties, "auto.commit.interval.ms", 60000L);
        } else {
            autoCommitInterval = -1L;
        }

        return new Kafka08Fetcher<>(
                sourceContext,
                map,
                serializedValue,
                serializedValue2,
                streamingRuntimeContext,
                this.deserializer,
                this.kafkaProperties,
                autoCommitInterval,
                metricGroup,
                z);
    }

    /** Builds the 0.8 partition discoverer, passing along the consumer properties. */
    @Override
    protected AbstractPartitionDiscoverer createPartitionDiscoverer(KafkaTopicsDescriptor kafkaTopicsDescriptor, int i, int i2) {
        return new Kafka08PartitionDiscoverer(kafkaTopicsDescriptor, i, i2, this.kafkaProperties);
    }

    /**
     * Reports whether auto commit is configured.
     *
     * @return {@code true} when {@code auto.commit.enable} is true (the default) and
     *     {@code auto.commit.interval.ms} (default 60000) is strictly positive
     */
    @Override
    protected boolean getIsAutoCommitEnabled() {
        return PropertiesUtil.getBoolean(this.kafkaProperties, "auto.commit.enable", true)
                && PropertiesUtil.getLong(this.kafkaProperties, "auto.commit.interval.ms", 60000L) > 0;
    }

    /**
     * Always fails: timestamp-based offset lookup is not available in this consumer.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(Collection<KafkaTopicPartition> collection, long j) {
        throw new UnsupportedOperationException("Fetching partition offsets using timestamps is only supported in Kafka versions 0.10 and above.");
    }

    /**
     * Validates the ZooKeeper-related settings.
     *
     * <p>Checks run in this order, failing on the first violation: {@code zookeeper.connect}
     * is set, {@code group.id} is set, {@code zookeeper.session.timeout.ms} is an integer,
     * {@code zookeeper.connection.timeout.ms} is an integer. Absent timeouts are treated as
     * {@code "0"} and therefore pass.
     *
     * @param properties the consumer configuration to check
     * @throws IllegalArgumentException if a required property is missing or a timeout is not
     *     a valid integer (the {@link NumberFormatException} is attached as the cause)
     */
    protected static void validateZooKeeperConfig(Properties properties) {
        if (properties.getProperty("zookeeper.connect") == null) {
            throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
        }
        if (properties.getProperty("group.id") == null) {
            throw new IllegalArgumentException("Required property 'group.id' has not been set in the properties");
        }
        validateIntegerProperty(properties, "zookeeper.session.timeout.ms");
        validateIntegerProperty(properties, "zookeeper.connection.timeout.ms");
    }

    /** Ensures the value stored under {@code key} (or "0" when absent) parses as an int. */
    private static void validateIntegerProperty(Properties properties, String key) {
        try {
            Integer.parseInt(properties.getProperty(key, "0"));
        } catch (NumberFormatException e) {
            // Keep the parse failure as the cause so the offending value shows up in stack traces.
            throw new IllegalArgumentException("Property '" + key + "' is not a valid integer", e);
        }
    }

    /**
     * Validates {@code auto.offset.reset}; an absent value is treated as {@code "largest"}.
     *
     * @throws IllegalArgumentException if the value is not one of
     *     {@code latest}, {@code largest}, {@code earliest}, {@code smallest}
     */
    private static void validateAutoOffsetResetValue(Properties properties) {
        final String resetValue = properties.getProperty("auto.offset.reset", "largest");
        switch (resetValue) {
            case "largest":
            case "latest":
            case "earliest":
            case "smallest":
                return;
            default:
                throw new IllegalArgumentException("Cannot use 'auto.offset.reset' value '" + resetValue + "'. Possible values: 'latest', 'largest', 'earliest', or 'smallest'.");
        }
    }
}
