package com.sitewhere.microservice.kafka;

import com.sitewhere.microservice.lifecycle.TenantEngineLifecycleComponent;
import com.sitewhere.spi.SiteWhereException;
import com.sitewhere.spi.microservice.kafka.IMicroserviceKafkaProducer;
import com.sitewhere.spi.microservice.lifecycle.ILifecycleProgressMonitor;
import com.sitewhere.spi.microservice.lifecycle.ITenantEngineLifecycleComponent;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:com/sitewhere/microservice/kafka/MicroserviceKafkaProducer.class */
/**
 * Base class for tenant engine lifecycle components that publish records to the
 * Kafka topic named by {@code getTargetTopicName()}.
 *
 * <p>
 * {@link #start(ILifecycleProgressMonitor)} launches a background
 * {@link KafkaWaiter} for the target topic. The underlying {@link KafkaProducer}
 * is created lazily by the first {@link #send(Object, Object)} call that gets
 * past the availability latch.
 *
 * <p>
 * NOTE(review): lazy producer creation in {@code send} is not synchronized;
 * confirm that callers do not race on the very first send.
 *
 * @param <K> record key type
 * @param <P> record payload type
 */
public abstract class MicroserviceKafkaProducer<K, P> extends TenantEngineLifecycleComponent implements IMicroserviceKafkaProducer<K, P> {

    /** Delay in milliseconds between attempts after a retriable send failure. */
    private static final long RETRY_DELAY_MS = 5000L;

    /** Lazily created Kafka producer; {@code null} until the first send and again after stop. */
    private KafkaProducer<K, P> producer;

    /** Acknowledgement policy written to the {@code acks} producer property. */
    private AckPolicy ackPolicy = AckPolicy.Leader;

    /** Latch counted down by {@link KafkaWaiter#onTopicAvailable()}. */
    private CountDownLatch kafkaAvailable;

    /** Executor that runs the {@link KafkaWaiter}. */
    ExecutorService waiterService;

    /**
     * Topic waiter that releases the enclosing producer's availability latch when
     * its {@code onTopicAvailable()} callback is invoked.
     */
    private class KafkaWaiter extends KafkaTopicWaiter {
        public KafkaWaiter(ITenantEngineLifecycleComponent iTenantEngineLifecycleComponent, String str) {
            super(iTenantEngineLifecycleComponent, str);
        }

        @Override // com.sitewhere.microservice.kafka.KafkaTopicWaiter
        protected void onTopicAvailable() {
            MicroserviceKafkaProducer.this.getKafkaAvailable().countDown();
        }
    }

    /**
     * Logs the connection details, creates a fresh availability latch and starts a
     * single-threaded executor running a {@link KafkaWaiter} for the target topic.
     *
     * @param iLifecycleProgressMonitor progress monitor (not used by this method)
     * @throws SiteWhereException declared by the lifecycle contract
     */
    @Override // com.sitewhere.microservice.lifecycle.LifecycleComponent, com.sitewhere.spi.microservice.lifecycle.ILifecycleComponent
    public void start(ILifecycleProgressMonitor iLifecycleProgressMonitor) throws SiteWhereException {
        getLogger().info("Producer connecting to Kafka: " + KafkaUtils.getBootstrapServers(getMicroservice()));
        getLogger().info("Will be producing messages for: " + getTargetTopicName());
        getLogger().info("Keys will be encoded with: " + getKeySerializer().getName());
        this.kafkaAvailable = new CountDownLatch(1);
        this.waiterService = Executors.newSingleThreadExecutor();
        getWaiterService().execute(new KafkaWaiter(this, getTargetTopicName()));
    }

    /**
     * Closes the Kafka producer (if one was created) and shuts down the waiter
     * executor (if one was created).
     *
     * @param iLifecycleProgressMonitor progress monitor (not used by this method)
     * @throws SiteWhereException declared by the lifecycle contract
     */
    @Override // com.sitewhere.microservice.lifecycle.LifecycleComponent, com.sitewhere.spi.microservice.lifecycle.ILifecycleComponent
    public void stop(ILifecycleProgressMonitor iLifecycleProgressMonitor) throws SiteWhereException {
        try {
            KafkaProducer<K, P> active = getProducer();
            if (active != null) {
                // Clear the field before closing so a later start()/send() builds a
                // new producer instead of reusing a closed one.
                this.producer = null;
                active.close();
            }
        } finally {
            // Runs even if close() throws, so the executor is always shut down.
            if (getWaiterService() != null) {
                getWaiterService().shutdown();
            }
        }
    }

