package com.yahoo.bullet.dsl.connector;

import com.yahoo.bullet.common.BulletConfig;
import com.yahoo.bullet.dsl.BulletDSLConfig;
import com.yahoo.bullet.dsl.BulletDSLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/yahoo/bullet/dsl/connector/KafkaConnector.class */
/**
 * A {@link BulletConnector} that reads record values from one or more Kafka topics through a
 * {@link KafkaConsumer}.
 *
 * <p>The topics, start-at-end flag, auto-commit flag, async-commit flag and read timeout are all taken
 * from the configuration handed to the constructor. The consumer itself is {@code transient} and is
 * only created by {@link #initialize()}.
 */
public class KafkaConnector extends BulletConnector {
    private static final Logger log = LoggerFactory.getLogger(KafkaConnector.class);
    private static final long serialVersionUID = -256168979644903950L;

    // Transient: created in initialize() (or injected through setConsumer), never serialized.
    private transient KafkaConsumer<Object, Object> consumer;
    private final List<String> topics;
    private final boolean startAtEnd;
    private final boolean autoCommit;
    private final boolean asyncCommit;
    private final Duration timeout;

    /**
     * Creates the connector and caches the settings it needs from the configuration.
     *
     * @param bulletConfig the configuration passed on to {@link BulletConnector}; the Kafka topics,
     *                     start-at-end, auto-commit, async-commit and read-timeout settings are read from it.
     */
    public KafkaConnector(BulletConfig bulletConfig) {
        super(bulletConfig);
        // The configuration can only hand back a raw List; the element type is not checked here.
        @SuppressWarnings("unchecked")
        List<String> configuredTopics = (List<String>) this.config.getAs(BulletDSLConfig.CONNECTOR_KAFKA_TOPICS, List.class);
        this.topics = configuredTopics;
        this.startAtEnd = this.config.getAs(BulletDSLConfig.CONNECTOR_KAFKA_START_AT_END_ENABLE, Boolean.class);
        this.autoCommit = this.config.getAs(BulletDSLConfig.CONNECTOR_KAFKA_ENABLE_AUTO_COMMIT, Boolean.class);
        this.asyncCommit = this.config.getAs(BulletDSLConfig.CONNECTOR_ASYNC_COMMIT_ENABLE, Boolean.class);
        this.timeout = Duration.ofMillis(this.config.getAs(BulletDSLConfig.CONNECTOR_READ_TIMEOUT_MS, Number.class).longValue());
    }

    /**
     * Creates the Kafka consumer from the settings under the Kafka connector namespace, subscribes it
     * to the configured topics and, if start-at-end is enabled, requests a seek to the end of the
     * assigned partitions.
     */
    @Override // com.yahoo.bullet.dsl.connector.BulletConnector
    public void initialize() {
        this.consumer = new KafkaConsumer<>(this.config.getAllWithPrefix(Optional.empty(), BulletDSLConfig.CONNECTOR_KAFKA_NAMESPACE, true));
        this.consumer.subscribe(this.topics);
        // Poll before seeking: seekToEnd with an empty collection only applies to the partitions that
        // are currently assigned, and nothing is assigned right after subscribe(). The result of this
        // poll is discarded.
        // NOTE(review): a zero-timeout poll is not guaranteed to complete the partition assignment;
        // if start-at-end must be reliable, a ConsumerRebalanceListener is probably needed - confirm.
        this.consumer.poll(Duration.ZERO);
        if (this.startAtEnd) {
            this.consumer.seekToEnd(Collections.emptyList());
        }
    }

    /**
     * Polls the consumer once, waiting up to the configured read timeout, and returns the values of
     * the records received. When auto-commit is disabled, offsets are committed right after the poll,
     * before the values are returned to the caller.
     *
     * @return the values of the polled records, in iteration order; empty if nothing was received.
     * @throws BulletDSLException if polling or committing throws a {@link KafkaException}.
     */
    @Override // com.yahoo.bullet.dsl.connector.BulletConnector
    public List<Object> read() throws BulletDSLException {
        try {
            ConsumerRecords<Object, Object> records = this.consumer.poll(this.timeout);
            List<Object> values = new ArrayList<>();
            records.forEach(record -> values.add(record.value()));
            if (!this.autoCommit) {
                commit();
            }
            return values;
        } catch (KafkaException e) {
            throw new BulletDSLException("Could not read from consumer.", e);
        }
    }

    /**
     * Closes the underlying consumer. Does nothing if no consumer has been created or injected.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.consumer != null) {
            this.consumer.close();
        }
    }

    /**
     * Commits the consumer's offsets, asynchronously or synchronously depending on the async-commit
     * setting.
     */
    private void commit() {
        if (this.asyncCommit) {
            this.consumer.commitAsync();
        } else {
            this.consumer.commitSync();
        }
    }

    /**
     * Replaces the consumer used by this connector. Package-private.
     *
     * @param kafkaConsumer the consumer to use for subsequent reads, commits and close.
     */
    void setConsumer(KafkaConsumer<Object, Object> kafkaConsumer) {
        this.consumer = kafkaConsumer;
    }
}
