package com.uber.hoodie.utilities.sources;

import com.uber.hoodie.exception.HoodieNotSupportedException;
import com.uber.hoodie.utilities.UtilHelpers;
import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import kafka.common.TopicAndPartition;
import kafka.serializer.DefaultDecoder;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.Predef;
import scala.collection.JavaConverters;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.StringBuilder;
import scala.util.Either;

/* loaded from: input_file:com/uber/hoodie/utilities/sources/KafkaSource.class */
public class KafkaSource extends Source {
    private static volatile Logger log = LogManager.getLogger(KafkaSource.class);
    private HashMap<String, String> kafkaParams;
    private final String topicName;

    /* loaded from: input_file:com/uber/hoodie/utilities/sources/KafkaSource$CheckpointUtils.class */
    static class CheckpointUtils {
        CheckpointUtils() {
        }

        public static HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> strToOffsets(String str) {
            HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> hashMap = new HashMap<>();
            String[] split = str.split(",");
            String str2 = split[0];
            for (int i = 1; i < split.length; i++) {
                String[] split2 = split[i].split(":");
                hashMap.put(new TopicAndPartition(str2, Integer.parseInt(split2[0])), new KafkaCluster.LeaderOffset("", -1, Long.parseLong(split2[1])));
            }
            return hashMap;
        }

        public static String offsetsToStr(HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> hashMap) {
            StringBuilder stringBuilder = new StringBuilder();
            stringBuilder.append(hashMap.entrySet().stream().findFirst().get().getKey().topic() + ",");
            stringBuilder.append((String) hashMap.entrySet().stream().map(entry -> {
                return String.format("%s:%d", Integer.valueOf(((TopicAndPartition) entry.getKey()).partition()), Long.valueOf(((KafkaCluster.LeaderOffset) entry.getValue()).offset()));
            }).collect(Collectors.joining(",")));
            return stringBuilder.toString();
        }

        public static OffsetRange[] computeOffsetRanges(HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> hashMap, HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> hashMap2) {
            List list = (List) hashMap2.entrySet().stream().map(entry -> {
                TopicAndPartition topicAndPartition = (TopicAndPartition) entry.getKey();
                long j = -1;
                if (hashMap.containsKey(topicAndPartition)) {
                    j = ((KafkaCluster.LeaderOffset) hashMap.get(topicAndPartition)).offset();
                }
                return OffsetRange.create(topicAndPartition, j, ((KafkaCluster.LeaderOffset) entry.getValue()).offset());
            }).sorted((offsetRange, offsetRange2) -> {
                return Integer.valueOf(offsetRange.partition()).compareTo(Integer.valueOf(offsetRange2.partition()));
            }).collect(Collectors.toList());
            return (OffsetRange[]) list.toArray(new OffsetRange[list.size()]);
        }

        public static long totalNewMessages(OffsetRange[] offsetRangeArr) {
            long j = 0;
            for (OffsetRange offsetRange : offsetRangeArr) {
                j += Math.max(offsetRange.untilOffset() - offsetRange.fromOffset(), 0L);
            }
            return j;
        }
    }

    /* loaded from: input_file:com/uber/hoodie/utilities/sources/KafkaSource$Config.class */
    static class Config {
        private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
        private static final String DEFAULT_AUTO_RESET_OFFSET = "largest";

        Config() {
        }
    }

    /* loaded from: input_file:com/uber/hoodie/utilities/sources/KafkaSource$ScalaHelpers.class */
    static class ScalaHelpers {
        ScalaHelpers() {
        }

        public static <K, V> Map<K, V> toScalaMap(HashMap<K, V> hashMap) {
            return ((scala.collection.mutable.Map) JavaConverters.mapAsScalaMapConverter(hashMap).asScala()).toMap(Predef.conforms());
        }

        public static Set<String> toScalaSet(HashSet<String> hashSet) {
            return ((scala.collection.mutable.Set) JavaConverters.asScalaSetConverter(hashSet).asScala()).toSet();
        }

        public static <K, V> java.util.Map<K, V> toJavaMap(Map<K, V> map) {
            return (java.util.Map) JavaConverters.mapAsJavaMapConverter(map).asJava();
        }
    }

    public KafkaSource(PropertiesConfiguration propertiesConfiguration, JavaSparkContext javaSparkContext, SourceDataFormat sourceDataFormat, SchemaProvider schemaProvider) {
        super(propertiesConfiguration, javaSparkContext, sourceDataFormat, schemaProvider);
        this.kafkaParams = new HashMap<>();
        StreamSupport.stream(Spliterators.spliteratorUnknownSize(propertiesConfiguration.getKeys(), 256), false).forEach(str -> {
            this.kafkaParams.put(str, propertiesConfiguration.getString(str));
        });
        UtilHelpers.checkRequiredProperties(propertiesConfiguration, Arrays.asList("hoodie.deltastreamer.source.kafka.topic"));
        this.topicName = propertiesConfiguration.getString("hoodie.deltastreamer.source.kafka.topic");
    }

