package cascading.local.tap.kafka.commit;

import cascading.local.tap.kafka.decorator.ForwardingConsumer;
import cascading.local.tap.kafka.decorator.ForwardingConsumerRecords;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cascading/local/tap/kafka/commit/CommittingConsumer.class */
public class CommittingConsumer<K, V> extends ForwardingConsumer<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(CommittingConsumer.class);
    Map<TopicPartition, OffsetAndMetadata> currentOffsets;
    CommitListener listener;

    public CommittingConsumer(Properties properties, CommitListener commitListener) {
        super(properties);
        this.currentOffsets = new HashMap();
        this.listener = new CommitListener() { // from class: cascading.local.tap.kafka.commit.CommittingConsumer.1
            @Override // cascading.local.tap.kafka.commit.CommitListener
            public void onClose(Consumer consumer, Map<TopicPartition, OffsetAndMetadata> map) {
                CommittingConsumer.LOG.info("committing offsets on close");
            }

            @Override // cascading.local.tap.kafka.commit.CommitListener
            public void onRevoke(Consumer consumer, Map<TopicPartition, OffsetAndMetadata> map) {
                CommittingConsumer.LOG.info("committing offsets on partition revoke");
            }

            @Override // cascading.local.tap.kafka.commit.CommitListener
            public boolean onFail(Consumer consumer, RuntimeException runtimeException, Map<TopicPartition, OffsetAndMetadata> map) {
                CommittingConsumer.LOG.error("failed committing offsets", runtimeException);
                return true;
            }
        };
        this.listener = commitListener;
    }

    public CommittingConsumer(Properties properties) {
        super(properties);
        this.currentOffsets = new HashMap();
        this.listener = new CommitListener() { // from class: cascading.local.tap.kafka.commit.CommittingConsumer.1
            @Override // cascading.local.tap.kafka.commit.CommitListener
            public void onClose(Consumer consumer, Map<TopicPartition, OffsetAndMetadata> map) {
                CommittingConsumer.LOG.info("committing offsets on close");
            }

            @Override // cascading.local.tap.kafka.commit.CommitListener
            public void onRevoke(Consumer consumer, Map<TopicPartition, OffsetAndMetadata> map) {
                CommittingConsumer.LOG.info("committing offsets on partition revoke");
            }

            @Override // cascading.local.tap.kafka.commit.CommitListener
            public boolean onFail(Consumer consumer, RuntimeException runtimeException, Map<TopicPartition, OffsetAndMetadata> map) {
                CommittingConsumer.LOG.error("failed committing offsets", runtimeException);
                return true;
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // cascading.local.tap.kafka.decorator.ForwardingConsumer
    public KafkaConsumer<K, V> createKafkaConsumerInstance(Properties properties) {
        if (Boolean.parseBoolean(properties.getProperty("enable.auto.commit"))) {
            LOG.info("disabling kafka auto-commit");
        }
        properties.setProperty("enable.auto.commit", "false");
        return super.createKafkaConsumerInstance(properties);
    }

    @Override // cascading.local.tap.kafka.decorator.ForwardingConsumer
    public void subscribe(Collection<String> collection) {
        super.subscribe(collection, new CommittingRebalanceListener(getConsumer(), this.listener, this.currentOffsets));
    }

    @Override // cascading.local.tap.kafka.decorator.ForwardingConsumer
    public void subscribe(Collection<String> collection, ConsumerRebalanceListener consumerRebalanceListener) {
        super.subscribe(collection, new CommittingRebalanceListener(consumerRebalanceListener, getConsumer(), this.listener, this.currentOffsets));
    }

    @Override // cascading.local.tap.kafka.decorator.ForwardingConsumer
    public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
        super.subscribe(pattern, new CommittingRebalanceListener(consumerRebalanceListener, getConsumer(), this.listener, this.currentOffsets));
    }

    @Override // cascading.local.tap.kafka.decorator.ForwardingConsumer
    public ConsumerRecords<K, V> poll(long j) {
        return new ForwardingConsumerRecords<K, V>(super.poll(j)) { // from class: cascading.local.tap.kafka.commit.CommittingConsumer.2
            @Override // cascading.local.tap.kafka.decorator.ForwardingConsumerRecords
            public Iterator<ConsumerRecord<K, V>> iterator() {
                return new OffsetRecorderIterator(CommittingConsumer.this.currentOffsets, super.iterator());
            }
        };
    }

    @Override // cascading.local.tap.kafka.decorator.ForwardingConsumer
    public void close() {
        try {
            this.listener.onClose(getConsumer(), this.currentOffsets);
            getConsumer().commitSync(this.currentOffsets);
        } catch (RuntimeException e) {
            if (this.listener.onFail(getConsumer(), e, this.currentOffsets)) {
                throw e;
            }
        } finally {
            super.close();
        }
    }

    @Override // cascading.local.tap.kafka.decorator.ForwardingConsumer
    public void close(long j, TimeUnit timeUnit) {
        try {
            try {
                this.listener.onClose(getConsumer(), this.currentOffsets);
                getConsumer().commitSync(this.currentOffsets);
                super.close(j, timeUnit);
            } catch (RuntimeException e) {
                if (this.listener.onFail(getConsumer(), e, this.currentOffsets)) {
                    throw e;
                }
                super.close(j, timeUnit);
            }
        } catch (Throwable th) {
            super.close(j, timeUnit);
            throw th;
        }
    }
}
