package com.sitewhere.microservice.kafka;

import com.sitewhere.microservice.lifecycle.TenantEngineLifecycleComponent;
import com.sitewhere.spi.SiteWhereException;
import com.sitewhere.spi.microservice.kafka.IKafkaStreamPipeline;
import com.sitewhere.spi.microservice.lifecycle.ILifecycleProgressMonitor;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

/* loaded from: input_file:com/sitewhere/microservice/kafka/KafkaStreamPipeline.class */
public abstract class KafkaStreamPipeline extends TenantEngineLifecycleComponent implements IKafkaStreamPipeline {
    private KafkaStreams pipeline;

    public abstract String getPipelineName();

    public Class<?> getDefaultKeySerdeClass() {
        return Serdes.String().getClass();
    }

    public Class<?> getDefaultValueSerdeClass() {
        return Serdes.String().getClass();
    }

    @Override // com.sitewhere.microservice.lifecycle.LifecycleComponent, com.sitewhere.spi.microservice.lifecycle.ILifecycleComponent
    public void initialize(ILifecycleProgressMonitor iLifecycleProgressMonitor) throws SiteWhereException {
        new KafkaMultiTopicWaiter(this, getSourceTopicNames()).verify();
        Properties properties = new Properties();
        properties.put("application.id", String.format("%s-%s-%s-%s-%s", getMicroservice().getInstanceSettings().getProductId(), getMicroservice().getInstanceSettings().getKubernetesNamespace(), getTenantEngine().getTenantResource().getMetadata().getName(), getMicroservice().getIdentifier().getPath(), getPipelineName()));
        properties.put("bootstrap.servers", KafkaUtils.getBootstrapServers(getMicroservice()));
        properties.put("default.key.serde", getDefaultKeySerdeClass());
        properties.put("default.value.serde", getDefaultValueSerdeClass());
        properties.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        buildStreams(streamsBuilder);
        this.pipeline = new KafkaStreams(streamsBuilder.build(), properties);
        getPipeline().setUncaughtExceptionHandler((thread, th) -> {
            getLogger().warn("Unhandled exception in Kafka Streams processing.", th);
        });
        Runtime runtime = Runtime.getRuntime();
        KafkaStreams pipeline = getPipeline();
        pipeline.getClass();
        runtime.addShutdownHook(new Thread(pipeline::close));
    }

    @Override // com.sitewhere.microservice.lifecycle.LifecycleComponent, com.sitewhere.spi.microservice.lifecycle.ILifecycleComponent
    public void start(ILifecycleProgressMonitor iLifecycleProgressMonitor) throws SiteWhereException {
        if (getPipeline() != null && getPipeline().state() == KafkaStreams.State.RUNNING) {
            getPipeline().close();
        }
        getPipeline().start();
    }

    @Override // com.sitewhere.microservice.lifecycle.LifecycleComponent, com.sitewhere.spi.microservice.lifecycle.ILifecycleComponent
    public void stop(ILifecycleProgressMonitor iLifecycleProgressMonitor) throws SiteWhereException {
        if (getPipeline() != null) {
            getPipeline().close();
        }
    }

    protected KafkaStreams getPipeline() {
        return this.pipeline;
    }
}
