package org.springframework.integration.kafka.inbound;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.Lifecycle;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.AcknowledgmentCallback;
import org.springframework.integration.support.AcknowledgmentCallbackFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaMessageSource.class */
public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object> implements DisposableBean, Lifecycle {
    // Default value for pollTimeout; the value is handed to Consumer.poll(long) in doReceive().
    // NOTE(review): presumably milliseconds, per the Kafka client API - confirm.
    private static final long DEFAULT_POLL_TIMEOUT = 50;
    // Logger for this source, created in the constructor from the runtime class.
    private final Log logger;
    // Factory used to create the consumer; this is the result of fixOrRejectConsumerFactory(),
    // so it may be a copy of the supplied factory with 'max.poll.records' forced to 1.
    private final ConsumerFactory<K, V> consumerFactory;
    // Creates one acknowledgment callback per record returned by doReceive().
    private final KafkaAckCallbackFactory<K, V> ackCallbackFactory;
    // Topics the consumer subscribes to in createConsumer().
    private final String[] topics;
    // Monitor held while creating, polling and closing the consumer; also exposed to callbacks
    // through KafkaAckInfo.getConsumerMonitor() so acknowledgments lock on the same object.
    private final Object consumerMonitor;
    // Records handed out by doReceive(), grouped by topic partition; entries are removed by the
    // acknowledgment callback.
    private final Map<TopicPartition, Set<KafkaAckInfo<K, V>>> inflightRecords;
    // Group id passed to ConsumerFactory.createConsumer(); not set by the constructor.
    private String groupId;
    // Client id passed to ConsumerFactory.createConsumer(); the constructor sets "message.source".
    private String clientId;
    // Timeout passed to Consumer.poll(long) on every receive.
    private long pollTimeout;
    // Converts the polled ConsumerRecord into a Message; the constructor installs a MessagingMessageConverter.
    private RecordMessageConverter messageConverter;
    // Payload type hint passed to the message converter; may be left unset.
    private Type payloadType;
    // Optional user listener that the internal rebalance listener delegates to.
    private ConsumerRebalanceListener rebalanceListener;
    // When true, doReceive() adds the raw ConsumerRecord under the "kafka_data" header.
    private boolean rawMessageHeader;
    // Lazily created by doReceive() via createConsumer(); reset to null by stopConsumer().
    private volatile Consumer<K, V> consumer;
    // Lifecycle flag; read and written only from synchronized methods of this class.
    private boolean running;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.springframework.integration.kafka.inbound.KafkaMessageSource$2, reason: invalid class name */
    /* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaMessageSource$2.class */
    /**
     * Holder for the lookup table that maps {@link AcknowledgmentCallback.Status} ordinals to
     * the case numbers 1 (ACCEPT), 2 (REJECT) and 3 (REQUEUE). Slots of constants that cannot
     * be resolved keep the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$springframework$integration$support$AcknowledgmentCallback$Status;

        static {
            int[] caseNumbers = new int[AcknowledgmentCallback.Status.values().length];
            try {
                caseNumbers[AcknowledgmentCallback.Status.ACCEPT.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // ACCEPT is not available at runtime: leave its slot unmapped.
            }
            try {
                caseNumbers[AcknowledgmentCallback.Status.REJECT.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // REJECT is not available at runtime: leave its slot unmapped.
            }
            try {
                caseNumbers[AcknowledgmentCallback.Status.REQUEUE.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // REQUEUE is not available at runtime: leave its slot unmapped.
            }
            $SwitchMap$org$springframework$integration$support$AcknowledgmentCallback$Status = caseNumbers;
        }
    }

    /* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaMessageSource$KafkaAckCallback.class */
    /**
     * Acknowledgment callback for a single record returned by the message source.
     *
     * <p>ACCEPT and REJECT try to commit the record's offset; REQUEUE seeks the consumer back
     * to the record so it is fetched again. All consumer access happens while holding the
     * monitor supplied by {@link KafkaAckInfo#getConsumerMonitor()}.
     *
     * @param <K> the record key type
     * @param <V> the record value type
     */
    public static class KafkaAckCallback<K, V> implements AcknowledgmentCallback, Acknowledgment {
        private final Log logger = LogFactory.getLog(getClass());
        private final KafkaAckInfo<K, V> ackInfo;
        private volatile boolean acknowledged;
        private boolean autoAckEnabled = true;

        /**
         * Creates a callback for the given record bookkeeping entry.
         *
         * @param kafkaAckInfo the entry describing the record; must not be null
         */
        public KafkaAckCallback(KafkaAckInfo<K, V> kafkaAckInfo) {
            Assert.notNull(kafkaAckInfo, "'ackInfo' cannot be null");
            this.ackInfo = kafkaAckInfo;
        }

        /**
         * Acknowledges the record with the given status.
         *
         * @param status ACCEPT or REJECT to commit, REQUEUE to re-fetch; must not be null
         * @throws IllegalStateException if this callback was already acknowledged, or if the
         *         consumer operation was interrupted by a {@link WakeupException}
         */
        public void acknowledge(AcknowledgmentCallback.Status status) {
            Assert.notNull(status, "'status' cannot be null");
            if (this.acknowledged) {
                throw new IllegalStateException("Already acknowledged");
            }
            synchronized (this.ackInfo.getConsumerMonitor()) {
                try {
                    ConsumerRecord<K, V> record = this.ackInfo.getRecord();
                    switch (status) {
                        case ACCEPT:
                        case REJECT:
                            commitIfPossible(record);
                            break;
                        case REQUEUE:
                            rollback(record);
                            break;
                        default:
                            // Any other status neither commits nor seeks.
                            break;
                    }
                } catch (WakeupException e) {
                    throw new IllegalStateException(e);
                } finally {
                    // Runs exactly once, on success and on failure alike.
                    this.acknowledged = true;
                    // A deferred entry must stay in the in-flight set until an earlier
                    // record commits on its behalf.
                    if (!this.ackInfo.isAckDeferred()) {
                        this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition()).remove(this.ackInfo);
                    }
                }
            }
        }

        /**
         * Seeks the consumer back to the record's offset and flags every later in-flight
         * record of the same partition as rolled back, since those will be fetched again.
         */
        private void rollback(ConsumerRecord<K, V> consumerRecord) {
            this.ackInfo.getConsumer().seek(this.ackInfo.getTopicPartition(), consumerRecord.offset());
            Set<KafkaAckInfo<K, V>> inflight = this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition());
            synchronized (inflight) {
                if (inflight.size() > 1) {
                    List<Long> rewound = new ArrayList<>();
                    for (KafkaAckInfo<K, V> info : inflight) {
                        if (info.getRecord().offset() > consumerRecord.offset()) {
                            info.setRolledBack(true);
                            rewound.add(info.getRecord().offset());
                        }
                    }
                    if (!rewound.isEmpty() && this.logger.isWarnEnabled()) {
                        this.logger.warn("Rolled back " + consumerRecord + " later in-flight offsets " + rewound + " will also be re-fetched");
                    }
                }
            }
        }

        /**
         * Commits the record's offset when it is the earliest in-flight record of its
         * partition, together with any directly following records whose acknowledgment was
         * deferred. Otherwise the acknowledgment is marked as deferred. Nothing is committed
         * for a record that was rolled back.
         */
        private void commitIfPossible(ConsumerRecord<K, V> consumerRecord) {
            if (this.ackInfo.isRolledBack()) {
                if (this.logger.isWarnEnabled()) {
                    this.logger.warn("Cannot commit offset for " + consumerRecord + "; an earlier offset was rolled back");
                }
                return;
            }
            Set<KafkaAckInfo<K, V>> inflight = this.ackInfo.getOffsets().get(this.ackInfo.getTopicPartition());
            synchronized (inflight) {
                KafkaAckInfo<K, V> toCommit = null;
                if (inflight.iterator().next().equals(this.ackInfo)) {
                    // Collect the run of deferred acknowledgments that follows this record.
                    List<KafkaAckInfo<K, V>> deferred = new ArrayList<>();
                    for (KafkaAckInfo<K, V> candidate : inflight) {
                        if (candidate == this.ackInfo) {
                            continue;
                        }
                        if (!candidate.isAckDeferred()) {
                            break;
                        }
                        deferred.add(candidate);
                    }
                    if (deferred.isEmpty()) {
                        toCommit = this.ackInfo;
                    } else {
                        toCommit = deferred.get(deferred.size() - 1);
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Committing pending offsets for " + consumerRecord + " and all deferred to " + toCommit.getRecord());
                        }
                        inflight.removeAll(deferred);
                    }
                } else {
                    this.ackInfo.setAckDeferred(true);
                }
                if (toCommit != null) {
                    // The committed offset is the offset after the last handled record.
                    toCommit.getConsumer().commitSync(Collections.singletonMap(toCommit.getTopicPartition(), new OffsetAndMetadata(toCommit.getRecord().offset() + 1)));
                } else if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Deferring commit offset; earlier messages are in flight.");
                }
            }
        }

        /**
         * @return true once {@link #acknowledge(AcknowledgmentCallback.Status)} has run
         */
        public boolean isAcknowledged() {
            return this.acknowledged;
        }

        /**
         * Acknowledges the record with status ACCEPT.
         */
        public void acknowledge() {
            acknowledge(AcknowledgmentCallback.Status.ACCEPT);
        }

        /**
         * Turns the auto-acknowledge flag off.
         */
        public void noAutoAck() {
            this.autoAckEnabled = false;
        }

        /**
         * @return true unless {@link #noAutoAck()} was called
         */
        public boolean isAutoAck() {
            return this.autoAckEnabled;
        }
    }

    /* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaMessageSource$KafkaAckCallbackFactory.class */
    /**
     * Factory that wraps a {@link KafkaAckInfo} in a new {@link KafkaAckCallback}.
     *
     * @param <K> the record key type
     * @param <V> the record value type
     */
    public static class KafkaAckCallbackFactory<K, V> implements AcknowledgmentCallbackFactory<KafkaAckInfo<K, V>> {
        /**
         * @param kafkaAckInfo bookkeeping entry of the record to acknowledge
         * @return a fresh callback bound to the given entry
         */
        public AcknowledgmentCallback createCallback(KafkaAckInfo<K, V> kafkaAckInfo) {
            KafkaAckCallback<K, V> callback = new KafkaAckCallback<K, V>(kafkaAckInfo);
            return callback;
        }
    }

    /* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaMessageSource$KafkaAckInfo.class */
    /**
     * Bookkeeping data an acknowledgment callback needs for one record. Entries are ordered
     * through {@link Comparable}; the implementation in this file orders them by record offset.
     *
     * @param <K> the record key type
     * @param <V> the record value type
     */
    public interface KafkaAckInfo<K, V> extends Comparable<KafkaAckInfo<K, V>> {
        /** @return the object callbacks synchronize on before touching the consumer */
        Object getConsumerMonitor();

        /** @return the group id configured on the message source */
        String getGroupId();

        /** @return the consumer used to seek and commit for this record */
        Consumer<K, V> getConsumer();

        /** @return the record this entry describes */
        ConsumerRecord<K, V> getRecord();

        /** @return the topic partition the record belongs to */
        TopicPartition getTopicPartition();

        /** @return the in-flight entries of the message source, grouped by topic partition */
        Map<TopicPartition, Set<KafkaAckInfo<K, V>>> getOffsets();

        /** @return true if an earlier record was re-queued, so this record's offset must not be committed */
        boolean isRolledBack();

        /** @param z true to mark this entry as rolled back */
        void setRolledBack(boolean z);

        /** @return true if the commit for this record waits for earlier in-flight records */
        boolean isAckDeferred();

        /** @param z true to mark the acknowledgment as deferred */
        void setAckDeferred(boolean z);
    }

    /* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaMessageSource$KafkaAckInfoImpl.class */
    /**
     * {@link KafkaAckInfo} bound to the enclosing message source: the monitor, group id,
     * consumer and in-flight map are read from the source at call time, while the record,
     * its partition and the two status flags are held per entry.
     */
    public class KafkaAckInfoImpl implements KafkaAckInfo<K, V> {
        private final ConsumerRecord<K, V> record;
        private final TopicPartition topicPartition;
        private volatile boolean rolledBack;
        private volatile boolean ackDeferred;

        KafkaAckInfoImpl(ConsumerRecord<K, V> consumerRecord, TopicPartition topicPartition) {
            this.topicPartition = topicPartition;
            this.record = consumerRecord;
        }

        @Override
        public Object getConsumerMonitor() {
            return consumerMonitor;
        }

        @Override
        public String getGroupId() {
            return groupId;
        }

        @Override
        public Consumer<K, V> getConsumer() {
            return consumer;
        }

        @Override
        public ConsumerRecord<K, V> getRecord() {
            return record;
        }

        @Override
        public TopicPartition getTopicPartition() {
            return topicPartition;
        }

        @Override
        public Map<TopicPartition, Set<KafkaAckInfo<K, V>>> getOffsets() {
            return inflightRecords;
        }

        @Override
        public boolean isRolledBack() {
            return rolledBack;
        }

        @Override
        public void setRolledBack(boolean z) {
            rolledBack = z;
        }

        @Override
        public boolean isAckDeferred() {
            return ackDeferred;
        }

        @Override
        public void setAckDeferred(boolean z) {
            ackDeferred = z;
        }

        /**
         * Orders entries by ascending record offset.
         */
        @Override
        public int compareTo(KafkaAckInfo<K, V> kafkaAckInfo) {
            long ownOffset = record.offset();
            long otherOffset = kafkaAckInfo.getRecord().offset();
            return Long.compare(ownOffset, otherOffset);
        }

        public String toString() {
            StringBuilder text = new StringBuilder("KafkaAckInfo [record=");
            text.append(record);
            text.append(", rolledBack=").append(rolledBack);
            text.append(", ackDeferred=").append(ackDeferred);
            return text.append("]").toString();
        }
    }

    /**
     * Creates a source that uses a default {@link KafkaAckCallbackFactory}.
     *
     * @param consumerFactory factory for the Kafka consumer; must not be null
     * @param strArr the topics to subscribe to
     */
    public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, String... strArr) {
        this(consumerFactory, new KafkaAckCallbackFactory<K, V>(), strArr);
    }

    /**
     * Creates a source with an explicit acknowledgment callback factory.
     *
     * @param consumerFactory factory for the Kafka consumer; must not be null. It is accepted
     *        as is, replaced by a copy with 'max.poll.records' set to 1, or rejected
     * @param kafkaAckCallbackFactory factory for per-record callbacks; must not be null
     * @param strArr the topics to subscribe to
     * @throws IllegalArgumentException if an argument is null or the consumer factory is rejected
     */
    public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, KafkaAckCallbackFactory<K, V> kafkaAckCallbackFactory, String... strArr) {
        // Defaults first: the logger is needed by fixOrRejectConsumerFactory() below.
        this.logger = LogFactory.getLog(getClass());
        this.consumerMonitor = new Object();
        this.inflightRecords = new ConcurrentHashMap<>();
        this.clientId = "message.source";
        this.pollTimeout = DEFAULT_POLL_TIMEOUT;
        this.messageConverter = new MessagingMessageConverter();

        Assert.notNull(consumerFactory, "'consumerFactory' must not be null");
        Assert.notNull(kafkaAckCallbackFactory, "'ackCallbackFactory' must not be null");

        this.consumerFactory = fixOrRejectConsumerFactory(consumerFactory);
        this.ackCallbackFactory = kafkaAckCallbackFactory;
        this.topics = strArr;
    }

    /**
     * @return the group id passed to the consumer factory when the consumer is created
     */
    protected String getGroupId() {
        return groupId;
    }

    /**
     * @param str the group id to pass to the consumer factory when the consumer is created
     */
    public void setGroupId(String str) {
        groupId = str;
    }

    /**
     * @return the client id passed to the consumer factory when the consumer is created
     */
    protected String getClientId() {
        return clientId;
    }

    /**
     * @param str the client id to pass to the consumer factory when the consumer is created
     */
    public void setClientId(String str) {
        clientId = str;
    }

    /**
     * @return the timeout handed to the consumer's poll call on each receive
     */
    protected long getPollTimeout() {
        return pollTimeout;
    }

    /**
     * @param j the timeout to hand to the consumer's poll call on each receive
     */
    public void setPollTimeout(long j) {
        pollTimeout = j;
    }

    /**
     * @return the converter that turns a polled record into a message
     */
    protected RecordMessageConverter getMessageConverter() {
        return messageConverter;
    }

    /**
     * @param recordMessageConverter the converter that turns a polled record into a message
     */
    public void setMessageConverter(RecordMessageConverter recordMessageConverter) {
        messageConverter = recordMessageConverter;
    }

    /**
     * @return the payload type passed to the message converter, or null if none was set
     */
    protected Type getPayloadType() {
        return payloadType;
    }

    /**
     * @param type the payload type to pass to the message converter
     */
    public void setPayloadType(Type type) {
        payloadType = type;
    }

    /**
     * @return the user listener notified of partition changes, or null if none was set
     */
    protected ConsumerRebalanceListener getRebalanceListener() {
        return rebalanceListener;
    }

    /**
     * @param consumerRebalanceListener listener to notify when partitions are revoked or assigned
     */
    public void setRebalanceListener(ConsumerRebalanceListener consumerRebalanceListener) {
        rebalanceListener = consumerRebalanceListener;
    }

    /**
     * @return the fixed component type name of this source
     */
    public String getComponentType() {
        final String componentType = "kafka:message-source";
        return componentType;
    }

    /**
     * @return true if the raw record is added to each message under the "kafka_data" header
     */
    protected boolean isRawMessageHeader() {
        return rawMessageHeader;
    }

    /**
     * @param z true to add the raw record to each message under the "kafka_data" header
     */
    public void setRawMessageHeader(boolean z) {
        rawMessageHeader = z;
    }

    /**
     * Makes sure the consumer factory hands out one record per poll.
     *
     * <p>A factory is returned unchanged unless 'max.poll.records' is absent, or is a Number
     * or String whose value is not 1. In that case a plain {@link DefaultKafkaConsumerFactory}
     * is replaced by a copy with the property forced to 1 (keeping any key and value
     * deserializers), while any other factory class is rejected.
     *
     * @param consumerFactory the factory supplied by the caller
     * @return the factory to use
     * @throws IllegalArgumentException if a custom factory needs the override
     * @throws NumberFormatException if the property is a String that is not an integer
     */
    private ConsumerFactory<K, V> fixOrRejectConsumerFactory(ConsumerFactory<K, V> consumerFactory) {
        Object maxPoll = consumerFactory.getConfigurationProperties().get("max.poll.records");
        boolean needsOverride = maxPoll == null
                || (maxPoll instanceof Number && ((Number) maxPoll).intValue() != 1)
                || (maxPoll instanceof String && Integer.parseInt((String) maxPoll) != 1);
        if (!needsOverride) {
            return consumerFactory;
        }

        // Only the stock factory class can be rebuilt from its configuration properties.
        boolean isStockFactory = DefaultKafkaConsumerFactory.class.getName().equals(consumerFactory.getClass().getName());
        if (!isStockFactory) {
            throw new IllegalArgumentException("Custom consumer factory is not configured with 'max.poll.records = 1'");
        }
        if (this.logger.isWarnEnabled()) {
            Object previous = maxPoll == null ? "unspecified" : maxPoll;
            this.logger.warn("'max.poll.records' has been forced from " + previous + " to 1, to avoid having to seek after each record");
        }

        Map<String, Object> overridden = new HashMap<>(consumerFactory.getConfigurationProperties());
        overridden.put("max.poll.records", 1);
        DefaultKafkaConsumerFactory<K, V> replacement = new DefaultKafkaConsumerFactory<>(overridden);
        if (consumerFactory.getKeyDeserializer() != null) {
            replacement.setKeyDeserializer(consumerFactory.getKeyDeserializer());
        }
        if (consumerFactory.getValueDeserializer() != null) {
            replacement.setValueDeserializer(consumerFactory.getValueDeserializer());
        }
        return replacement;
    }

    /**
     * @return true after {@link #start()} or the first receive, until {@link #stop()} is called
     */
    public synchronized boolean isRunning() {
        return running;
    }

    /**
     * Marks the source as running. The consumer itself is created on the first receive.
     */
    public synchronized void start() {
        running = true;
    }

    /**
     * Closes the consumer, if one exists, and then marks the source as not running.
     */
    public synchronized void stop() {
        stopConsumer();
        running = false;
    }

    /**
     * Polls the consumer once and converts the first returned record into a message.
     *
     * <p>The consumer is created on first use, which also marks the source as running. The
     * result carries the acknowledgment callback under the "acknowledgmentCallback" header
     * and, when the raw message header is enabled, the record under "kafka_data".
     *
     * @return the message when its headers are {@link KafkaMessageHeaders}, otherwise a
     *         message builder seeded from it; null if the poll returned no records
     */
    protected synchronized Object doReceive() {
        if (this.consumer == null) {
            createConsumer();
            this.running = true;
        }
        synchronized (this.consumerMonitor) {
            ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
            if (records == null || records.count() == 0) {
                return null;
            }
            ConsumerRecord<K, V> record = records.iterator().next();
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            KafkaAckInfo<K, V> ackInfo = new KafkaAckInfoImpl(record, topicPartition);
            AcknowledgmentCallback ackCallback = this.ackCallbackFactory.createCallback(ackInfo);
            // The converter only accepts an Acknowledgment; custom callbacks may not be one.
            Acknowledgment acknowledgment = ackCallback instanceof Acknowledgment ? (Acknowledgment) ackCallback : null;
            Message<?> message = this.messageConverter.toMessage(record, acknowledgment, this.consumer, this.payloadType);
            // Track the record only after conversion succeeded. An entry registered for a
            // record whose conversion failed is never acknowledged, stays first in the
            // partition's set and makes every later commit for that partition deferred.
            this.inflightRecords.computeIfAbsent(topicPartition, tp -> Collections.synchronizedSet(new TreeSet<>())).add(ackInfo);
            if (!(message.getHeaders() instanceof KafkaMessageHeaders)) {
                AbstractIntegrationMessageBuilder<?> builder = getMessageBuilderFactory().fromMessage(message).setHeader("acknowledgmentCallback", ackCallback);
                if (this.rawMessageHeader) {
                    builder.setHeader("kafka_data", record);
                }
                return builder;
            }
            // Kafka message headers expose their backing map, so the headers are added in place.
            Map<String, Object> rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
            rawHeaders.put("acknowledgmentCallback", ackCallback);
            if (this.rawMessageHeader) {
                rawHeaders.put("kafka_data", record);
            }
            return message;
        }
    }

    /**
     * Creates the consumer from the factory, using the configured group id and client id, and
     * subscribes it to the topics. The registered listener logs partition changes at info
     * level and forwards them to the user rebalance listener when one is set.
     */
    protected void createConsumer() {
        synchronized (this.consumerMonitor) {
            this.consumer = this.consumerFactory.createConsumer(this.groupId, this.clientId, (String) null);
            ConsumerRebalanceListener loggingListener = new ConsumerRebalanceListener() {
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    if (KafkaMessageSource.this.logger.isInfoEnabled()) {
                        KafkaMessageSource.this.logger.info("Partitions revoked: " + collection);
                    }
                    // The delegate is looked up on every callback.
                    if (KafkaMessageSource.this.rebalanceListener == null) {
                        return;
                    }
                    KafkaMessageSource.this.rebalanceListener.onPartitionsRevoked(collection);
                }

                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    if (KafkaMessageSource.this.logger.isInfoEnabled()) {
                        KafkaMessageSource.this.logger.info("Partitions assigned: " + collection);
                    }
                    if (KafkaMessageSource.this.rebalanceListener == null) {
                        return;
                    }
                    KafkaMessageSource.this.rebalanceListener.onPartitionsAssigned(collection);
                }
            };
            this.consumer.subscribe(Arrays.asList(this.topics), loggingListener);
        }
    }

    /**
     * Closes the consumer, if one exists. Unlike {@link #stop()}, the running flag is left as is.
     */
    public synchronized void destroy() {
        this.stopConsumer();
    }

    /**
     * Closes the current consumer, waiting up to 30 seconds, and clears the reference so the
     * next receive creates a new one. Does nothing when no consumer exists.
     *
     * <p>The reference is cleared even when closing fails; the failure still propagates to
     * the caller. Keeping a reference to a consumer whose close was attempted would make
     * every later receive poll that consumer instead of creating a fresh one.
     */
    private void stopConsumer() {
        synchronized (this.consumerMonitor) {
            Consumer<K, V> current = this.consumer;
            if (current == null) {
                return;
            }
            try {
                current.close(30L, TimeUnit.SECONDS);
            } finally {
                this.consumer = null;
            }
        }
    }
}
