package de.viadee.ki.sparkimporter.processing.steps.dataprocessing;

import de.viadee.ki.sparkimporter.processing.interfaces.PreprocessingStepInterface;
import de.viadee.ki.sparkimporter.util.SparkBroadcastHelper;
import de.viadee.ki.sparkimporter.util.SparkImporterUtils;
import de.viadee.ki.sparkimporter.util.SparkImporterVariables;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

/* loaded from: input_file:de/viadee/ki/sparkimporter/processing/steps/dataprocessing/AddVariableColumnsStep.class */
/**
 * Preprocessing step that works in two phases:
 * <ol>
 *   <li>rows whose state column is null are aggregated to one row per grouping key and variable
 *       name and then re-united with the rows whose state column is not null;</li>
 *   <li>one column is added for every key of the {@code PROCESS_VARIABLES_ESCALATED} broadcast
 *       map, filled from the typed value columns of the row that carries that variable.</li>
 * </ol>
 */
public class AddVariableColumnsStep implements PreprocessingStepInterface {

    /**
     * Recognises aggregate output column names of the form {@code first(x)}, {@code max(x)} or
     * {@code allbutemptystring(x)}; group 2 captures the wrapped column name {@code x}.
     * Compiled once because the pattern never changes.
     */
    private static final Pattern AGGREGATED_COLUMN_PATTERN =
            Pattern.compile("(first|max|allbutemptystring)\\((.+)\\)");

    /**
     * Runs the aggregation of variable updates followed by the creation of the variable columns.
     *
     * @param dataset the dataset to process
     * @param z       when {@code true}, each phase writes its intermediate result via
     *                {@code SparkImporterUtils.writeDatasetToCSV}
     * @param str     the data level; compared against
     *                {@code SparkImporterVariables.DATA_LEVEL_PROCESS} and
     *                {@code SparkImporterVariables.DATA_LEVEL_ACTIVITY}
     * @param map     not used by this step
     * @return the dataset with the variable columns added
     */
    @Override
    public Dataset<Row> runPreprocessingStep(Dataset<Row> dataset, boolean z, String str, Map<String, Object> map) {
        Dataset<Row> aggregatedUpdates = doVariableUpdatesAggregation(dataset, z, str);
        return doAddVariableColumns(aggregatedUpdates, z, str);
    }

