package com.sitewhere.microservice.kafka;

import com.sitewhere.microservice.configuration.model.instance.infrastructure.KafkaConfiguration;
import com.sitewhere.spi.SiteWhereException;
import com.sitewhere.spi.microservice.lifecycle.ITenantEngineLifecycleComponent;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;

/* loaded from: input_file:com/sitewhere/microservice/kafka/KafkaTopicWaiter.class */
public abstract class KafkaTopicWaiter implements Runnable {
    private static final int KAFKA_RETRY_INTERVAL_MS = 5000;
    private ITenantEngineLifecycleComponent component;
    private String topicName;
    private AdminClient kafkaAdmin;

    public KafkaTopicWaiter(ITenantEngineLifecycleComponent iTenantEngineLifecycleComponent, String str) {
        this.component = iTenantEngineLifecycleComponent;
        this.topicName = str;
    }

    protected abstract void onTopicAvailable();

    protected Properties buildAdminConfiguration() throws SiteWhereException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KafkaUtils.getBootstrapServers(getComponent().getMicroservice()));
        return properties;
    }

    @Override // java.lang.Runnable
    public void run() {
        KafkaConfiguration kafka = getComponent().getMicroservice().getInstanceConfiguration().getInfrastructure().getKafka();
        getLogger().info("Attempting to connect to Kafka...");
        while (true) {
            try {
                this.kafkaAdmin = AdminClient.create(buildAdminConfiguration());
                if (((TopicDescription) ((Map) getKafkaAdmin().describeTopics(Arrays.asList(getTopicName())).all().get()).get(getTopicName())) != null) {
                    getLogger().info("Kafka detected as available.");
                    onTopicAvailable();
                    return;
                }
            } catch (ConfigException e) {
                getLogger().warn("Configuration issue connecting to Kafka. Will continue attempting to connect.", e);
            } catch (ExecutionException e2) {
                Throwable cause = e2.getCause();
                if (cause instanceof UnknownTopicOrPartitionException) {
                    try {
                        getKafkaAdmin().createTopics(Collections.singletonList(new NewTopic(getTopicName(), kafka.getDefaultTopicPartitions(), (short) kafka.getDefaultTopicReplicationFactor()))).all().get();
                        getLogger().info(String.format("Kafka topic '%s' created.", getTopicName()));
                    } catch (InterruptedException e3) {
                        getLogger().error("Interrupted while creating topic.");
                        return;
                    } catch (ExecutionException e4) {
                        if (e4.getCause() instanceof TopicExistsException) {
                            getLogger().debug("Topic already existed.");
                        } else if (e4.getCause() instanceof InvalidReplicationFactorException) {
                            getLogger().info("Not enough replicas are available to create topic. Waiting.");
                            try {
                                Thread.sleep(1000L);
                            } catch (InterruptedException e5) {
                                getLogger().error("Interrupted while waiting for replicas.");
                                return;
                            }
                        } else {
                            getLogger().error("Kakfa exception creating topic.", e4);
                        }
                    } catch (Throwable th) {
                        getLogger().error("Unhandled exception while creating topic.", th);
                    }
                } else {
                    getLogger().warn("Execution exception connecting to Kafka. Will continue attempting to connect. (" + e2.getMessage() + ")", cause);
                }
            } catch (Throwable th2) {
                getLogger().warn("Exception while connecting to Kafka. Will continue attempting to connect.", th2);
            }
            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e6) {
                getLogger().warn("Interrupted while waiting for Kafka to become available.");
                return;
            }
        }
    }

    protected Logger getLogger() {
        return getComponent().getLogger();
    }

    protected ITenantEngineLifecycleComponent getComponent() {
        return this.component;
    }

    protected String getTopicName() {
        return this.topicName;
    }

    protected AdminClient getKafkaAdmin() {
        return this.kafkaAdmin;
    }
}
