package com.sitewhere.microservice.kafka;

import com.sitewhere.microservice.lifecycle.TenantEngineLifecycleComponent;
import com.sitewhere.spi.SiteWhereException;
import com.sitewhere.spi.microservice.kafka.IKafkaStreamPipeline;
import com.sitewhere.spi.microservice.lifecycle.ILifecycleProgressMonitor;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;

/* loaded from: input_file:com/sitewhere/microservice/kafka/KafkaStreamPipeline.class */
public abstract class KafkaStreamPipeline extends TenantEngineLifecycleComponent implements IKafkaStreamPipeline {
    private KafkaStreams pipeline;

    @Override // com.sitewhere.microservice.lifecycle.LifecycleComponent, com.sitewhere.spi.microservice.lifecycle.ILifecycleComponent
    public void initialize(ILifecycleProgressMonitor iLifecycleProgressMonitor) throws SiteWhereException {
        Properties properties = new Properties();
        properties.put("application.id", String.format("%s-%s", getMicroservice().getInstanceSettings().getKubernetesName(), getMicroservice().getIdentifier().getShortName()));
        properties.put("bootstrap.servers", getMicroservice().getInstanceSettings().getKafkaBootstrapServers());
        properties.put("default.key.serde", Serdes.String().getClass());
        properties.put("default.value.serde", Serdes.String().getClass());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        buildStreams(streamsBuilder);
        this.pipeline = new KafkaStreams(streamsBuilder.build(), properties);
    }

    @Override // com.sitewhere.microservice.lifecycle.LifecycleComponent, com.sitewhere.spi.microservice.lifecycle.ILifecycleComponent
    public void start(ILifecycleProgressMonitor iLifecycleProgressMonitor) throws SiteWhereException {
        if (getPipeline() != null && getPipeline().state() == KafkaStreams.State.RUNNING) {
            getPipeline().close();
        }
        getPipeline().start();
    }

    @Override // com.sitewhere.microservice.lifecycle.LifecycleComponent, com.sitewhere.spi.microservice.lifecycle.ILifecycleComponent
    public void stop(ILifecycleProgressMonitor iLifecycleProgressMonitor) throws SiteWhereException {
        if (getPipeline() != null) {
            getPipeline().close();
        }
    }

    protected KafkaStreams getPipeline() {
        return this.pipeline;
    }
}