    /**
     * Aggregates the rows whose state column is null and unions the result with the rows whose
     * state column is not null.
     *
     * @param dataset                 the input dataset
     * @param writeStepResultIntoFile whether to write the result as CSV ("agg_variable_updates")
     * @param dataLevel               the data level the step runs on
     * @return the union of the non-null-state rows and the aggregated null-state rows
     */
    private Dataset<Row> doVariableUpdatesAggregation(Dataset<Row> dataset, boolean writeStepResultIntoFile, String dataLevel) {
        // Pick the aggregate function per input column: highest value for "*_rev" columns,
        // first value for start/end time, and "AllButEmptyString" for everything else.
        // NOTE(review): "AllButEmptyString" is presumably an aggregate function registered
        // elsewhere in the project - it is not defined in this file.
        List<String> firstValueColumns = Arrays.asList(SparkImporterVariables.VAR_START_TIME, SparkImporterVariables.VAR_END_TIME);
        Map<String, String> aggregationByColumn = new HashMap<>();
        for (String columnName : dataset.columns()) {
            if (columnName.endsWith("_rev")) {
                aggregationByColumn.put(columnName, "max");
            } else if (firstValueColumns.contains(columnName)) {
                aggregationByColumn.put(columnName, "first");
            } else {
                aggregationByColumn.put(columnName, "AllButEmptyString");
            }
        }

        boolean processLevel = dataLevel.equals(SparkImporterVariables.DATA_LEVEL_PROCESS);

        // Only rows without a state take part in the aggregation.
        Dataset<Row> variableUpdates = dataset.filter(functions.isnull(dataset.col(SparkImporterVariables.VAR_STATE)));
        if (processLevel && Arrays.asList(dataset.columns()).contains(SparkImporterVariables.VAR_TIMESTAMP)) {
            // NOTE(review): sorts newest-first before grouping; whether this ordering survives the
            // groupBy and influences the aggregates depends on Spark - confirm before relying on it.
            variableUpdates = variableUpdates.orderBy(functions.desc(SparkImporterVariables.VAR_TIMESTAMP));
        }

        // Process level groups by process instance, every other level by activity instance.
        String groupingColumn = processLevel
                ? SparkImporterVariables.VAR_PROCESS_INSTANCE_ID
                : SparkImporterVariables.VAR_ACT_INST_ID;
        Dataset<Row> aggregated = variableUpdates
                .groupBy(groupingColumn, SparkImporterVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME)
                .agg(aggregationByColumn);

        // Remove the columns that still carry the plain key names; the rename loop below gives
        // those names to the aggregate output columns instead.
        aggregated = aggregated
                .drop(SparkImporterVariables.VAR_PROCESS_INSTANCE_ID)
                .drop(SparkImporterVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME);
        if (dataLevel.equals(SparkImporterVariables.DATA_LEVEL_ACTIVITY)) {
            aggregated = aggregated.drop(SparkImporterVariables.VAR_ACT_INST_ID);
        }

        // Strip the aggregate wrapper from the column names, e.g. "max(x)" becomes "x".
        // NOTE(review): relies on Spark naming aggregate output columns "<function>(<column>)".
        for (String aggregatedName : aggregated.columns()) {
            Matcher matcher = AGGREGATED_COLUMN_PATTERN.matcher(aggregatedName);
            if (matcher.find()) {
                aggregated = aggregated.withColumnRenamed(aggregatedName, matcher.group(2));
            }
        }

        // Both sides of the union select the same columns in the same order.
        String[] remainingColumns = columnsAfterProcessInstanceId(processLevel);
        Dataset<Row> result = dataset
                .select(SparkImporterVariables.VAR_PROCESS_INSTANCE_ID, remainingColumns)
                .filter(functions.not(functions.isnull(dataset.col(SparkImporterVariables.VAR_STATE))))
                .union(aggregated.select(SparkImporterVariables.VAR_PROCESS_INSTANCE_ID, remainingColumns));
        if (!processLevel) {
            // Only the non-process-level result is sorted.
            result = result.orderBy(SparkImporterVariables.VAR_ACT_INST_ID, SparkImporterVariables.VAR_START_TIME);
        }

        if (writeStepResultIntoFile) {
            SparkImporterUtils.getInstance().writeDatasetToCSV(result, "agg_variable_updates");
        }
        return result;
    }

    /**
     * Returns the columns selected after the process instance id on both sides of the union.
     *
     * @param processLevel {@code true} for the process-level column list, {@code false} for the
     *                     list that additionally contains the activity instance id, start time,
     *                     end time and duration
     * @return a fresh array of column names in selection order
     */
    private static String[] columnsAfterProcessInstanceId(boolean processLevel) {
        if (processLevel) {
            return new String[] {
                SparkImporterVariables.VAR_STATE,
                SparkImporterVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME,
                SparkImporterVariables.VAR_PROCESS_INSTANCE_VARIABLE_TYPE,
                SparkImporterVariables.VAR_PROCESS_INSTANCE_VARIABLE_REVISION,
                SparkImporterVariables.VAR_LONG,
                SparkImporterVariables.VAR_DOUBLE,
                SparkImporterVariables.VAR_TEXT,
                SparkImporterVariables.VAR_TEXT2
            };
        }
        return new String[] {
            SparkImporterVariables.VAR_STATE,
            SparkImporterVariables.VAR_ACT_INST_ID,
            SparkImporterVariables.VAR_START_TIME,
            SparkImporterVariables.VAR_END_TIME,
            SparkImporterVariables.VAR_DURATION,
            SparkImporterVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME,
            SparkImporterVariables.VAR_PROCESS_INSTANCE_VARIABLE_TYPE,
            SparkImporterVariables.VAR_PROCESS_INSTANCE_VARIABLE_REVISION,
            SparkImporterVariables.VAR_LONG,
            SparkImporterVariables.VAR_DOUBLE,
            SparkImporterVariables.VAR_TEXT,
            SparkImporterVariables.VAR_TEXT2
        };
    }

