package com.sitewhere.microservice.kafka;

import com.sitewhere.microservice.lifecycle.TenantEngineLifecycleComponent;
import com.sitewhere.spi.SiteWhereException;
import com.sitewhere.spi.microservice.instance.IInstanceSettings;
import com.sitewhere.spi.microservice.kafka.IMicroserviceKafkaProducer;
import com.sitewhere.spi.microservice.lifecycle.ILifecycleProgressMonitor;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:com/sitewhere/microservice/kafka/MicroserviceKafkaProducer.class */
public abstract class MicroserviceKafkaProducer extends TenantEngineLifecycleComponent implements IMicroserviceKafkaProducer {
    private static final int KAFKA_RETRY_INTERVAL_MS = 10000;
    private KafkaProducer<String, byte[]> producer;
    private AdminClient kafkaAdmin;
    private AckPolicy ackPolicy;
    private CountDownLatch kafkaAvailable;
    ExecutorService waiterService;

    /* loaded from: input_file:com/sitewhere/microservice/kafka/MicroserviceKafkaProducer$KafkaWaiter.class */
    private class KafkaWaiter implements Runnable {
        private KafkaWaiter() {
        }

        @Override // java.lang.Runnable
        public void run() {
            MicroserviceKafkaProducer.this.getLogger().info("Attempting to connect to Kafka...");
            while (true) {
                try {
                    MicroserviceKafkaProducer.this.kafkaAdmin = AdminClient.create(MicroserviceKafkaProducer.this.buildAdminConfiguration());
                    if (((TopicDescription) ((Map) MicroserviceKafkaProducer.this.getKafkaAdmin().describeTopics(Arrays.asList(MicroserviceKafkaProducer.this.getTargetTopicName())).all().get()).get(MicroserviceKafkaProducer.this.getTargetTopicName())) != null) {
                        MicroserviceKafkaProducer.this.getLogger().info("Kafka detected as available.");
                        MicroserviceKafkaProducer.this.getKafkaAvailable().countDown();
                        return;
                    }
                } catch (ExecutionException e) {
                    Throwable cause = e.getCause();
                    if (cause instanceof UnknownTopicOrPartitionException) {
                        try {
                            IInstanceSettings instanceSettings = MicroserviceKafkaProducer.this.getMicroservice().getInstanceSettings();
                            MicroserviceKafkaProducer.this.getKafkaAdmin().createTopics(Collections.singletonList(new NewTopic(MicroserviceKafkaProducer.this.getTargetTopicName(), instanceSettings.getKafkaDefaultTopicPartitions().intValue(), (short) instanceSettings.getKafkaDefaultTopicReplicationFactor().intValue()))).all().get();
                            MicroserviceKafkaProducer.this.getLogger().info(String.format("Kafka topic '%s' created.", MicroserviceKafkaProducer.this.getTargetTopicName()));
                        } catch (SiteWhereException e2) {
                            MicroserviceKafkaProducer.this.getLogger().error("Exception creating topic.", e2);
                        } catch (InterruptedException e3) {
                            MicroserviceKafkaProducer.this.getLogger().error("Interrupted while creating topic.");
                            return;
                        } catch (ExecutionException e4) {
                            if (e4.getCause() instanceof TopicExistsException) {
                                MicroserviceKafkaProducer.this.getLogger().debug("Topic already existed.");
                            } else if (e4.getCause() instanceof InvalidReplicationFactorException) {
                                MicroserviceKafkaProducer.this.getLogger().info("Not enough replicas are available to create topic. Waiting.");
                                try {
                                    Thread.sleep(1000L);
                                } catch (InterruptedException e5) {
                                    MicroserviceKafkaProducer.this.getLogger().error("Interrupted while waiting for replicas.");
                                    return;
                                }
                            } else {
                                MicroserviceKafkaProducer.this.getLogger().error("Kakfa exception creating topic.", e4);
                            }
                        }
                    } else {
                        MicroserviceKafkaProducer.this.getLogger().warn("Execution exception connecting to Kafka. Will continue attempting to connect. (" + e.getMessage() + ")", cause);
                    }
                } catch (ConfigException e6) {
                    MicroserviceKafkaProducer.this.getLogger().warn("Configuration issue connecting to Kafka. Will continue attempting to connect.", e6);
                } catch (Throwable th) {
                    MicroserviceKafkaProducer.this.getLogger().warn("Exception while connecting to Kafka. Will continue attempting to connect.", th);
                }
                try {
                    Thread.sleep(10000L);
                } catch (InterruptedException e7) {
                    MicroserviceKafkaProducer.this.getLogger().warn("Interrupted while waiting for Kafka to become available.");
                    return;
                }
            }
        }
    }

