package cn.leancloud.kafka.consumer;

import java.io.Closeable;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cn/leancloud/kafka/consumer/Fetcher.class */
public final class Fetcher<K, V> implements Runnable, Closeable {
    private static final Logger logger;
    private final long pollTimeoutMillis;
    private final Consumer<K, V> consumer;
    private final ConsumerRecordHandler<K, V> handler;
    private final ExecutorCompletionService<ConsumerRecord<K, V>> service;
    private final Map<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> pendingFutures;
    private final CommitPolicy<K, V> policy;
    private final long gracefulShutdownTimeoutNanos;
    private final CompletableFuture<UnsubscribedStatus> unsubscribeStatusFuture;
    private final long handleRecordTimeoutNanos;
    private volatile boolean closed;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:cn/leancloud/kafka/consumer/Fetcher$TimeoutFuture.class */
    public static class TimeoutFuture<K, V> implements Future<ConsumerRecord<K, V>> {
        private final Future<ConsumerRecord<K, V>> wrappedFuture;
        private final long timeoutAtNanos;
        private final Time time;
        static final /* synthetic */ boolean $assertionsDisabled;

        TimeoutFuture(Future<ConsumerRecord<K, V>> future) {
            this(future, Long.MAX_VALUE);
        }

        TimeoutFuture(Future<ConsumerRecord<K, V>> future, long j) {
            this(future, j, Time.SYSTEM);
        }

        TimeoutFuture(Future<ConsumerRecord<K, V>> future, long j, Time time) {
            if (!$assertionsDisabled && j < 0) {
                throw new AssertionError();
            }
            this.wrappedFuture = future;
            long nanoseconds = time.nanoseconds() + j;
            this.timeoutAtNanos = nanoseconds < 0 ? Long.MAX_VALUE : nanoseconds;
            this.time = time;
        }

        @Override // java.util.concurrent.Future
        public boolean cancel(boolean z) {
            return this.wrappedFuture.cancel(z);
        }

        @Override // java.util.concurrent.Future
        public boolean isCancelled() {
            return this.wrappedFuture.isCancelled();
        }

        @Override // java.util.concurrent.Future
        public boolean isDone() {
            return this.wrappedFuture.isDone();
        }

        @Override // java.util.concurrent.Future
        public ConsumerRecord<K, V> get() throws InterruptedException, ExecutionException {
            return this.wrappedFuture.get();
        }

        @Override // java.util.concurrent.Future
        public ConsumerRecord<K, V> get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
            if (timeout()) {
                throw new TimeoutException();
            }
            return this.wrappedFuture.get(Math.max(0L, Math.min(timeUnit.toNanos(j), this.timeoutAtNanos - this.time.nanoseconds())), TimeUnit.NANOSECONDS);
        }

        boolean timeout() {
            return this.time.nanoseconds() >= this.timeoutAtNanos;
        }