    /**
     * Adds one column per key of the {@code PROCESS_VARIABLES_ESCALATED} broadcast map and drops
     * the generic variable type, revision and value columns afterwards.
     *
     * @param dataset                 the dataset produced by the aggregation phase
     * @param writeStepResultIntoFile whether to write the result as CSV ("add_var_columns")
     * @param dataLevel               the data level the step runs on
     * @return the dataset with the variable columns added
     */
    private Dataset<Row> doAddVariableColumns(Dataset<Row> dataset, boolean writeStepResultIntoFile, String dataLevel) {
        // Only the keys are used here (as column names), so the value type is left open.
        @SuppressWarnings("unchecked")
        Map<String, ?> escalatedVariables = (Map<String, ?>) SparkBroadcastHelper.getInstance()
                .getBroadcastVariable(SparkBroadcastHelper.BROADCAST_VARIABLE.PROCESS_VARIABLES_ESCALATED);

        Dataset<Row> withVariables = dataset;
        for (String variableName : escalatedVariables.keySet()) {
            // The new column holds the typed value on rows carrying this variable, null elsewhere.
            Column carriesVariable = withVariables.col(SparkImporterVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME).equalTo(variableName);
            withVariables = withVariables.withColumn(
                    variableName,
                    functions.when(carriesVariable, valueByVariableType(withVariables)).otherwise(null));

            if (dataLevel.equals(SparkImporterVariables.DATA_LEVEL_PROCESS) && SparkImporterVariables.isRevCountEnabled()) {
                // Companion "<name>_rev" column: the revision on matching rows, "0" elsewhere.
                withVariables = withVariables.withColumn(
                        variableName + "_rev",
                        functions.when(
                                withVariables.col(SparkImporterVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME).equalTo(variableName),
                                withVariables.col(SparkImporterVariables.VAR_PROCESS_INSTANCE_VARIABLE_REVISION))
                                .otherwise("0"));
            }
        }

        // The generic type/revision/value columns are no longer needed once the variable columns exist.
        Dataset<Row> result = withVariables.drop(
                SparkImporterVariables.VAR_PROCESS_INSTANCE_VARIABLE_TYPE,
                SparkImporterVariables.VAR_PROCESS_INSTANCE_VARIABLE_REVISION,
                SparkImporterVariables.VAR_DOUBLE,
                SparkImporterVariables.VAR_LONG,
                SparkImporterVariables.VAR_TEXT,
                SparkImporterVariables.VAR_TEXT2);
        if (!SparkImporterVariables.isDevProcessStateColumnWorkaroundEnabled()) {
            result = result.drop(SparkImporterVariables.VAR_PROCESS_INSTANCE_VARIABLE_NAME);
        }

        if (writeStepResultIntoFile) {
            SparkImporterUtils.getInstance().writeDatasetToCSV(result, "add_var_columns");
        }
        return result;
    }

    /**
     * Builds the expression that picks the value column matching the row's variable type:
     * "string" and "null" read the text column; "boolean", "integer", "long" and "date" read the
     * long column; "double" reads the double column; any other type reads the second text column.
     *
     * @param dataset the dataset whose columns the expression refers to
     * @return the type-dependent value expression
     */
    private static Column valueByVariableType(Dataset<Row> dataset) {
        Column variableType = dataset.col(SparkImporterVariables.VAR_PROCESS_INSTANCE_VARIABLE_TYPE);
        return functions.when(variableType.equalTo("string"), dataset.col(SparkImporterVariables.VAR_TEXT))
                .when(variableType.equalTo("null"), dataset.col(SparkImporterVariables.VAR_TEXT))
                .when(variableType.equalTo("boolean"), dataset.col(SparkImporterVariables.VAR_LONG))
                .when(variableType.equalTo("integer"), dataset.col(SparkImporterVariables.VAR_LONG))
                .when(variableType.equalTo("long"), dataset.col(SparkImporterVariables.VAR_LONG))
                .when(variableType.equalTo("double"), dataset.col(SparkImporterVariables.VAR_DOUBLE))
                .when(variableType.equalTo("date"), dataset.col(SparkImporterVariables.VAR_LONG))
                .otherwise(dataset.col(SparkImporterVariables.VAR_TEXT2));
    }
}