    @Override // com.uber.hoodie.utilities.sources.Source
    public Pair<Optional<JavaRDD<GenericRecord>>, String> fetchNewData(Optional<String> optional, long j) {
        HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> hashMap;
        JavaRDD map;
        KafkaCluster kafkaCluster = new KafkaCluster(ScalaHelpers.toScalaMap(this.kafkaParams));
        Either partitions = kafkaCluster.getPartitions(ScalaHelpers.toScalaSet(new HashSet(Arrays.asList(this.topicName))));
        if (partitions.isLeft()) {
            throw new HoodieDeltaStreamerException("Error obtaining partition metadata", (Throwable) ((ArrayBuffer) partitions.left().get()).head());
        }
        Set set = (Set) partitions.right().get();
        if (optional.isPresent()) {
            hashMap = CheckpointUtils.strToOffsets(optional.get());
        } else {
            String string = this.config.getString("auto.offset.reset", "largest");
            if (string.equals("smallest")) {
                hashMap = new HashMap<>((java.util.Map<? extends TopicAndPartition, ? extends KafkaCluster.LeaderOffset>) ScalaHelpers.toJavaMap((Map) kafkaCluster.getEarliestLeaderOffsets(set).right().get()));
            } else {
                if (!string.equals("largest")) {
                    throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' ");
                }
                hashMap = new HashMap<>((java.util.Map<? extends TopicAndPartition, ? extends KafkaCluster.LeaderOffset>) ScalaHelpers.toJavaMap((Map) kafkaCluster.getLatestLeaderOffsets(set).right().get()));
            }
        }
        HashMap hashMap2 = new HashMap(ScalaHelpers.toJavaMap((Map) kafkaCluster.getLatestLeaderOffsets(set).right().get()));
        OffsetRange[] computeOffsetRanges = CheckpointUtils.computeOffsetRanges(hashMap, hashMap2);
        long j2 = CheckpointUtils.totalNewMessages(computeOffsetRanges);
        if (j2 <= 0) {
            return new ImmutablePair(Optional.empty(), optional.isPresent() ? optional.get() : CheckpointUtils.offsetsToStr(hashMap2));
        }
        log.info("About to read " + j2 + " from Kafka for topic :" + this.topicName);
        JavaRDD values = KafkaUtils.createRDD(this.sparkContext, byte[].class, byte[].class, DefaultDecoder.class, DefaultDecoder.class, this.kafkaParams, computeOffsetRanges).values();
        AvroConvertor avroConvertor = new AvroConvertor(this.schemaProvider.getSourceSchema().toString());
        if (this.dataFormat == SourceDataFormat.AVRO) {
            map = values.map(bArr -> {
                return avroConvertor.fromAvroBinary(bArr);
            });
        } else {
            if (this.dataFormat != SourceDataFormat.JSON) {
                throw new HoodieNotSupportedException("Unsupport data format :" + this.dataFormat);
            }
            map = values.map(bArr2 -> {
                return avroConvertor.fromJson(new String(bArr2, Charset.forName("utf-8")));
            });
        }
        return new ImmutablePair(Optional.of(map), CheckpointUtils.offsetsToStr(hashMap2));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 360754470:
                if (implMethodName.equals("lambda$fetchNewData$cdfe36ed$1")) {
                    z = false;
                    break;
                }
                break;
            case 360754471:
                if (implMethodName.equals("lambda$fetchNewData$cdfe36ed$2")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/uber/hoodie/utilities/sources/KafkaSource") && serializedLambda.getImplMethodSignature().equals("(Lcom/uber/hoodie/utilities/sources/AvroConvertor;[B)Lorg/apache/avro/generic/GenericRecord;")) {
                    AvroConvertor avroConvertor = (AvroConvertor) serializedLambda.getCapturedArg(0);
                    return bArr -> {
                        return avroConvertor.fromAvroBinary(bArr);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/uber/hoodie/utilities/sources/KafkaSource") && serializedLambda.getImplMethodSignature().equals("(Lcom/uber/hoodie/utilities/sources/AvroConvertor;[B)Lorg/apache/avro/generic/GenericRecord;")) {
                    AvroConvertor avroConvertor2 = (AvroConvertor) serializedLambda.getCapturedArg(0);
                    return bArr2 -> {
                        return avroConvertor2.fromJson(new String(bArr2, Charset.forName("utf-8")));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
