package cn.leancloud.kafka.consumer;

import java.io.Closeable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cn/leancloud/kafka/consumer/Fetcher.class */
public final class Fetcher<K, V> implements Runnable, Closeable {
    private static final Logger logger;
    private final long pollTimeout;
    private final Consumer<K, V> consumer;
    private final ConsumerRecordHandler<K, V> handler;
    private final ExecutorCompletionService<ConsumerRecord<K, V>> service;
    private final Map<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> pendingFutures;
    private final CommitPolicy<K, V> policy;
    private final long gracefulShutdownMillis;
    private volatile boolean closed;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public Fetcher(LcKafkaConsumerBuilder<K, V> lcKafkaConsumerBuilder) {
        this(lcKafkaConsumerBuilder.getConsumer(), lcKafkaConsumerBuilder.getPollTimeout(), lcKafkaConsumerBuilder.getConsumerRecordHandler(), lcKafkaConsumerBuilder.getWorkerPool(), lcKafkaConsumerBuilder.getPolicy(), lcKafkaConsumerBuilder.gracefulShutdownMillis());
    }

    Fetcher(Consumer<K, V> consumer, long j, ConsumerRecordHandler<K, V> consumerRecordHandler, ExecutorService executorService, CommitPolicy<K, V> commitPolicy, long j2) {
        this.pendingFutures = new HashMap();
        this.consumer = consumer;
        this.pollTimeout = j;
        this.handler = consumerRecordHandler;
        this.service = new ExecutorCompletionService<>(executorService);
        this.policy = commitPolicy;
        this.gracefulShutdownMillis = j2;
    }

    @Override // java.lang.Runnable
    public void run() {
        logger.debug("Fetcher thread started.");
        long j = this.pollTimeout;
        Consumer<K, V> consumer = this.consumer;
        while (true) {
            try {
                ConsumerRecords<K, V> poll = consumer.poll(j);
                if (logger.isDebugEnabled()) {
                    logger.debug("Fetched " + poll.count() + " records from: " + poll.partitions());
                }
                dispatchFetchedRecords(poll);
                processCompletedRecords();
                if (!this.pendingFutures.isEmpty() && !poll.isEmpty()) {
                    consumer.pause(poll.partitions());
                }
                tryCommitRecordOffsets();
            } catch (WakeupException e) {
                if (closed()) {
                    break;
                }
            } catch (Exception e2) {
                close();
                logger.error("Fetcher quit with unexpected exception. Will rebalance after poll timeout.", e2);
            }
        }
        gracefulShutdown();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.closed = true;
        this.consumer.wakeup();
    }

    Map<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> pendingFutures() {
        return this.pendingFutures;
    }

    private boolean closed() {
        return this.closed;
    }

    private void dispatchFetchedRecords(ConsumerRecords<K, V> consumerRecords) {
        ConsumerRecordHandler<K, V> consumerRecordHandler = this.handler;
        Iterator it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord<K, V> consumerRecord = (ConsumerRecord) it.next();
            this.pendingFutures.put(consumerRecord, this.service.submit(() -> {
                consumerRecordHandler.handleRecord(consumerRecord);
                return consumerRecord;
            }));
            this.policy.addPendingRecord(consumerRecord);
        }
    }

    private void processCompletedRecords() throws InterruptedException, ExecutionException {
        while (true) {
            Future<ConsumerRecord<K, V>> poll = this.service.poll();
            if (poll == null) {
                return;
            }
            if (!$assertionsDisabled && !poll.isDone()) {
                throw new AssertionError();
            }
            ConsumerRecord<K, V> consumerRecord = poll.get();
            Future<ConsumerRecord<K, V>> remove = this.pendingFutures.remove(consumerRecord);
            if (!$assertionsDisabled && remove == null) {
                throw new AssertionError();
            }
            this.policy.completeRecord(consumerRecord);
        }
    }

    private void tryCommitRecordOffsets() {
        Set<TopicPartition> tryCommit = this.policy.tryCommit(this.pendingFutures.isEmpty());
        if (tryCommit.isEmpty()) {
            return;
        }
        try {
            this.consumer.resume(tryCommit);
        } catch (IllegalStateException e) {
            tryCommit.retainAll(this.consumer.assignment());
            this.consumer.resume(tryCommit);
        }
    }

    private void gracefulShutdown() {
        long currentTimeMillis = System.currentTimeMillis();
        long j = this.gracefulShutdownMillis;
        try {
            for (Future<ConsumerRecord<K, V>> future : this.pendingFutures.values()) {
                if (j > 0) {
                    try {
                        future.get(j, TimeUnit.MILLISECONDS);
                        j = this.gracefulShutdownMillis - (System.currentTimeMillis() - currentTimeMillis);
                    } catch (TimeoutException e) {
                        j = 0;
                    }
                } else {
                    future.cancel(false);
                }
            }
            processCompletedRecords();
        } catch (InterruptedException e2) {
            logger.warn("Graceful shutdown was interrupted.");
        } catch (ExecutionException e3) {
            logger.error("Handle message got unexpected exception. Continue shutdown without wait handling message done.", e3);
        }
        this.policy.partialCommit();
        this.pendingFutures.clear();
        logger.debug("Fetcher thread exit.");
    }

    static {
        $assertionsDisabled = !Fetcher.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(Fetcher.class);
    }
}
