package com.facebook.presto.kafka;

import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.log.Logger;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import javax.inject.Inject;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

/* loaded from: input_file:com/facebook/presto/kafka/KafkaSplitManager.class */
public class KafkaSplitManager implements ConnectorSplitManager {
    private static final Logger log = Logger.get(KafkaSplitManager.class);
    private final String connectorId;
    private final KafkaSimpleConsumerManager consumerManager;
    private final Set<HostAddress> nodes;

    @Inject
    public KafkaSplitManager(KafkaConnectorId kafkaConnectorId, KafkaConnectorConfig kafkaConnectorConfig, KafkaSimpleConsumerManager kafkaSimpleConsumerManager) {
        this.connectorId = ((KafkaConnectorId) Objects.requireNonNull(kafkaConnectorId, "connectorId is null")).toString();
        this.consumerManager = (KafkaSimpleConsumerManager) Objects.requireNonNull(kafkaSimpleConsumerManager, "consumerManager is null");
        Objects.requireNonNull(kafkaConnectorConfig, "kafkaConfig is null");
        this.nodes = ImmutableSet.copyOf(kafkaConnectorConfig.getNodes());
    }

    public ConnectorSplitSource getSplits(ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorTableLayoutHandle connectorTableLayoutHandle, ConnectorSplitManager.SplitSchedulingStrategy splitSchedulingStrategy) {
        KafkaTableHandle table = KafkaHandleResolver.convertLayout(connectorTableLayoutHandle).getTable();
        TopicMetadataResponse send = this.consumerManager.getConsumer((HostAddress) selectRandom(this.nodes)).send(new TopicMetadataRequest(ImmutableList.of(table.getTopicName())));
        ImmutableList.Builder builder = ImmutableList.builder();
        for (TopicMetadata topicMetadata : send.topicsMetadata()) {
            for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
                log.debug("Adding Partition %s/%s", new Object[]{topicMetadata.topic(), Integer.valueOf(partitionMetadata.partitionId())});
                Broker leader = partitionMetadata.leader();
                if (leader == null) {
                    log.warn("No leader for partition %s/%s found!", new Object[]{topicMetadata.topic(), Integer.valueOf(partitionMetadata.partitionId())});
                } else {
                    HostAddress fromParts = HostAddress.fromParts(leader.host(), leader.port());
                    long[] findAllOffsets = findAllOffsets(this.consumerManager.getConsumer(fromParts), topicMetadata.topic(), partitionMetadata.partitionId());
                    for (int length = findAllOffsets.length - 1; length > 0; length--) {
                        builder.add(new KafkaSplit(this.connectorId, topicMetadata.topic(), table.getKeyDataFormat(), table.getMessageDataFormat(), partitionMetadata.partitionId(), findAllOffsets[length], findAllOffsets[length - 1], fromParts));
                    }
                }
            }
        }
        return new FixedSplitSource(builder.build());
    }

    private static long[] findAllOffsets(SimpleConsumer simpleConsumer, String str, int i) {
        OffsetResponse offsetsBefore = simpleConsumer.getOffsetsBefore(new OffsetRequest(ImmutableMap.of(new TopicAndPartition(str, i), new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), Integer.MAX_VALUE)), kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId()));
        if (!offsetsBefore.hasError()) {
            return offsetsBefore.offsets(str, i);
        }
        short errorCode = offsetsBefore.errorCode(str, i);
        log.warn("Offset response has error: %d", new Object[]{Short.valueOf(errorCode)});
        throw new PrestoException(KafkaErrorCode.KAFKA_SPLIT_ERROR, "could not fetch data from Kafka, error code is '" + ((int) errorCode) + "'");
    }

    private static <T> T selectRandom(Iterable<T> iterable) {
        ImmutableList copyOf = ImmutableList.copyOf(iterable);
        return (T) copyOf.get(ThreadLocalRandom.current().nextInt(copyOf.size()));
    }
}
