package com.acxiom.pipeline.drivers;

import com.acxiom.pipeline.PipelineDependencyExecutor$;
import com.acxiom.pipeline.PipelineExecution;
import com.acxiom.pipeline.utils.DriverUtils$;
import com.acxiom.pipeline.utils.ReflectionUtils$;
import com.acxiom.pipeline.utils.StreamingUtils$;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.Logger;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.ConsumerStrategies$;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils$;
import org.apache.spark.streaming.kafka010.LocationStrategies$;
import org.apache.spark.streaming.kafka010.OffsetRange;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;

/* compiled from: KafkaPipelineDriver.scala */
/* loaded from: input_file:com/acxiom/pipeline/drivers/KafkaPipelineDriver$.class */
public final class KafkaPipelineDriver$ {
    public static KafkaPipelineDriver$ MODULE$;
    private final Logger logger;

    static {
        new KafkaPipelineDriver$();
    }

    private Logger logger() {
        return this.logger;
    }

    public void main(String[] strArr) {
        Map<String, Object> extractParameters = DriverUtils$.MODULE$.extractParameters(strArr, new Some(new $colon.colon("driverSetupClass", new $colon.colon("topics", new $colon.colon("kafkaNodes", Nil$.MODULE$)))));
        String str = (String) extractParameters.apply("driverSetupClass");
        DriverSetup driverSetup = (DriverSetup) ReflectionUtils$.MODULE$.loadClass(str, new Some(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("parameters"), extractParameters)}))), ReflectionUtils$.MODULE$.loadClass$default$3());
        String[] split = ((String) extractParameters.apply("topics")).split(",");
        logger().info(new StringBuilder(43).append("Listening for Kafka messages using topics: ").append(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(split)).mkString(",")).toString());
        if (driverSetup.executionPlan().isEmpty()) {
            throw new IllegalStateException(new StringBuilder(75).append("Unable to obtain valid execution plan. Please check the DriverSetup class: ").append(str).toString());
        }
        List list = (List) driverSetup.executionPlan().get();
        SparkSession sparkSession = (SparkSession) ((PipelineExecution) list.head()).pipelineContext().sparkSession().get();
        StreamingContext createStreamingContext = StreamingUtils$.MODULE$.createStreamingContext(sparkSession.sparkContext(), new Some((String) extractParameters.getOrElse("duration-type", () -> {
            return "seconds";
        })), new Some((String) extractParameters.getOrElse("duration", () -> {
            return "10";
        })));
        InputDStream createDirectStream = KafkaUtils$.MODULE$.createDirectStream(createStreamingContext, LocationStrategies$.MODULE$.PreferConsistent(), ConsumerStrategies$.MODULE$.Subscribe(Predef$.MODULE$.wrapRefArray(split), Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), (String) extractParameters.apply("kafkaNodes")), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("key.deserializer"), StringDeserializer.class), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("value.deserializer"), StringDeserializer.class), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("group.id"), (String) extractParameters.getOrElse("groupId", () -> {
            return "default_stream_listener";
        })), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("auto.offset.reset"), "earliest"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("enable.auto.commit"), Predef$.MODULE$.boolean2Boolean(false))}))));
        createDirectStream.foreachRDD(rdd -> {
            $anonfun$main$4(sparkSession, driverSetup, list, createDirectStream, rdd);
            return BoxedUnit.UNIT;
        });
        createStreamingContext.start();
        StreamingUtils$.MODULE$.setTerminationState(createStreamingContext, extractParameters);
        logger().info("Shutting down Kafka Pipeline Driver");
    }

    public static final /* synthetic */ void $anonfun$main$4(SparkSession sparkSession, DriverSetup driverSetup, List list, InputDStream inputDStream, RDD rdd) {
        if (rdd.isEmpty()) {
            return;
        }
        MODULE$.logger().debug("RDD received");
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges();
        PipelineDependencyExecutor$.MODULE$.executePlan(DriverUtils$.MODULE$.addInitialDataFrameToExecutionPlan(driverSetup.refreshExecutionPlan(list), sparkSession.createDataFrame(rdd.map(consumerRecord -> {
            return Row$.MODULE$.apply(Predef$.MODULE$.genericWrapArray(new Object[]{consumerRecord.key(), consumerRecord.value(), consumerRecord.topic()}));
        }, ClassTag$.MODULE$.apply(Row.class)), StructType$.MODULE$.apply(new $colon.colon(new StructField("key", StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()), new $colon.colon(new StructField("value", StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()), new $colon.colon(new StructField("topic", StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()), Nil$.MODULE$))))).toDF()));
        ((CanCommitOffsets) inputDStream).commitAsync(offsetRanges);
        MODULE$.logger().debug(new StringBuilder(25).append("Committing Kafka offsets ").append(new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(offsetRanges)).mkString(",")).toString());
    }

    private KafkaPipelineDriver$() {
        MODULE$ = this;
        this.logger = Logger.getLogger(getClass());
    }
}
