package com.expediagroup.apiary.extensions.events.metastore.kafka.messaging;

import com.expediagroup.apiary.extensions.events.metastore.common.Preconditions;
import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryListenerEvent;
import com.expediagroup.apiary.extensions.events.metastore.io.MetaStoreEventSerDe;
import com.expediagroup.apiary.extensions.events.metastore.io.jackson.JsonMetaStoreEventSerDe;
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaMessageReader.class */
/**
 * Reads {@link ApiaryListenerEvent}s from a Kafka topic and exposes them as an {@link Iterator}.
 *
 * <p>The iterator is unbounded: {@link #hasNext()} always returns {@code true} and {@link #next()}
 * keeps polling the consumer until at least one record is available. Call {@link #close()} to
 * close the underlying consumer.
 */
public class KafkaMessageReader implements Iterator<ApiaryListenerEvent>, Closeable {
    /** Timeout passed to every {@code poll} call; an empty poll is simply followed by another poll. */
    private static final Duration POLL_TIMEOUT = Duration.ofMinutes(5);

    private final KafkaConsumer<Long, byte[]> consumer;
    private final MetaStoreEventSerDe eventSerDe;
    /** Iterator over the most recently polled batch; {@code null} until the first poll. */
    private Iterator<ConsumerRecord<Long, byte[]>> records;

    /** Fluent builder for {@link KafkaMessageReader}; obtain one via {@link #builder(String, String, String)}. */
    public static final class KafkaMessageReaderBuilder {
        private static final String GROUP_ID_PREFIX = "apiary-kafka-metastore-receiver-";

        private final String bootstrapServers;
        private final String topicName;
        private final String groupId;
        private MetaStoreEventSerDe metaStoreEventSerDe = new JsonMetaStoreEventSerDe();
        private Properties consumerProperties = new Properties();

        private KafkaMessageReaderBuilder(String bootstrapServers, String topicName, String applicationName) {
            this.bootstrapServers = bootstrapServers;
            this.topicName = topicName;
            this.groupId = GROUP_ID_PREFIX + applicationName;
        }

        /**
         * Creates a builder. Each argument is validated with {@code Preconditions.checkNotEmpty}.
         *
         * @param str Kafka bootstrap servers
         * @param str2 name of the topic to subscribe to
         * @param str3 application name, appended to the consumer group id prefix
         * @return a new builder
         */
        public static KafkaMessageReaderBuilder builder(String str, String str2, String str3) {
            String bootstrapServers = Preconditions.checkNotEmpty(str, "Bootstrap servers is not set");
            String topicName = Preconditions.checkNotEmpty(str2, "Topic name is not set");
            String applicationName = Preconditions.checkNotEmpty(str3, "Application name is not set");
            return new KafkaMessageReaderBuilder(bootstrapServers, topicName, applicationName);
        }

        /**
         * Replaces the default {@link JsonMetaStoreEventSerDe} used to unmarshal record values.
         *
         * @param metaStoreEventSerDe SerDe to use
         * @return this builder
         */
        public KafkaMessageReaderBuilder withMetaStoreEventSerDe(MetaStoreEventSerDe metaStoreEventSerDe) {
            this.metaStoreEventSerDe = metaStoreEventSerDe;
            return this;
        }

        /**
         * Supplies additional consumer properties. They never override the bootstrap servers, group id
         * or deserializer entries that {@link #build()} sets itself.
         *
         * @param properties extra Kafka consumer properties
         * @return this builder
         */
        public KafkaMessageReaderBuilder withConsumerProperties(Properties properties) {
            this.consumerProperties = properties;
            return this;
        }

        /**
         * Builds the reader, creating a {@link KafkaConsumer} subscribed to the configured topic.
         *
         * @return a new reader
         */
        public KafkaMessageReader build() {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", this.bootstrapServers);
            properties.put("group.id", this.groupId);
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            // Merge the user-supplied properties; on a key clash the value set above is kept.
            this.consumerProperties.forEach((key, value) -> properties.putIfAbsent(key, value));
            return new KafkaMessageReader(this.topicName, this.metaStoreEventSerDe, properties);
        }
    }

    private KafkaMessageReader(String topicName, MetaStoreEventSerDe metaStoreEventSerDe, Properties properties) {
        this(topicName, metaStoreEventSerDe, new KafkaConsumer<Long, byte[]>(properties));
    }

    @VisibleForTesting
    KafkaMessageReader(String topicName, MetaStoreEventSerDe metaStoreEventSerDe, KafkaConsumer<Long, byte[]> kafkaConsumer) {
        this.eventSerDe = metaStoreEventSerDe;
        this.consumer = kafkaConsumer;
        this.consumer.subscribe(Collections.singletonList(topicName));
    }

    /**
     * Returns the next event, polling the consumer repeatedly until a record is available.
     *
     * @return the event unmarshalled from the next record's value
     */
    @Override
    public ApiaryListenerEvent next() {
        readRecordsIfNeeded();
        return this.eventSerDe.unmarshal(this.records.next().value());
    }

    /**
     * Always reports that more events are available; {@link #next()} waits for them.
     *
     * @return {@code true}
     */
    @Override
    public boolean hasNext() {
        return true;
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void remove() {
        throw new UnsupportedOperationException("Cannot remove message from Kafka topic");
    }

    /** Closes the underlying Kafka consumer. */
    @Override
    public void close() {
        this.consumer.close();
    }

    /** Polls until the current batch iterator exists and has at least one unread record. */
    private void readRecordsIfNeeded() {
        while (this.records == null || !this.records.hasNext()) {
            this.records = this.consumer.poll(POLL_TIMEOUT).iterator();
        }
    }
}
