package de.viadee.ki.sparkimporter.runner;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;
import de.viadee.ki.sparkimporter.processing.PreprocessingRunner;
import de.viadee.ki.sparkimporter.processing.steps.PipelineStep;
import de.viadee.ki.sparkimporter.processing.steps.importing.ColumnsPreparationStep;
import de.viadee.ki.sparkimporter.processing.steps.importing.InitialCleanupStep;
import de.viadee.ki.sparkimporter.processing.steps.output.WriteToDataSinkStep;
import de.viadee.ki.sparkimporter.util.SparkImporterKafkaImportArguments;
import de.viadee.ki.sparkimporter.util.SparkImporterLogger;
import de.viadee.ki.sparkimporter.util.SparkImporterVariables;
import java.io.File;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/viadee/ki/sparkimporter/runner/KafkaImportRunner.class */
public class KafkaImportRunner extends SparkRunner {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaImportRunner.class);
    public static SparkImporterKafkaImportArguments ARGS;
    private static final String TOPIC_PROCESS_INSTANCE = "processInstance";
    private static final String TOPIC_VARIABLE_UPDATE = "variableUpdate";
    private static final String TOPIC_ACTIVITY_INSTANCE = "activityInstance";
    private CountDownLatch countDownLatch;
    private final Map<String, Object> kafkaConsumerConfigPI = new HashMap();
    private final Map<String, Object> kafkaConsumerConfigVU = new HashMap();
    private final Map<String, Object> kafkaConsumerConfigAI = new HashMap();
    private JavaRDD<String> masterRdd = null;
    private Dataset<Row> masterDataset = null;
    private List<String> receivedQueues = new ArrayList();
    private List<String> emptyQueues = new ArrayList();
    private int EXPECTED_QUEUES_TO_BE_EMPTIED_IN_BATCH_MODE = 2;

    @Override // de.viadee.ki.sparkimporter.runner.SparkRunner
    protected void initialize(String[] strArr) {
        ARGS = SparkImporterKafkaImportArguments.getInstance();
        JCommander build = JCommander.newBuilder().addObject(SparkImporterKafkaImportArguments.getInstance()).build();
        try {
            build.parse(strArr);
        } catch (ParameterException e) {
            LOG.error("Parsing of parameters failed. Error message: " + e.getMessage());
            build.usage();
            System.exit(1);
        }
        this.EXPECTED_QUEUES_TO_BE_EMPTIED_IN_BATCH_MODE = ARGS.getDataLevel().equals(SparkImporterVariables.DATA_LEVEL_PROCESS) ? 2 : 3;
        SparkImporterVariables.setTargetFolder(ARGS.getFileDestination());
        SparkImporterVariables.setWorkingDirectory(ARGS.getWorkingDirectory());
        SparkImporterLogger.setLogDirectory(ARGS.getLogDirectory());
        SparkImporterVariables.setProcessFilterDefinitionId(ARGS.getProcessDefinitionFilterId());
        this.dataLevel = ARGS.getDataLevel();
        PreprocessingRunner.writeStepResultsIntoFile = ARGS.isWriteStepResultsToCSV();
        FileUtils.deleteQuietly(new File(ARGS.getFileDestination()));
        SparkImporterLogger.getInstance().writeInfo("Starting Kafka import " + (ARGS.isBatchMode() ? "in batch mode " : "") + "from: " + ARGS.getKafkaBroker());
    }

    private synchronized void processMasterRDD(JavaRDD<String> javaRDD, String str) {
        if (javaRDD.count() == 0) {
            if (ARGS.isBatchMode()) {
                SparkImporterLogger.getInstance().writeInfo("Kafka queue '" + str + "' returned zero entries.");
                if (!this.emptyQueues.contains(str)) {
                    this.emptyQueues.add(str);
                }
                if (this.emptyQueues.size() == this.EXPECTED_QUEUES_TO_BE_EMPTIED_IN_BATCH_MODE) {
                    SparkImporterLogger.getInstance().writeInfo("All Kafka queues (" + ((String) this.emptyQueues.stream().collect(Collectors.joining(","))) + ") returned zero entries once. Stopping as running in batch mode");
                    this.countDownLatch.countDown();
                    return;
                }
                return;
            }
            return;
        }
        if (!this.receivedQueues.contains(str)) {
            this.receivedQueues.add(str);
        }
        if (this.masterDataset != null) {
            this.masterDataset = this.sparkSession.createDataFrame(this.sparkSession.read().json(this.sparkSession.createDataset(javaRDD.rdd(), Encoders.STRING())).rdd(), this.masterDataset.schema());
        } else if (this.receivedQueues.size() == this.EXPECTED_QUEUES_TO_BE_EMPTIED_IN_BATCH_MODE) {
            this.masterRdd = this.masterRdd.union(javaRDD);
            this.masterDataset = this.sparkSession.read().json(this.sparkSession.createDataset(this.masterRdd.rdd(), Encoders.STRING()));
        } else if (this.masterRdd == null) {
            this.masterRdd = javaRDD;
        } else {
            this.masterRdd = this.masterRdd.union(javaRDD);
        }
    }

    @Override // de.viadee.ki.sparkimporter.runner.SparkRunner
    protected List<PipelineStep> buildDefaultPipeline() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new PipelineStep(new ColumnsPreparationStep(), ""));
        arrayList.add(new PipelineStep(new InitialCleanupStep(), "ColumnsPreparationStep"));
        arrayList.add(new PipelineStep(new WriteToDataSinkStep(), "InitialCleanupStep"));
        return arrayList;
    }

    @Override // de.viadee.ki.sparkimporter.runner.SparkRunner
    protected Dataset<Row> loadInitialDataset() {
        long currentTimeMillis = System.currentTimeMillis();
        this.masterRdd = this.sparkSession.emptyDataset(Encoders.STRING()).javaRDD();
        if (ARGS.isBatchMode()) {
            this.countDownLatch = new CountDownLatch(1);
        }
        this.kafkaConsumerConfigPI.put("bootstrap.servers", ARGS.getKafkaBroker());
        this.kafkaConsumerConfigPI.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        this.kafkaConsumerConfigPI.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        this.kafkaConsumerConfigPI.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        this.kafkaConsumerConfigPI.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        this.kafkaConsumerConfigVU.put("bootstrap.servers", ARGS.getKafkaBroker());
        this.kafkaConsumerConfigVU.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        this.kafkaConsumerConfigVU.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        this.kafkaConsumerConfigVU.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        this.kafkaConsumerConfigVU.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(new JavaSparkContext(this.sparkSession.sparkContext()), Duration.apply(5000));
        KafkaUtils.createDirectStream(javaStreamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(Arrays.asList(TOPIC_PROCESS_INSTANCE), this.kafkaConsumerConfigPI)).map(consumerRecord -> {
            return (String) consumerRecord.value();
        }).foreachRDD(javaRDD -> {
            processMasterRDD(javaRDD, TOPIC_PROCESS_INSTANCE);
        });
        KafkaUtils.createDirectStream(javaStreamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(Arrays.asList(TOPIC_VARIABLE_UPDATE), this.kafkaConsumerConfigVU)).map(consumerRecord2 -> {
            return (String) consumerRecord2.value();
        }).foreachRDD(javaRDD2 -> {
            processMasterRDD(javaRDD2, TOPIC_VARIABLE_UPDATE);
        });
        if (ARGS.getDataLevel().equals(SparkImporterVariables.DATA_LEVEL_ACTIVITY)) {
            this.kafkaConsumerConfigAI.put("bootstrap.servers", ARGS.getKafkaBroker());
            this.kafkaConsumerConfigAI.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            this.kafkaConsumerConfigAI.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            this.kafkaConsumerConfigAI.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
            this.kafkaConsumerConfigAI.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            KafkaUtils.createDirectStream(javaStreamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(Arrays.asList(TOPIC_ACTIVITY_INSTANCE), this.kafkaConsumerConfigAI)).map(consumerRecord3 -> {
                return (String) consumerRecord3.value();
            }).foreachRDD(javaRDD3 -> {
                processMasterRDD(javaRDD3, TOPIC_ACTIVITY_INSTANCE);
            });
        }
        javaStreamingContext.start();
        if (ARGS.isBatchMode()) {
            try {
                this.countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            SparkImporterLogger.getInstance().writeInfo("Kafka import finished (took " + ((System.currentTimeMillis() - currentTimeMillis) / 1000) + " seconds in total)");
            javaStreamingContext.stop(false);
        } else {
            try {
                javaStreamingContext.awaitTermination();
                SparkImporterLogger.getInstance().writeInfo("Kafka import finished (took " + ((System.currentTimeMillis() - currentTimeMillis) / 1000) + " seconds in total)");
            } catch (InterruptedException e2) {
                e2.printStackTrace();
            }
        }
        return this.masterDataset;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -63859514:
                if (implMethodName.equals("lambda$loadInitialDataset$928907e0$1")) {
                    z = 4;
                    break;
                }
                break;
            case -63859513:
                if (implMethodName.equals("lambda$loadInitialDataset$928907e0$2")) {
                    z = 3;
                    break;
                }
                break;
            case -63859512:
                if (implMethodName.equals("lambda$loadInitialDataset$928907e0$3")) {
                    z = 5;
                    break;
                }
                break;
            case 559940242:
                if (implMethodName.equals("lambda$loadInitialDataset$524250ac$1")) {
                    z = false;
                    break;
                }
                break;
            case 559940243:
                if (implMethodName.equals("lambda$loadInitialDataset$524250ac$2")) {
                    z = true;
                    break;
                }
                break;
            case 559940244:
                if (implMethodName.equals("lambda$loadInitialDataset$524250ac$3")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/VoidFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("de/viadee/ki/sparkimporter/runner/KafkaImportRunner") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/spark/api/java/JavaRDD;)V")) {
                    KafkaImportRunner kafkaImportRunner = (KafkaImportRunner) serializedLambda.getCapturedArg(0);
                    return javaRDD -> {
                        processMasterRDD(javaRDD, TOPIC_PROCESS_INSTANCE);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/VoidFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("de/viadee/ki/sparkimporter/runner/KafkaImportRunner") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/spark/api/java/JavaRDD;)V")) {
                    KafkaImportRunner kafkaImportRunner2 = (KafkaImportRunner) serializedLambda.getCapturedArg(0);
                    return javaRDD2 -> {
                        processMasterRDD(javaRDD2, TOPIC_VARIABLE_UPDATE);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/VoidFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("de/viadee/ki/sparkimporter/runner/KafkaImportRunner") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/spark/api/java/JavaRDD;)V")) {
                    KafkaImportRunner kafkaImportRunner3 = (KafkaImportRunner) serializedLambda.getCapturedArg(0);
                    return javaRDD3 -> {
                        processMasterRDD(javaRDD3, TOPIC_ACTIVITY_INSTANCE);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("de/viadee/ki/sparkimporter/runner/KafkaImportRunner") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/lang/String;")) {
                    return consumerRecord2 -> {
                        return (String) consumerRecord2.value();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("de/viadee/ki/sparkimporter/runner/KafkaImportRunner") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/lang/String;")) {
                    return consumerRecord -> {
                        return (String) consumerRecord.value();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("de/viadee/ki/sparkimporter/runner/KafkaImportRunner") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/lang/String;")) {
                    return consumerRecord3 -> {
                        return (String) consumerRecord3.value();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
