package org.springframework.integration.kafka.core;

import com.gs.collections.api.block.function.Function;
import com.gs.collections.api.block.function.Function2;
import com.gs.collections.api.map.MutableMap;
import com.gs.collections.api.tuple.Pair;
import com.gs.collections.impl.list.mutable.FastList;
import com.gs.collections.impl.tuple.Tuples;
import com.gs.collections.impl.utility.LazyIterate;
import com.gs.collections.impl.utility.MapIterate;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/kafka/core/DefaultConnection.class */
public class DefaultConnection implements Connection {
    private static Log log = LogFactory.getLog(DefaultConnection.class);
    private final AtomicInteger correlationIdCounter = new AtomicInteger(new Random(new Date().getTime()).nextInt());
    private final SimpleConsumer simpleConsumer;
    private final BrokerAddress brokerAddress;
    private int minBytes;
    private int maxWait;

    /* loaded from: input_file:org/springframework/integration/kafka/core/DefaultConnection$ConvertToKafkaMessageFunction.class */
    private static class ConvertToKafkaMessageFunction implements Function<MessageAndOffset, KafkaMessage> {
        private final FetchRequest request;

        public ConvertToKafkaMessageFunction(FetchRequest fetchRequest) {
            this.request = fetchRequest;
        }

        public KafkaMessage valueOf(MessageAndOffset messageAndOffset) {
            return new KafkaMessage(messageAndOffset.message(), new KafkaMessageMetadata(this.request.getPartition(), messageAndOffset.offset(), messageAndOffset.nextOffset()));
        }
    }

    /* loaded from: input_file:org/springframework/integration/kafka/core/DefaultConnection$ConvertToTopicAndPartitionFunction.class */
    private static class ConvertToTopicAndPartitionFunction implements Function<Partition, TopicAndPartition> {
        private ConvertToTopicAndPartitionFunction() {
        }

        public TopicAndPartition valueOf(Partition partition) {
            return new TopicAndPartition(partition.getTopic(), partition.getId());
        }
    }

    /* loaded from: input_file:org/springframework/integration/kafka/core/DefaultConnection$CreateRequestInfoMapEntryFunction.class */
    private static class CreateRequestInfoMapEntryFunction implements Function2<Partition, Long, Pair<TopicAndPartition, OffsetMetadataAndError>> {
        private CreateRequestInfoMapEntryFunction() {
        }

        public Pair<TopicAndPartition, OffsetMetadataAndError> value(Partition partition, Long l) {
            return Tuples.pair(new TopicAndPartition(partition.getTopic(), partition.getId()), new OffsetMetadataAndError(l.longValue(), OffsetMetadataAndError.NoMetadata(), ErrorMapping.NoError()));
        }
    }

    public DefaultConnection(BrokerAddress brokerAddress, String str, int i, int i2, int i3, int i4) {
        this.brokerAddress = brokerAddress;
        this.minBytes = i3;
        this.maxWait = i4;
        this.simpleConsumer = new SimpleConsumer(brokerAddress.getHost(), brokerAddress.getPort(), i2, i, str);
    }

    @Override // org.springframework.integration.kafka.core.Connection
    public BrokerAddress getBrokerAddress() {
        return this.brokerAddress;
    }

    @Override // org.springframework.integration.kafka.core.Connection
    public void close() {
        this.simpleConsumer.close();
    }

    @Override // org.springframework.integration.kafka.core.Connection
    public Result<KafkaMessageBatch> fetch(FetchRequest... fetchRequestArr) throws ConsumerException {
        FetchRequestBuilder fetchRequestBuilder = new FetchRequestBuilder();
        for (FetchRequest fetchRequest : fetchRequestArr) {
            Partition partition = fetchRequest.getPartition();
            fetchRequestBuilder.addFetch(partition.getTopic(), partition.getId(), fetchRequest.getOffset(), fetchRequest.getMaxSizeInBytes());
        }
        try {
            FetchResponse fetch = this.simpleConsumer.fetch(fetchRequestBuilder.maxWait(this.maxWait).minBytes(this.minBytes).build());
            ResultBuilder resultBuilder = new ResultBuilder();
            for (FetchRequest fetchRequest2 : fetchRequestArr) {
                Partition partition2 = fetchRequest2.getPartition();
                if (log.isDebugEnabled()) {
                    log.debug("Reading from " + partition2 + "@" + fetchRequest2.getOffset());
                }
                short errorCode = fetch.errorCode(partition2.getTopic(), partition2.getId());
                if (ErrorMapping.NoError() == errorCode) {
                    resultBuilder.add(partition2).withResult(new KafkaMessageBatch(partition2, LazyIterate.collect(fetch.messageSet(partition2.getTopic(), partition2.getId()), new ConvertToKafkaMessageFunction(fetchRequest2)).toList(), fetch.highWatermark(partition2.getTopic(), partition2.getId())));
                } else {
                    resultBuilder.add(partition2).withError(errorCode);
                }
            }
            return resultBuilder.build();
        } catch (Exception e) {
            throw new ConsumerException(e);
        }
    }

