package com.facebook.presto.kafka;

import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CharStreams;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import javax.inject.Inject;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:com/facebook/presto/kafka/KafkaSplitManager.class */
public class KafkaSplitManager implements ConnectorSplitManager {
    private final String connectorId;
    private final KafkaConsumerManager consumerManager;
    private final KafkaStaticServerset servers;

    @Inject
    public KafkaSplitManager(KafkaConnectorId kafkaConnectorId, KafkaConnectorConfig kafkaConnectorConfig, KafkaStaticServerset kafkaStaticServerset, KafkaConsumerManager kafkaConsumerManager) {
        this.connectorId = ((KafkaConnectorId) Objects.requireNonNull(kafkaConnectorId, "connectorId is null")).toString();
        this.consumerManager = (KafkaConsumerManager) Objects.requireNonNull(kafkaConsumerManager, "consumerManager is null");
        Objects.requireNonNull(kafkaConnectorConfig, "kafkaConfig is null");
        this.servers = kafkaStaticServerset;
    }

    public ConnectorSplitSource getSplits(ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorTableLayoutHandle connectorTableLayoutHandle, ConnectorSplitManager.SplitSchedulingContext splitSchedulingContext) {
        KafkaTableHandle table = KafkaHandleResolver.convertLayout(connectorTableLayoutHandle).getTable();
        try {
            HostAddress selectRandomServer = this.servers.selectRandomServer();
            String topicName = table.getTopicName();
            KafkaTableLayoutHandle kafkaTableLayoutHandle = (KafkaTableLayoutHandle) connectorTableLayoutHandle;
            KafkaConsumer<ByteBuffer, ByteBuffer> createConsumer = this.consumerManager.createConsumer(Thread.currentThread().getName(), selectRandomServer);
            List<PartitionInfo> partitionsFor = createConsumer.partitionsFor(topicName);
            ImmutableList.Builder builder = ImmutableList.builder();
            for (PartitionInfo partitionInfo : partitionsFor) {
                Node leader = partitionInfo.leader();
                if (leader == null) {
                    throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, String.format("Leader election in progress for Kafka topic '%s' partition %s", topicName, Integer.valueOf(partitionInfo.partition())));
                }
                HostAddress fromParts = HostAddress.fromParts(leader.host(), leader.port());
                long startOffsetTimestamp = kafkaTableLayoutHandle.getStartOffsetTimestamp();
                long endOffsetTimestamp = kafkaTableLayoutHandle.getEndOffsetTimestamp();
                if (startOffsetTimestamp > endOffsetTimestamp) {
                    throw new IllegalArgumentException(String.format("Invalid Kafka Offset start/end pair: %s - %s", Long.valueOf(startOffsetTimestamp), Long.valueOf(endOffsetTimestamp)));
                }
                TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                createConsumer.assign(ImmutableList.of(topicPartition));
                builder.add(new KafkaSplit(this.connectorId, topicName, table.getKeyDataFormat(), table.getMessageDataFormat(), table.getKeyDataSchemaLocation().map(KafkaSplitManager::readSchema), table.getMessageDataSchemaLocation().map(KafkaSplitManager::readSchema), partitionInfo.partition(), startOffsetTimestamp == 0 ? ((Long) createConsumer.beginningOffsets(ImmutableList.of(topicPartition)).values().iterator().next()).longValue() : findOffsetsByTimestamp(createConsumer, topicPartition, startOffsetTimestamp), endOffsetTimestamp == 0 ? ((Long) createConsumer.endOffsets(ImmutableList.of(topicPartition)).values().iterator().next()).longValue() : findOffsetsByTimestamp(createConsumer, topicPartition, endOffsetTimestamp), fromParts));
            }
            return new FixedSplitSource(builder.build());
        } catch (Exception e) {
            if (e instanceof PrestoException) {
                throw e;
            }
            throw new PrestoException(KafkaErrorCode.KAFKA_SPLIT_ERROR, String.format("Cannot list splits for table '%s' reading topic '%s'", table.getTableName(), table.getTopicName()), e);
        }
    }

    private static long findOffsetsByTimestamp(KafkaConsumer<ByteBuffer, ByteBuffer> kafkaConsumer, TopicPartition topicPartition, long j) {
        try {
            Map offsetsForTimes = kafkaConsumer.offsetsForTimes(ImmutableMap.of(topicPartition, Long.valueOf(j)));
            if (offsetsForTimes == null || offsetsForTimes.values().size() == 0) {
                return 0L;
            }
            return ((OffsetAndTimestamp) offsetsForTimes.values().iterator().next()).offset();
        } catch (IllegalArgumentException e) {
            throw new PrestoException(KafkaErrorCode.KAFKA_CONSUMER_ERROR, String.format("Failed to find offset by timestamp: %d for partition %d", Long.valueOf(j), Integer.valueOf(topicPartition.partition())), e);
        }
    }

    private static String readSchema(String str) {
        InputStream inputStream = null;
        try {
            try {
                if (isURI(str.trim().toLowerCase(Locale.ENGLISH))) {
                    try {
                        inputStream = new URL(str).openStream();
                    } catch (MalformedURLException e) {
                        inputStream = new FileInputStream(str);
                    }
                } else {
                    inputStream = new FileInputStream(str);
                }
                String charStreams = CharStreams.toString(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
                closeQuietly(inputStream);
                return charStreams;
            } catch (IOException e2) {
                throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Could not parse the Avro schema at: " + str, e2);
            }
        } catch (Throwable th) {
            closeQuietly(inputStream);
            throw th;
        }
    }

    private static void closeQuietly(InputStream inputStream) {
        if (inputStream != null) {
            try {
                inputStream.close();
            } catch (IOException e) {
            }
        }
    }

    private static boolean isURI(String str) {
        try {
            URI.create(str);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
