package com.facebook.presto.kafka;

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPartition;
import com.facebook.presto.spi.ConnectorPartitionResult;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitManager;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.TupleDomain;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import io.airlift.log.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import javax.inject.Inject;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

/* loaded from: input_file:com/facebook/presto/kafka/KafkaSplitManager.class */
public class KafkaSplitManager implements ConnectorSplitManager {
    private static final Logger log = Logger.get(KafkaSplitManager.class);
    private final String connectorId;
    private final KafkaHandleResolver handleResolver;
    private final KafkaSimpleConsumerManager consumerManager;
    private final Set<HostAddress> nodes;

    @Inject
    public KafkaSplitManager(KafkaConnectorId kafkaConnectorId, KafkaConnectorConfig kafkaConnectorConfig, KafkaHandleResolver kafkaHandleResolver, KafkaSimpleConsumerManager kafkaSimpleConsumerManager) {
        this.connectorId = ((KafkaConnectorId) Objects.requireNonNull(kafkaConnectorId, "connectorId is null")).toString();
        this.handleResolver = (KafkaHandleResolver) Objects.requireNonNull(kafkaHandleResolver, "handleResolver is null");
        this.consumerManager = (KafkaSimpleConsumerManager) Objects.requireNonNull(kafkaSimpleConsumerManager, "consumerManager is null");
        Objects.requireNonNull(kafkaConnectorConfig, "kafkaConfig is null");
        this.nodes = ImmutableSet.copyOf(kafkaConnectorConfig.getNodes());
    }

    public ConnectorPartitionResult getPartitions(ConnectorSession connectorSession, ConnectorTableHandle connectorTableHandle, TupleDomain<ColumnHandle> tupleDomain) {
        KafkaTableHandle convertTableHandle = this.handleResolver.convertTableHandle(connectorTableHandle);
        ArrayList arrayList = new ArrayList(this.nodes);
        Collections.shuffle(arrayList);
        try {
            TopicMetadataResponse send = this.consumerManager.getConsumer((HostAddress) arrayList.get(0)).send(new TopicMetadataRequest(ImmutableList.of(convertTableHandle.getTopicName())));
            ImmutableList.Builder builder = ImmutableList.builder();
            for (TopicMetadata topicMetadata : send.topicsMetadata()) {
                for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
                    log.debug("Adding Partition %s/%s", new Object[]{topicMetadata.topic(), Integer.valueOf(partitionMetadata.partitionId())});
                    Broker leader = partitionMetadata.leader();
                    if (leader == null) {
                        log.warn("No leader for partition %s/%s found!", new Object[]{topicMetadata.topic(), Integer.valueOf(partitionMetadata.partitionId())});
                    } else {
                        builder.add(new KafkaPartition(topicMetadata.topic(), partitionMetadata.partitionId(), HostAddress.fromParts(leader.host(), leader.port()), ImmutableList.copyOf(Lists.transform(partitionMetadata.isr(), KafkaSplitManager::brokerToHostAddress))));
                    }
                }
            }
            return new ConnectorPartitionResult(builder.build(), tupleDomain);
        } catch (Exception e) {
            throw new TableNotFoundException(convertTableHandle.toSchemaTableName(), e);
        }
    }

    public ConnectorSplitSource getPartitionSplits(ConnectorSession connectorSession, ConnectorTableHandle connectorTableHandle, List<ConnectorPartition> list) {
        KafkaTableHandle convertTableHandle = this.handleResolver.convertTableHandle(connectorTableHandle);
        ImmutableList.Builder builder = ImmutableList.builder();
        for (ConnectorPartition connectorPartition : list) {
            Preconditions.checkState(connectorPartition instanceof KafkaPartition, "Found an unknown partition type: %s", new Object[]{connectorPartition.getClass().getSimpleName()});
            KafkaPartition kafkaPartition = (KafkaPartition) connectorPartition;
            long[] findAllOffsets = findAllOffsets(this.consumerManager.getConsumer(kafkaPartition.getPartitionLeader()), kafkaPartition);
            for (int length = findAllOffsets.length - 1; length > 0; length--) {
                builder.add(new KafkaSplit(this.connectorId, kafkaPartition.getTopicName(), convertTableHandle.getKeyDataFormat(), convertTableHandle.getMessageDataFormat(), kafkaPartition.getPartitionIdAsInt(), findAllOffsets[length], findAllOffsets[length - 1], kafkaPartition.getPartitionNodes()));
            }
        }
        return new FixedSplitSource(this.connectorId, builder.build());
    }

    private static long[] findAllOffsets(SimpleConsumer simpleConsumer, KafkaPartition kafkaPartition) {
        OffsetResponse offsetsBefore = simpleConsumer.getOffsetsBefore(new OffsetRequest(ImmutableMap.of(new TopicAndPartition(kafkaPartition.getTopicName(), kafkaPartition.getPartitionIdAsInt()), new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), Integer.MAX_VALUE)), kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId()));
        if (!offsetsBefore.hasError()) {
            return offsetsBefore.offsets(kafkaPartition.getTopicName(), kafkaPartition.getPartitionIdAsInt());
        }
        short errorCode = offsetsBefore.errorCode(kafkaPartition.getTopicName(), kafkaPartition.getPartitionIdAsInt());
        log.warn("Offset response has error: %d", new Object[]{Short.valueOf(errorCode)});
        throw new PrestoException(KafkaErrorCode.KAFKA_SPLIT_ERROR, "could not fetch data from Kafka, error code is '" + ((int) errorCode) + "'");
    }

    private static HostAddress brokerToHostAddress(Broker broker) {
        return HostAddress.fromParts(broker.host(), broker.port());
    }
}
