package com.sitewhere.microservice.kafka;

import com.sitewhere.microservice.lifecycle.TenantEngineLifecycleComponent;
import com.sitewhere.spi.SiteWhereException;
import com.sitewhere.spi.microservice.kafka.IMicroserviceKafkaConsumer;
import com.sitewhere.spi.microservice.lifecycle.ILifecycleProgressMonitor;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

/* loaded from: input_file:com/sitewhere/microservice/kafka/MicroserviceKafkaConsumer.class */
public abstract class MicroserviceKafkaConsumer extends TenantEngineLifecycleComponent implements IMicroserviceKafkaConsumer {
    private KafkaConsumer<String, byte[]> consumer;
    private ExecutorService executor;

    /* loaded from: input_file:com/sitewhere/microservice/kafka/MicroserviceKafkaConsumer$MessageConsumer.class */
    private class MessageConsumer implements Runnable {
        private MessageConsumer() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    MicroserviceKafkaConsumer.this.getLogger().debug(String.format("Kafka consumer subscribing to %s", MicroserviceKafkaConsumer.this.getSourceTopicNames().toString()));
                    MicroserviceKafkaConsumer.this.getConsumer().subscribe(MicroserviceKafkaConsumer.this.getSourceTopicNames());
                    while (true) {
                        try {
                            try {
                            } catch (WakeupException e) {
                                MicroserviceKafkaConsumer.this.getLogger().info("Consumer thread received shutdown request.");
                                MicroserviceKafkaConsumer.this.getConsumer().unsubscribe();
                                MicroserviceKafkaConsumer.this.getConsumer().close();
                                return;
                            }
                        } catch (Throwable th) {
                            MicroserviceKafkaConsumer.this.getConsumer().close();
                            throw th;
                        }
                    }
                } catch (SiteWhereException e2) {
                    MicroserviceKafkaConsumer.this.getLogger().error("Unable to subscribe to topics.", e2);
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e3) {
                        return;
                    }
                } catch (Throwable th2) {
                    MicroserviceKafkaConsumer.this.getLogger().error("Unhandled exception while subscribing to topics.", th2);
                    Thread.sleep(1000L);
                }
            }
            ConsumerRecords poll = MicroserviceKafkaConsumer.this.getConsumer().poll(Duration.ofMillis(Long.MAX_VALUE));
            MicroserviceKafkaConsumer.this.getLogger().debug(String.format("Kafka consumer received %d records on poll.", Integer.valueOf(poll.count())));
            for (TopicPartition topicPartition : poll.partitions()) {
                try {
                    List<ConsumerRecord<String, byte[]>> records = poll.records(topicPartition);
                    MicroserviceKafkaConsumer.this.getLogger().debug(String.format("Kafka consumer processing %d records for %s partition %s.", Integer.valueOf(records.size()), topicPartition.topic(), Integer.valueOf(topicPartition.partition())));
                    MicroserviceKafkaConsumer.this.process(topicPartition, records);
                } catch (Throwable th3) {
                    MicroserviceKafkaConsumer.this.getLogger().error("Unhandled exception in consumer processing.", th3);
                }
            }
        }
    }

    /* loaded from: input_file:com/sitewhere/microservice/kafka/MicroserviceKafkaConsumer$MicroserviceConsumerThreadFactory.class */
    private class MicroserviceConsumerThreadFactory implements ThreadFactory {
        private AtomicInteger counter;

        private MicroserviceConsumerThreadFactory() {
            this.counter = new AtomicInteger();
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "Kafka Consumer " + this.counter.incrementAndGet());
        }
    }

    @Override // com.sitewhere.microservice.lifecycle.LifecycleComponent, com.sitewhere.spi.microservice.lifecycle.ILifecycleComponent
    public void start(ILifecycleProgressMonitor iLifecycleProgressMonitor) throws SiteWhereException {
        getLogger().info("Consumer connecting to Kafka: " + KafkaUtils.getBootstrapServers(getMicroservice()));
        getLogger().info("Will be consuming messages from: " + getSourceTopicNames());
        this.consumer = new KafkaConsumer<>(buildConfiguration());
        this.executor = Executors.newSingleThreadExecutor(new MicroserviceConsumerThreadFactory());
        this.executor.execute(new MessageConsumer());
    }

    @Override // com.sitewhere.microservice.lifecycle.LifecycleComponent, com.sitewhere.spi.microservice.lifecycle.ILifecycleComponent
    public void stop(ILifecycleProgressMonitor iLifecycleProgressMonitor) throws SiteWhereException {
        if (getConsumer() != null) {
            getConsumer().wakeup();
        }
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }

    protected Properties buildConfiguration() throws SiteWhereException {
        Properties properties = new Properties();
        properties.put("client.id", getConsumerId());
        properties.put("group.id", getConsumerGroupId());
        properties.put("bootstrap.servers", KafkaUtils.getBootstrapServers(getMicroservice()));
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", ByteArrayDeserializer.class.getName());
        properties.put("auto.offset.reset", "earliest");
        properties.put("enable.auto.commit", false);
        return properties;
    }

    @Override // com.sitewhere.spi.microservice.kafka.IMicroserviceKafkaConsumer
    public KafkaConsumer<String, byte[]> getConsumer() {
        return this.consumer;
    }

    protected void setConsumer(KafkaConsumer<String, byte[]> kafkaConsumer) {
        this.consumer = kafkaConsumer;
    }
}