    /**
     * @return the serializer class used for record keys ({@link StringSerializer})
     */
    @Override // com.sitewhere.spi.microservice.kafka.IMicroserviceKafkaProducer
    public Class<?> getKeySerializer() {
        return StringSerializer.class;
    }

    /**
     * @return the serializer class used for record values ({@link ByteArraySerializer})
     */
    @Override // com.sitewhere.spi.microservice.kafka.IMicroserviceKafkaProducer
    public Class<?> getValueSerializer() {
        return ByteArraySerializer.class;
    }

    /**
     * Sends a record to the target topic.
     *
     * <p>
     * Blocks until the availability latch has been released, creates the producer
     * on first use, then hands the record to it. A {@link RetriableException}
     * raised while sending is logged and the send is retried after
     * {@link #RETRY_DELAY_MS} milliseconds.
     *
     * @param k record key
     * @param p record payload
     * @return future for the metadata of the sent record
     * @throws SiteWhereException if the calling thread is interrupted while waiting
     *                            (the interrupt flag is restored first), if the
     *                            producer rejects the record, or on any other
     *                            non-retriable failure
     */
    @Override // com.sitewhere.spi.microservice.kafka.IMicroserviceKafkaProducer
    public Future<RecordMetadata> send(K k, P p) throws SiteWhereException {
        // The record does not change between attempts, so build it once.
        ProducerRecord<K, P> producerRecord = new ProducerRecord<>(getTargetTopicName(), k, p);
        while (true) {
            try {
                if (getKafkaAvailable().getCount() != 0) {
                    getLogger().info("Producer waiting on Kafka to become available...");
                    getKafkaAvailable().await();
                }
                if (getProducer() == null) {
                    this.producer = new KafkaProducer<>(buildConfiguration());
                }
                return getProducer().send(producerRecord);
            } catch (IllegalStateException e) {
                throw new SiteWhereException("Producer unable to send record.", e);
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can see the interrupt.
                Thread.currentThread().interrupt();
                throw new SiteWhereException("Producer interrupted while waiting for Kafka.", e);
            } catch (RetriableException e) {
                getLogger().info(String.format("Got retriable exception [%s] while sending Kafka payload. Waiting to retry.", e.getMessage()));
                pauseBeforeRetry();
            } catch (SiteWhereException e) {
                // Already a domain exception (e.g. from buildConfiguration()); do not re-wrap.
                throw e;
            } catch (Throwable th) {
                throw new SiteWhereException("Unhandled exception in producer while sending record.", th);
            }
        }
    }

    /**
     * Sleeps for {@link #RETRY_DELAY_MS} before the next send attempt.
     *
     * @throws SiteWhereException if the thread is interrupted while sleeping; the
     *                            interrupt flag is restored before throwing so the
     *                            retry loop terminates instead of spinning
     */
    private void pauseBeforeRetry() throws SiteWhereException {
        try {
            Thread.sleep(RETRY_DELAY_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SiteWhereException("Interrupted while waiting to send Kafka payload.", e);
        }
    }

    /**
     * Builds the properties used to construct the {@link KafkaProducer}: bootstrap
     * servers, acknowledgement policy and key/value serializer class names.
     *
     * @return producer configuration
     * @throws SiteWhereException declared so overrides may fail with a domain error
     */
    protected Properties buildConfiguration() throws SiteWhereException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KafkaUtils.getBootstrapServers(getMicroservice()));
        properties.put("acks", getAckPolicy().getConfig());
        properties.put("key.serializer", getKeySerializer().getName());
        properties.put("value.serializer", getValueSerializer().getName());
        return properties;
    }

    /**
     * @return the acknowledgement policy (defaults to {@code AckPolicy.Leader})
     */
    public AckPolicy getAckPolicy() {
        return this.ackPolicy;
    }

    /**
     * @param ackPolicy acknowledgement policy to use when the producer configuration is built
     */
    public void setAckPolicy(AckPolicy ackPolicy) {
        this.ackPolicy = ackPolicy;
    }

    /**
     * @return the current producer, or {@code null} if none has been created yet
     */
    protected KafkaProducer<K, P> getProducer() {
        return this.producer;
    }

    /**
     * @return the availability latch, or {@code null} before {@code start} has run
     */
    protected CountDownLatch getKafkaAvailable() {
        return this.kafkaAvailable;
    }

    /**
     * @return the executor running the topic waiter, or {@code null} before {@code start} has run
     */
    protected ExecutorService getWaiterService() {
        return this.waiterService;
    }
}
