package cn.leancloud.kafka.consumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cn/leancloud/kafka/consumer/AbstractCommitPolicy.class */
/**
 * Base class for commit policies that commit offsets synchronously through a Kafka
 * {@link Consumer} and retry the commit when it fails with a {@link RetriableException}.
 *
 * <p>It also stores the "commit paused" flag exposed through {@link CommitPolicy}. The flag is a
 * plain (non-volatile) field.
 *
 * @param <K> key type of the wrapped consumer
 * @param <V> value type of the wrapped consumer
 */
public abstract class AbstractCommitPolicy<K, V> implements CommitPolicy {
    /**
     * Sleep implementation used between commit retries. Deliberately non-final so it can be
     * replaced from within the package (presumably by tests -- confirm against callers).
     */
    static SleepFunction sleepFunction = Thread::sleep;

    /** Consumer on which every commit is issued. */
    protected final Consumer<K, V> consumer;
    /** Pause between two attempts of the same synchronous commit, in milliseconds. */
    private final long syncCommitRetryIntervalMs;
    /** Upper bound on the attempts made for one synchronous commit. */
    private final int maxAttemptsForEachSyncCommit;
    /** Backing state for {@link #pauseCommit()}, {@link #resumeCommit()}, {@link #commitPaused()}. */
    private boolean commitPaused;

    /**
     * Counts the failed attempts of a single synchronous commit and decides whether another
     * attempt is allowed.
     */
    public static class RetryContext {
        private final long retryInterval;
        private final int maxAttempts;
        private int numOfAttempts;

        private RetryContext(long retryInterval, int maxAttempts) {
            this.retryInterval = retryInterval;
            this.maxAttempts = maxAttempts;
            this.numOfAttempts = 0;
        }

        /**
         * Records one failed attempt and either sleeps before the next attempt or gives up.
         *
         * <p>Returns normally only when the caller may try again. A {@code maxAttempts} of one or
         * less therefore means the commit is tried exactly once.
         *
         * @param retriableException the failure raised by the attempt that just finished
         * @throws RetriableException the given exception itself, when the attempt budget is used
         *         up, or when the sleep is interrupted (the {@link InterruptedException} is then
         *         attached as suppressed and the thread's interrupt flag is restored)
         */
        void onError(RetriableException retriableException) {
            this.numOfAttempts++;
            if (this.numOfAttempts >= this.maxAttempts) {
                throw retriableException;
            }
            try {
                AbstractCommitPolicy.sleepFunction.sleep(this.retryInterval);
            } catch (InterruptedException e) {
                // Stop retrying on interruption, but keep both the interrupt status and the
                // original commit failure visible to the caller.
                retriableException.addSuppressed(e);
                Thread.currentThread().interrupt();
                throw retriableException;
            }
        }
    }

    /** Abstraction over {@link Thread#sleep(long)} so the wait between retries can be replaced. */
    @FunctionalInterface
    public interface SleepFunction {
        /**
         * Blocks for the given time.
         *
         * @param j time to sleep, in milliseconds
         * @throws InterruptedException if the wait is interrupted
         */
        void sleep(long j) throws InterruptedException;
    }

    /**
     * Creates a policy bound to the given consumer.
     *
     * @param consumer the consumer used to commit offsets
     * @param duration pause between two attempts of the same synchronous commit; stored with
     *        millisecond precision
     * @param i maximum number of attempts for each synchronous commit
     */
    public AbstractCommitPolicy(Consumer<K, V> consumer, Duration duration, int i) {
        this.consumer = consumer;
        this.syncCommitRetryIntervalMs = duration.toMillis();
        this.maxAttemptsForEachSyncCommit = i;
    }

    /**
     * Synchronously commits the offsets reported by
     * {@code processRecordsProgress.completedOffsetsToCommit()}.
     *
     * <p>When there is nothing to commit the consumer is not touched and an empty set is
     * returned. Otherwise, after a successful commit, the progress is told about the committed
     * offsets and asked to clear the completed partitions.
     *
     * @param processRecordsProgress progress tracker supplying the offsets to commit
     * @return the result of {@code clearCompletedPartitions} for the committed offsets, or an
     *         empty set when there was nothing to commit
     * @throws RetriableException if the commit still fails once retries are exhausted
     */
    @Override
    public Set<TopicPartition> partialCommitSync(ProcessRecordsProgress processRecordsProgress) {
        Map<TopicPartition, OffsetAndMetadata> completedOffsetsToCommit = processRecordsProgress.completedOffsetsToCommit();
        if (completedOffsetsToCommit.isEmpty()) {
            return Collections.emptySet();
        }
        commitSyncWithRetry(completedOffsetsToCommit);
        processRecordsProgress.updateCommittedOffsets(completedOffsetsToCommit);
        return processRecordsProgress.clearCompletedPartitions(completedOffsetsToCommit);
    }

    /** Marks committing as paused. */
    @Override
    public void pauseCommit() {
        this.commitPaused = true;
    }

    /** Marks committing as no longer paused. */
    @Override
    public void resumeCommit() {
        this.commitPaused = false;
    }

    /**
     * Reports the pause flag.
     *
     * @return {@code true} if {@link #pauseCommit()} was called more recently than
     *         {@link #resumeCommit()}
     */
    @Override
    public boolean commitPaused() {
        return this.commitPaused;
    }

    /**
     * Synchronously commits through the consumer's no-argument {@code commitSync()} and then
     * resets the whole progress.
     *
     * @param processRecordsProgress progress tracker to reset after the commit
     * @return the value of {@code allPartitions()} read before {@code clearAll()} is called
     * @throws RetriableException if the commit still fails once retries are exhausted
     */
    public Set<TopicPartition> fullCommitSync(ProcessRecordsProgress processRecordsProgress) {
        commitSyncWithRetry();
        // NOTE(review): this relies on allPartitions() returning a snapshot rather than a live
        // view that clearAll() would empty -- confirm against ProcessRecordsProgress.
        Set<TopicPartition> allPartitions = processRecordsProgress.allPartitions();
        processRecordsProgress.clearAll();
        return allPartitions;
    }

    /**
     * Calls the consumer's no-argument {@code commitSync()}, retrying on
     * {@link RetriableException}.
     *
     * @throws RetriableException if the commit still fails once retries are exhausted
     */
    public void commitSyncWithRetry() {
        retryOnRetriable(() -> this.consumer.commitSync());
    }

    /**
     * Calls the consumer's {@code commitSync(Map)} with the given offsets, retrying on
     * {@link RetriableException}.
     *
     * @param map offsets to commit, keyed by partition
     * @throws RetriableException if the commit still fails once retries are exhausted
     */
    public void commitSyncWithRetry(Map<TopicPartition, OffsetAndMetadata> map) {
        retryOnRetriable(() -> this.consumer.commitSync(map));
    }

    /**
     * Runs one commit action until it succeeds or a fresh {@link RetryContext} refuses a further
     * attempt. Exceptions other than {@link RetriableException} propagate immediately.
     */
    private void retryOnRetriable(Runnable commit) {
        RetryContext retry = new RetryContext(this.syncCommitRetryIntervalMs, this.maxAttemptsForEachSyncCommit);
        while (true) {
            try {
                commit.run();
                return;
            } catch (RetriableException e) {
                // onError either returns (try again) or rethrows e (give up).
                retry.onError(e);
            }
        }
    }
}