    public MicroserviceKafkaProducer(AckPolicy ackPolicy) {
        this.ackPolicy = ackPolicy;
    }

    @Override // com.sitewhere.microservice.lifecycle.LifecycleComponent, com.sitewhere.spi.microservice.lifecycle.ILifecycleComponent
    public void start(ILifecycleProgressMonitor iLifecycleProgressMonitor) throws SiteWhereException {
        getLogger().info("Producer connecting to Kafka: " + getMicroservice().getInstanceSettings().getKafkaBootstrapServers());
        getLogger().info("Will be producing messages for: " + getTargetTopicName());
        this.kafkaAvailable = new CountDownLatch(1);
        this.waiterService = Executors.newSingleThreadExecutor();
        getWaiterService().execute(new KafkaWaiter());
    }

    @Override // com.sitewhere.microservice.lifecycle.LifecycleComponent, com.sitewhere.spi.microservice.lifecycle.ILifecycleComponent
    public void stop(ILifecycleProgressMonitor iLifecycleProgressMonitor) throws SiteWhereException {
        if (getProducer() != null) {
            getProducer().close();
        }
        if (getWaiterService() != null) {
            getWaiterService().shutdown();
        }
    }

    @Override // com.sitewhere.spi.microservice.kafka.IMicroserviceKafkaProducer
    public Future<RecordMetadata> send(String str, byte[] bArr) throws SiteWhereException {
        while (true) {
            ProducerRecord producerRecord = new ProducerRecord(getTargetTopicName(), str, bArr);
            try {
                if (getKafkaAvailable().getCount() != 0) {
                    getLogger().info("Producer waiting on Kafka to become available...");
                    getKafkaAvailable().await();
                }
                if (getProducer() == null) {
                    this.producer = new KafkaProducer<>(buildConfiguration());
                }
                return getProducer().send(producerRecord);
            } catch (IllegalStateException e) {
                throw new SiteWhereException("Producer unable to send record.", e);
            } catch (InterruptedException e2) {
                throw new SiteWhereException("Producer interrupted while waiting for Kafka.", e2);
            } catch (RetriableException e3) {
                try {
                    getLogger().info(String.format("Got retriable exception [%s] while sending Kafka payload. Waiting to retry.", e3.getMessage()));
                    Thread.sleep(5000L);
                } catch (InterruptedException e4) {
                    getLogger().info("Interrupted while waiting to send Kafka payload.");
                }
            } catch (Throwable th) {
                throw new SiteWhereException("Unhandled exception in producer while sending record.", th);
            }
        }
    }

    protected Properties buildConfiguration() throws SiteWhereException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", getMicroservice().getInstanceSettings().getKafkaBootstrapServers());
        properties.put("acks", getAckPolicy().getConfig());
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", ByteArraySerializer.class.getName());
        return properties;
    }

    protected Properties buildAdminConfiguration() throws SiteWhereException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", getMicroservice().getInstanceSettings().getKafkaBootstrapServers());
        return properties;
    }

    protected KafkaProducer<String, byte[]> getProducer() {
        return this.producer;
    }

    protected void setProducer(KafkaProducer<String, byte[]> kafkaProducer) {
        this.producer = kafkaProducer;
    }

    protected AdminClient getKafkaAdmin() {
        return this.kafkaAdmin;
    }

    protected void setKafkaAdmin(AdminClient adminClient) {
        this.kafkaAdmin = adminClient;
    }

    protected AckPolicy getAckPolicy() {
        return this.ackPolicy;
    }

    protected void setAckPolicy(AckPolicy ackPolicy) {
        this.ackPolicy = ackPolicy;
    }

    protected CountDownLatch getKafkaAvailable() {
        return this.kafkaAvailable;
    }

    protected void setKafkaAvailable(CountDownLatch countDownLatch) {
        this.kafkaAvailable = countDownLatch;
    }

    protected ExecutorService getWaiterService() {
        return this.waiterService;
    }

    protected void setWaiterService(ExecutorService executorService) {
        this.waiterService = executorService;
    }
}