    @Override // org.springframework.integration.kafka.core.Connection
    public Result<Long> fetchStoredOffsetsForConsumer(String str, Partition... partitionArr) throws ConsumerException {
        try {
            OffsetFetchResponse fetchOffsets = this.simpleConsumer.fetchOffsets(new OffsetFetchRequest(str, FastList.newList(Arrays.asList(partitionArr)).collect(new ConvertToTopicAndPartitionFunction()), kafka.api.OffsetFetchRequest.CurrentVersion(), createCorrelationId().intValue(), this.simpleConsumer.clientId()));
            ResultBuilder resultBuilder = new ResultBuilder();
            for (Partition partition : partitionArr) {
                OffsetMetadataAndError offsetMetadataAndError = (OffsetMetadataAndError) fetchOffsets.offsets().get(new TopicAndPartition(partition.getTopic(), partition.getId()));
                short error = offsetMetadataAndError.error();
                if (ErrorMapping.NoError() == error) {
                    resultBuilder.add(partition).withResult(Long.valueOf(offsetMetadataAndError.offset()));
                } else {
                    resultBuilder.add(partition).withError(error);
                }
            }
            return resultBuilder.build();
        } catch (Exception e) {
            throw new ConsumerException(e);
        }
    }

    @Override // org.springframework.integration.kafka.core.Connection
    public Result<Long> fetchInitialOffset(long j, Partition... partitionArr) throws ConsumerException {
        Assert.isTrue(partitionArr.length > 0, "Must provide at least one partition");
        HashMap hashMap = new HashMap();
        for (Partition partition : partitionArr) {
            hashMap.put(new TopicAndPartition(partition.getTopic(), partition.getId()), new PartitionOffsetRequestInfo(j, 1));
        }
        try {
            OffsetResponse offsetsBefore = this.simpleConsumer.getOffsetsBefore(new OffsetRequest(hashMap, kafka.api.OffsetRequest.CurrentVersion(), this.simpleConsumer.clientId()));
            ResultBuilder resultBuilder = new ResultBuilder();
            for (Partition partition2 : partitionArr) {
                short errorCode = offsetsBefore.errorCode(partition2.getTopic(), partition2.getId());
                if (ErrorMapping.NoError() == errorCode) {
                    long[] offsets = offsetsBefore.offsets(partition2.getTopic(), partition2.getId());
                    if (offsets.length == 0) {
                        throw new ConsumerException("Inconsistent response: no error has been returned, but no offsets either");
                    }
                    resultBuilder.add(partition2).withResult(Long.valueOf(offsets[0]));
                } else {
                    resultBuilder.add(partition2).withError(errorCode);
                }
            }
            return resultBuilder.build();
        } catch (Exception e) {
            throw new ConsumerException(e);
        }
    }

    @Override // org.springframework.integration.kafka.core.Connection
    public Result<Void> commitOffsetsForConsumer(String str, Map<Partition, Long> map) throws ConsumerException {
        MutableMap collect = MapIterate.collect(map, new CreateRequestInfoMapEntryFunction());
        try {
            OffsetCommitResponse commitOffsets = this.simpleConsumer.commitOffsets(new OffsetCommitRequest(str, collect, kafka.api.OffsetCommitRequest.CurrentVersion(), createCorrelationId().intValue(), this.simpleConsumer.clientId()));
            ResultBuilder resultBuilder = new ResultBuilder();
            for (TopicAndPartition topicAndPartition : collect.keySet()) {
                if (commitOffsets.errors().containsKey(topicAndPartition)) {
                    resultBuilder.add(new Partition(topicAndPartition.topic(), topicAndPartition.partition())).withError(((Short) commitOffsets.errors().get(topicAndPartition)).shortValue());
                }
            }
            return resultBuilder.build();
        } catch (Exception e) {
            throw new ConsumerException(e);
        }
    }

    @Override // org.springframework.integration.kafka.core.Connection
    public Result<BrokerAddress> findLeaders(String... strArr) throws ConsumerException {
        try {
            TopicMetadataResponse send = this.simpleConsumer.send(new TopicMetadataRequest(Arrays.asList(strArr), createCorrelationId().intValue()));
            ResultBuilder resultBuilder = new ResultBuilder();
            for (TopicMetadata topicMetadata : send.topicsMetadata()) {
                if (topicMetadata.errorCode() != ErrorMapping.NoError()) {
                    resultBuilder.add(new Partition(topicMetadata.topic(), -1)).withError(topicMetadata.errorCode());
                } else {
                    for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
                        Partition partition = new Partition(topicMetadata.topic(), partitionMetadata.partitionId());
                        if (ErrorMapping.NoError() == partitionMetadata.errorCode()) {
                            Broker leader = partitionMetadata.leader();
                            resultBuilder.add(partition).withResult(new BrokerAddress(leader.host(), leader.port()));
                        } else {
                            resultBuilder.add(partition).withError(partitionMetadata.errorCode());
                        }
                    }
                }
            }
            return resultBuilder.build();
        } catch (Exception e) {
            throw new ConsumerException(e);
        }
    }

    private Integer createCorrelationId() {
        return Integer.valueOf(this.correlationIdCounter.incrementAndGet());
    }
}