        static {
            $assertionsDisabled = !Fetcher.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Fetcher(LcKafkaConsumerBuilder<K, V> lcKafkaConsumerBuilder) {
        this(lcKafkaConsumerBuilder.getConsumer(), lcKafkaConsumerBuilder.getPollTimeout(), lcKafkaConsumerBuilder.getConsumerRecordHandler(), lcKafkaConsumerBuilder.getWorkerPool(), lcKafkaConsumerBuilder.getPolicy(), lcKafkaConsumerBuilder.gracefulShutdownTimeout(), lcKafkaConsumerBuilder.handleRecordTimeout());
    }

    Fetcher(Consumer<K, V> consumer, Duration duration, ConsumerRecordHandler<K, V> consumerRecordHandler, ExecutorService executorService, CommitPolicy<K, V> commitPolicy, Duration duration2, Duration duration3) {
        this.pendingFutures = new HashMap();
        this.consumer = consumer;
        this.pollTimeoutMillis = duration.toMillis();
        this.handler = consumerRecordHandler;
        this.service = new ExecutorCompletionService<>(executorService);
        this.policy = commitPolicy;
        this.gracefulShutdownTimeoutNanos = duration2.toNanos();
        this.unsubscribeStatusFuture = new CompletableFuture<>();
        this.handleRecordTimeoutNanos = duration3.toNanos();
    }

    @Override // java.lang.Runnable
    public void run() {
        logger.debug("Fetcher thread started.");
        long j = this.pollTimeoutMillis;
        Consumer<K, V> consumer = this.consumer;
        UnsubscribedStatus unsubscribedStatus = UnsubscribedStatus.CLOSED;
        while (true) {
            try {
                ConsumerRecords<K, V> poll = consumer.poll(j);
                if (logger.isDebugEnabled()) {
                    logger.debug("Fetched " + poll.count() + " records from: " + poll.partitions());
                }
                dispatchFetchedRecords(poll);
                processCompletedRecords();
                processTimeoutRecords();
                if (!this.pendingFutures.isEmpty() && !poll.isEmpty()) {
                    consumer.pause(poll.partitions());
                }
                tryCommitRecordOffsets();
            } catch (WakeupException e) {
                if (closed()) {
                    break;
                }
            } catch (ExecutionException e2) {
                unsubscribedStatus = UnsubscribedStatus.ERROR;
                close();
            } catch (Exception e3) {
                if (e3 instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                unsubscribedStatus = UnsubscribedStatus.ERROR;
                close();
                logger.error("Fetcher quit with unexpected exception. Will rebalance after poll timeout.", e3);
            }
        }
        gracefulShutdown(unsubscribedStatus);
        logger.debug("Fetcher thread exit.");
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.closed = true;
        this.consumer.wakeup();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<UnsubscribedStatus> unsubscribeStatusFuture() {
        return this.unsubscribeStatusFuture;
    }

    @VisibleForTesting
    Map<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> pendingFutures() {
        return this.pendingFutures;
    }

    private boolean closed() {
        return this.closed;
    }

    private void dispatchFetchedRecords(ConsumerRecords<K, V> consumerRecords) {
        ConsumerRecordHandler<K, V> consumerRecordHandler = this.handler;
        Iterator it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord<K, V> consumerRecord = (ConsumerRecord) it.next();
            this.pendingFutures.put(consumerRecord, timeoutAwareFuture(this.service.submit(() -> {
                consumerRecordHandler.handleRecord(consumerRecord);
                return consumerRecord;
            })));
            this.policy.addPendingRecord(consumerRecord);
        }
    }

    private TimeoutFuture<K, V> timeoutAwareFuture(Future<ConsumerRecord<K, V>> future) {
        return unlimitedHandleRecordTime() ? new TimeoutFuture<>(future) : new TimeoutFuture<>(future, this.handleRecordTimeoutNanos);
    }

    private void processCompletedRecords() throws InterruptedException, ExecutionException {
        while (true) {
            Future<ConsumerRecord<K, V>> poll = this.service.poll();
            if (poll == null) {
                return;
            } else {
                processCompletedRecord(poll);
            }
        }
    }

    private void processCompletedRecord(Future<ConsumerRecord<K, V>> future) throws InterruptedException, ExecutionException {
        if (!$assertionsDisabled && !future.isDone()) {
            throw new AssertionError();
        }
        ConsumerRecord<K, V> consumerRecord = future.get();
        if (!$assertionsDisabled && consumerRecord == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && future.isCancelled()) {
            throw new AssertionError();
        }
        Future<ConsumerRecord<K, V>> remove = this.pendingFutures.remove(consumerRecord);
        if (!$assertionsDisabled && remove == null) {
            throw new AssertionError();
        }
        this.policy.completeRecord(consumerRecord);
    }

    private void processTimeoutRecords() throws TimeoutException {
        if (unlimitedHandleRecordTime()) {
            return;
        }
        for (Map.Entry<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> entry : this.pendingFutures.entrySet()) {
            TimeoutFuture timeoutFuture = (TimeoutFuture) entry.getValue();
            if (timeoutFuture.timeout()) {
                timeoutFuture.cancel(false);
                this.pendingFutures.remove(entry.getKey(), entry.getValue());
                throw new TimeoutException("timeout on handling record: " + entry.getKey());
            }
        }
    }

    private void tryCommitRecordOffsets() {
        Set<TopicPartition> tryCommit = this.policy.tryCommit(this.pendingFutures.isEmpty());
        if (tryCommit.isEmpty()) {
            return;
        }
        try {
            this.consumer.resume(tryCommit);
        } catch (IllegalStateException e) {
            tryCommit.retainAll(this.consumer.assignment());
            this.consumer.resume(tryCommit);
        }
    }

    private boolean unlimitedHandleRecordTime() {
        return this.handleRecordTimeoutNanos == 0;
    }

    private void gracefulShutdown(UnsubscribedStatus unsubscribedStatus) {
        long j = 0;
        try {
            try {
                j = waitPendingFuturesDone();
                this.policy.partialCommit();
                this.pendingFutures.clear();
                try {
                    this.consumer.close(j, TimeUnit.NANOSECONDS);
                    this.unsubscribeStatusFuture.complete(unsubscribedStatus);
                } finally {
                }
            } catch (Exception e) {
                logger.error("Graceful shutdown got unexpected exception", e);
                try {
                    this.consumer.close(j, TimeUnit.NANOSECONDS);
                    this.unsubscribeStatusFuture.complete(unsubscribedStatus);
                } finally {
                }
            }
        } catch (Throwable th) {
            try {
                this.consumer.close(j, TimeUnit.NANOSECONDS);
                this.unsubscribeStatusFuture.complete(unsubscribedStatus);
                throw th;
            } finally {
                this.unsubscribeStatusFuture.complete(unsubscribedStatus);
            }
        }
    }

    private long waitPendingFuturesDone() {
        long nanoTime = System.nanoTime();
        long j = this.gracefulShutdownTimeoutNanos;
        for (Map.Entry<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> entry : this.pendingFutures.entrySet()) {
            Future<ConsumerRecord<K, V>> value = entry.getValue();
            try {
                try {
                    try {
                    } catch (CancellationException e) {
                        if (j >= 0) {
                            j = Math.max(0L, this.gracefulShutdownTimeoutNanos - (System.nanoTime() - nanoTime));
                        }
                    } catch (TimeoutException e2) {
                        value.cancel(false);
                        if (j >= 0) {
                            j = Math.max(0L, this.gracefulShutdownTimeoutNanos - (System.nanoTime() - nanoTime));
                        }
                    }
                } catch (InterruptedException e3) {
                    value.cancel(false);
                    Thread.currentThread().interrupt();
                    if (j >= 0) {
                        j = Math.max(0L, this.gracefulShutdownTimeoutNanos - (System.nanoTime() - nanoTime));
                    }
                } catch (ExecutionException e4) {
                    logger.error("Fetcher quit with unexpected exception on handling consumer record: " + entry.getKey(), e4.getCause());
                    if (j >= 0) {
                        j = Math.max(0L, this.gracefulShutdownTimeoutNanos - (System.nanoTime() - nanoTime));
                    }
                }
                if (!$assertionsDisabled && j < 0) {
                    throw new AssertionError();
                }
                ConsumerRecord<K, V> consumerRecord = value.get(j, TimeUnit.MILLISECONDS);
                if (!$assertionsDisabled && consumerRecord == null) {
                    throw new AssertionError();
                }
                this.policy.completeRecord(consumerRecord);
                if (j >= 0) {
                    j = Math.max(0L, this.gracefulShutdownTimeoutNanos - (System.nanoTime() - nanoTime));
                }
            } catch (Throwable th) {
                if (j >= 0) {
                    Math.max(0L, this.gracefulShutdownTimeoutNanos - (System.nanoTime() - nanoTime));
                }
                throw th;
            }
        }
        return j;
    }

    static {
        $assertionsDisabled = !Fetcher.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(Fetcher.class);
    }
}
