package com.linkedin.feathr.offline.job;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.scala.DefaultScalaModule$;
import com.linkedin.feathr.common.FeatureValue;
import com.linkedin.feathr.common.TaggedFeatureName;
import com.linkedin.feathr.common.configObj.configbuilder.FeatureGenConfigBuilder;
import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource;
import com.linkedin.feathr.offline.client.FeathrClient$;
import com.linkedin.feathr.offline.config.FeathrConfigLoader$;
import com.linkedin.feathr.offline.config.datasource.DataSourceConfigUtils$;
import com.linkedin.feathr.offline.config.datasource.DataSourceConfigs;
import com.linkedin.feathr.offline.source.accessor.DataPathHandler;
import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor;
import com.linkedin.feathr.offline.transformation.AnchorToDataSourceMapper;
import com.linkedin.feathr.offline.util.CmdLineParser;
import com.linkedin.feathr.offline.util.FeathrUtils$;
import com.linkedin.feathr.offline.util.OptionParam;
import com.linkedin.feathr.offline.util.SparkFeaturizedDataset;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigRenderOptions;
import java.util.Set;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.cli.Option;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple4;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.HashMap;
import scala.math.Ordering$String$;
import scala.runtime.BoxesRunTime;
import scala.util.Properties$;

/* compiled from: FeatureGenJob.scala */
/* loaded from: input_file:com/linkedin/feathr/offline/job/FeatureGenJob$.class */
public final class FeatureGenJob$ {
    /** Singleton instance; assigned by the private constructor, which the static initializer below invokes. */
    public static FeatureGenJob$ MODULE$;
    /** Log4j logger, obtained in the constructor via {@code Logger.getLogger(getClass())}. */
    private final Logger logger;

    static {
        // The constructor stores the new instance in MODULE$, so the result is not captured here.
        new FeatureGenJob$();
    }

    /**
     * Returns the logger used by this job object.
     *
     * @return the Log4j logger created in the constructor
     */
    public Logger logger() {
        return logger;
    }

    /**
     * Parses the command-line arguments of the feature generation job.
     *
     * <p>Side effects: every entry of the optional {@code system-properties} JSON object (default
     * {@code "{}"}) is applied through {@code scala.util.Properties.setProp}, and the resolved data
     * source configs are printed to standard output.
     *
     * @param strArr raw command-line arguments
     * @return a 4-tuple of (value of {@code generation-config}, feature definition inputs, job
     *         context, data source configs)
     */
    public Tuple4<String, FeatureDefinitionsInput, FeatureGenJobContext, DataSourceConfigs> parseInputArguments(String[] strArr) {
        // Long option name -> option parameters, in declaration order.
        // NOTE(review): "params-override" and "redis-config" both declare the short name "ac";
        // confirm how CmdLineParser resolves the clash before relying on the short form.
        Tuple2[] optionEntries = new Tuple2[]{
            new Tuple2<>("feathr-config", new OptionParam("lf", "Path of the feathr local config file", "FCONF", FeatureValue.EMPTY_TERM)),
            new Tuple2<>("feature-config", new OptionParam("f", "Names of the feathr feature config files", "EFCONF", FeatureValue.EMPTY_TERM)),
            new Tuple2<>("local-override-all", new OptionParam("loa", "Local config overrides all other configs", "LOCAL_OVERRIDE", "true")),
            new Tuple2<>("work-dir", new OptionParam("wd", "work directory, used to store temporary results, etc.", "WORK_DIR", FeatureValue.EMPTY_TERM)),
            new Tuple2<>("generation-config", new OptionParam("gc", "Path of the feature generation config file", "JCONF", null)),
            new Tuple2<>("params-override", new OptionParam("ac", "parameter to override in feature generation config", "PARAM_OVERRIDE", "[]")),
            new Tuple2<>("feature-conf-override", new OptionParam("fco", "parameter to override in feature definition config", "FEATURE_CONF_OVERRIDE", "[]")),
            new Tuple2<>("redis-config", new OptionParam("ac", "Authentication config for Redis", "REDIS_CONFIG", FeatureValue.EMPTY_TERM)),
            new Tuple2<>("s3-config", new OptionParam("sc", "Authentication config for S3", "S3_CONFIG", FeatureValue.EMPTY_TERM)),
            new Tuple2<>("adls-config", new OptionParam("adlc", "Authentication config for ADLS (abfs)", "ADLS_CONFIG", FeatureValue.EMPTY_TERM)),
            new Tuple2<>("blob-config", new OptionParam("bc", "Authentication config for Azure Blob Storage (wasb)", "BLOB_CONFIG", FeatureValue.EMPTY_TERM)),
            new Tuple2<>("sql-config", new OptionParam("sqlc", "Authentication config for Azure SQL Database (jdbc)", "SQL_CONFIG", FeatureValue.EMPTY_TERM)),
            new Tuple2<>("snowflake-config", new OptionParam("sfc", "Authentication config for Snowflake Database (jdbc)", "SNOWFLAKE_CONFIG", FeatureValue.EMPTY_TERM)),
            new Tuple2<>("monitoring-config", new OptionParam("mc", "Feature monitoring related configs", "MONITORING_CONFIG", FeatureValue.EMPTY_TERM)),
            new Tuple2<>("kafka-config", new OptionParam("kc", "Authentication config for Kafka", "KAFKA_CONFIG", FeatureValue.EMPTY_TERM)),
            new Tuple2<>("system-properties", new OptionParam("sps", "Additional System Properties", "SYSTEM_PROPERTIES_CONFIG", FeatureValue.EMPTY_TERM))
        };
        CmdLineParser parser = new CmdLineParser(
            strArr,
            Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(optionEntries)),
            new $colon.colon(new Option("LOCALMODE", "local-mode", false, "Run in local mode"), Nil$.MODULE$));

        // Apply the extra system properties before any other option is read.
        String systemPropertiesJson = (String) parser.extractOptionalValue("system-properties").getOrElse(() -> "{}");
        HashMap systemProperties = (HashMap) new ObjectMapper().registerModule(DefaultScalaModule$.MODULE$).readValue(systemPropertiesJson, HashMap.class);
        systemProperties.foreach(entry -> Properties$.MODULE$.setProp((String) entry._1(), (String) entry._2()));

        String generationConfigPath = parser.extractRequiredValue("generation-config");
        FeatureDefinitionsInput featureDefinitions = new FeatureDefinitionsInput(
            parser.extractOptionalValue("feathr-config"),
            parser.extractOptionalValue("feature-config"),
            parser.extractRequiredValue("local-override-all"));
        scala.Option<String> paramsOverride = parser.extractOptionalValue("params-override");
        scala.Option featureConfOverride = parser.extractOptionalValue("feature-conf-override").map(raw -> MODULE$.convertToHoconConfig(raw));
        String workDir = parser.extractRequiredValue("work-dir");
        DataSourceConfigs dataSourceConfigs = DataSourceConfigUtils$.MODULE$.getConfigs(parser);
        FeatureGenJobContext jobContext = new FeatureGenJobContext(workDir, paramsOverride, featureConfOverride);

        Predef$.MODULE$.println("dataSourceConfigs: ");
        Predef$.MODULE$.println(dataSourceConfigs);
        return new Tuple4<>(generationConfigPath, featureDefinitions, jobContext, dataSourceConfigs);
    }

    /**
     * Strips one leading {@code "["} and one trailing {@code "]"} from an override string, if
     * present, so the remainder can be parsed as HOCON.
     *
     * <p>Uses {@link String} directly rather than Scala's {@code StringOps} wrappers; the result is
     * the same as {@code stripPrefix("[")} followed by {@code stripSuffix("]")}.
     *
     * @param str the override string, e.g. {@code "[a: 1]"}; must not be {@code null}
     * @return {@code str} without the surrounding brackets; unchanged where a bracket is absent
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public String convertToHoconConfig(String str) {
        String withoutPrefix = str.startsWith("[") ? str.substring(1) : str;
        // The suffix check runs on the already-shortened string, so "[" alone yields "".
        return withoutPrefix.endsWith("]")
            ? withoutPrefix.substring(0, withoutPrefix.length() - 1)
            : withoutPrefix;
    }

    /**
     * Runs feature generation from file paths: reads the generation config and both feature
     * definition files through {@code FeatureJoinJob.hdfsFileReader}, applies the feature config
     * override, and delegates to the six-argument {@code run} with its default data path handlers.
     *
     * @param sparkSession Spark session used to read the files and run the job
     * @param str path of the feature generation config
     * @param featureDefinitionsInput optional paths of the feature definition files
     * @param featureGenJobContext job context carrying the override settings
     * @return the generated features as returned by the six-argument {@code run}
     */
    public Map<TaggedFeatureName, SparkFeaturizedDataset> run(SparkSession sparkSession, String str, FeatureDefinitionsInput featureDefinitionsInput, FeatureGenJobContext featureGenJobContext) {
        String generationConfig = FeatureJoinJob$.MODULE$.hdfsFileReader(sparkSession, str);
        scala.Option<String> featureDefConfig = featureDefinitionsInput.feathrFeatureDefPaths()
            .map(path -> FeatureJoinJob$.MODULE$.hdfsFileReader(sparkSession, path));
        scala.Option<String> localFeatureConfig = featureDefinitionsInput.feathrLocalFeatureDefPath()
            .map(path -> FeatureJoinJob$.MODULE$.hdfsFileReader(sparkSession, path));

        Tuple2<scala.Option<String>, scala.Option<String>> resolved = overrideFeatureDefs(featureDefConfig, localFeatureConfig, featureGenJobContext);
        if (resolved == null) {
            throw new MatchError(resolved);
        }
        return run(sparkSession, generationConfig, resolved._1(), resolved._2(), featureGenJobContext, run$default$6());
    }

    /**
     * Applies the job context's feature config override to each feature definition that is present.
     *
     * <p>A definition is passed through {@code FeathrConfigLoader.resolveOverride} only when both it
     * and {@code featureConfOverride} are defined; otherwise it is returned unchanged.
     *
     * @param option first feature definition config content, if any
     * @param option2 second feature definition config content, if any
     * @param featureGenJobContext job context supplying {@code featureConfOverride}
     * @return the two definitions, in the same order, with the override applied where possible
     */
    public Tuple2<scala.Option<String>, scala.Option<String>> overrideFeatureDefs(scala.Option<String> option, scala.Option<String> option2, FeatureGenJobContext featureGenJobContext) {
        scala.Option<String> first = option;
        if (option.isDefined() && featureGenJobContext.featureConfOverride().isDefined()) {
            first = new Some<>(FeathrConfigLoader$.MODULE$.resolveOverride((String) option.get(), (String) featureGenJobContext.featureConfOverride().get()));
        }

        scala.Option<String> second = option2;
        if (option2.isDefined() && featureGenJobContext.featureConfOverride().isDefined()) {
            second = new Some<>(FeathrConfigLoader$.MODULE$.resolveOverride((String) option2.get(), (String) featureGenJobContext.featureConfOverride().get()));
        }

        return new Tuple2<>(first, second);
    }

    /**
     * Runs feature generation from already-loaded config contents.
     *
     * <p>Logs both feature definition configs, builds a Feathr client from them and the data path
     * handlers, and generates the features described by the parsed generation config.
     *
     * @param sparkSession Spark session the client is built on
     * @param str content of the feature generation config
     * @param option feature definition config content, if any
     * @param option2 local override feature definition config content, if any
     * @param featureGenJobContext job context carrying the parameter override
     * @param list data path handlers passed to the client and to the generation spec
     * @return the result of {@code generateFeatures}
     */
    public Map<TaggedFeatureName, SparkFeaturizedDataset> run(SparkSession sparkSession, String str, scala.Option<String> option, scala.Option<String> option2, FeatureGenJobContext featureGenJobContext, List<DataPathHandler> list) {
        logger().info("featureDefConfig : " + option);
        logger().info("localFeatureConfig : " + option2);
        // The client is built before the generation config is parsed, as in the original chain.
        return FeathrClient$.MODULE$.builder(sparkSession)
            .addFeatureDef(option)
            .addLocalOverrideDef(option2)
            .addDataPathHandlers(list)
            .build()
            .generateFeatures(parseFeatureGenApplicationConfig(str, featureGenJobContext, list));
    }

    /**
     * Default value for the sixth parameter (data path handlers) of the six-argument {@code run}.
     *
     * @return the empty Scala list
     */
    public List<DataPathHandler> run$default$6() {
        return Nil$.MODULE$;
    }

    /**
     * Builds the feature generation spec from the generation config content.
     *
     * <p>The content is first merged with the job context's parameter override, then parsed as
     * HOCON and handed to {@code FeatureGenConfigBuilder}; each data path handler contributes its
     * {@code dataLoaderHandler()}.
     *
     * @param str content of the feature generation config
     * @param featureGenJobContext job context supplying {@code paramsOverride}
     * @param list data path handlers whose data loader handlers go into the spec
     * @return the feature generation spec
     */
    public FeatureGenSpec parseFeatureGenApplicationConfig(String str, FeatureGenJobContext featureGenJobContext, List<DataPathHandler> list) {
        String resolvedConfig = overrideFeatureGeneration(str, featureGenJobContext.paramsOverride());
        Config parsedConfig = ConfigFactory.parseString(resolvedConfig);
        return new FeatureGenSpec(
            FeatureGenConfigBuilder.build(parsedConfig),
            (List) list.map(handler -> handler.dataLoaderHandler(), List$.MODULE$.canBuildFrom()));
    }

    /**
     * Default value for the third parameter (data path handlers) of
     * {@code parseFeatureGenApplicationConfig}.
     *
     * @return the empty Scala list
     */
    public List<DataPathHandler> parseFeatureGenApplicationConfig$default$3() {
        return Nil$.MODULE$;
    }

    /**
     * Merges an optional parameter override into a feature generation config and renders the result.
     *
     * <p>When an override is given, its surrounding brackets are stripped and it is parsed under an
     * {@code operational} object. Each entry of the base config's {@code operational.output} list is
     * then combined with the override found at path {@code operational.output(<index>)}, if any,
     * with the override taking precedence. The rebuilt list and the base config serve as fallbacks
     * for the override. Without an override the base config is rendered as is.
     *
     * @param str content of the feature generation config
     * @param option optional override string, possibly wrapped in {@code [ ]}
     * @return the rendered merged config
     */
    public String overrideFeatureGeneration(String str, scala.Option<String> option) {
        Config baseConfig = ConfigFactory.parseString(str);
        scala.Option<Config> mergedConfig = option.map(rawOverride -> {
            Config overrideConfig = ConfigFactory.parseString("operational: {" + convertToHoconConfig(rawOverride) + "}");

            // Pair every base output entry with its index so per-index overrides can be looked up.
            Object indexedOutputs = ((IterableLike) JavaConverters$.MODULE$.asScalaBufferConverter(baseConfig.getConfigList("operational.output")).asScala())
                .zipWithIndex(Buffer$.MODULE$.canBuildFrom());
            Object renderedOutputs = ((TraversableLike) indexedOutputs).map(entry -> {
                if (entry == null) {
                    throw new MatchError(entry);
                }
                Tuple2 indexed = (Tuple2) entry;
                Config baseOutput = (Config) indexed._1();
                String outputPath = "operational.output(" + indexed._2$mcI$sp() + ")";
                Config effectiveOutput = baseOutput;
                if (overrideConfig.hasPath(outputPath)) {
                    effectiveOutput = overrideConfig.getConfig(outputPath).withFallback(baseOutput);
                }
                return effectiveOutput.root().render(ConfigRenderOptions.concise());
            }, Buffer$.MODULE$.canBuildFrom());

            String outputList = "operational.output:[" + ((TraversableOnce) renderedOutputs).mkString(",") + "]";
            return overrideConfig.withFallback(ConfigFactory.parseString(outputList)).withFallback(baseConfig);
        });
        return ((Config) mergedConfig.getOrElse(() -> baseConfig)).root().render();
    }

    /**
     * Loads the source data of the requested anchors.
     *
     * <p>Prepares a Spark session from the arguments, reads both feature definition files, applies
     * the feature config override to each, and builds a Feathr client from the overridden
     * definitions. Each anchored feature's data source is then resolved, and only anchors whose key
     * is contained in {@code set} are kept. An anchor's key is its feature names, sorted and joined
     * with {@code ","}.
     *
     * <p>The overridden feature definition is now passed to {@code addFeatureDef}, matching what
     * {@code run} does; previously the override was computed but only the local definition used it.
     *
     * @param strArr raw command-line arguments, parsed by {@code prepareSparkSession}
     * @param set anchor keys to load
     * @return a Java map from anchor key to the value returned by that anchor's
     *         {@code DataSourceAccessor.get()}
     */
    public java.util.Map<String, Dataset<Row>> loadSourceDataframe(String[] strArr, Set<String> set) {
        // NOTE(review): this logs the array's identity string, not its contents. Left as is because
        // the arguments may include the authentication config options — confirm before expanding.
        logger().info("FeatureJoinJob args are: " + strArr);
        logger().info("Feature join job: loadDataframe");
        logger().info(set);

        FeathrGenPreparationInfo prepared = prepareSparkSession(strArr);
        SparkSession sparkSession = prepared.sparkSession();
        FeatureDefinitionsInput featureDefs = prepared.featureDefs();
        FeatureGenJobContext jobContext = prepared.jobContext();

        scala.Option<String> featureConfig = featureDefs.feathrFeatureDefPaths()
            .map(path -> FeatureJoinJob$.MODULE$.hdfsFileReader(sparkSession, path));
        scala.Option<String> localFeatureConfig = featureDefs.feathrLocalFeatureDefPath()
            .map(path -> FeatureJoinJob$.MODULE$.hdfsFileReader(sparkSession, path));
        Tuple2<scala.Option<String>, scala.Option<String>> overridden = overrideFeatureDefs(featureConfig, localFeatureConfig, jobContext);
        if (overridden == null) {
            throw new MatchError(overridden);
        }

        // Both definitions come from the override result so the two stay consistent.
        Map<String, FeatureAnchorWithSource> allAnchoredFeatures = FeathrClient$.MODULE$.builder(sparkSession)
            .addFeatureDef(overridden._1())
            .addLocalOverrideDef(overridden._2())
            .build()
            .allAnchoredFeatures();

        AnchorToDataSourceMapper mapper = new AnchorToDataSourceMapper(Nil$.MODULE$);
        TraversableLike requestedAnchors = (TraversableLike) mapper.getBasicAnchorDFMapForJoin(
                sparkSession,
                allAnchoredFeatures.values().toSeq(),
                new StringOps(Predef$.MODULE$.augmentString(FeathrUtils$.MODULE$.getFeathrJobParam(sparkSession, FeathrUtils$.MODULE$.FAIL_ON_MISSING_PARTITION()))).toBoolean())
            .filter(entry -> BoxesRunTime.boxToBoolean($anonfun$loadSourceDataframe$3(set, entry)));

        // Re-key by the same sorted, comma-joined feature names used by the filter above.
        Map keyedDataFrames = (Map) requestedAnchors.map(entry -> {
            Tuple2 anchorAndAccessor = (Tuple2) entry;
            String anchorKey = ((TraversableOnce) ((FeatureAnchorWithSource) anchorAndAccessor._1()).featureAnchor().features().toSeq().sorted(Ordering$String$.MODULE$)).mkString(",");
            return new Tuple2(anchorKey, ((DataSourceAccessor) anchorAndAccessor._2()).get());
        }, Map$.MODULE$.canBuildFrom());

        return (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(keyedDataFrames).asJava();
    }

    /**
     * Parses the job arguments and creates (or reuses) the Spark session for the job.
     *
     * <p>The Spark conf registers Avro's {@code GenericRecord} with Kryo and is passed to
     * {@code DataSourceConfigUtils.setupSparkConf} together with the data source configs. The
     * session is named after this class and has Hive support enabled; once it exists,
     * {@code DataSourceConfigUtils.setupHadoopConf} is called on it.
     *
     * @param strArr raw command-line arguments
     * @return the session together with the generation config path, feature definition inputs and
     *         job context parsed from the arguments
     */
    public FeathrGenPreparationInfo prepareSparkSession(String[] strArr) {
        Tuple4<String, FeatureDefinitionsInput, FeatureGenJobContext, DataSourceConfigs> parsedArgs = parseInputArguments(strArr);
        if (parsedArgs == null) {
            throw new MatchError(parsedArgs);
        }
        String generationConfigPath = parsedArgs._1();
        FeatureDefinitionsInput featureDefinitions = parsedArgs._2();
        FeatureGenJobContext jobContext = parsedArgs._3();
        DataSourceConfigs dataSourceConfigs = parsedArgs._4();

        SparkConf sparkConf = new SparkConf().registerKryoClasses(new Class[]{GenericRecord.class});
        DataSourceConfigUtils$.MODULE$.setupSparkConf(sparkConf, dataSourceConfigs);

        SparkSession session = SparkSession$.MODULE$.builder()
            .config(sparkConf)
            .appName(getClass().getName())
            .enableHiveSupport()
            .getOrCreate();
        DataSourceConfigUtils$.MODULE$.setupHadoopConf(session, dataSourceConfigs);

        return new FeathrGenPreparationInfo(session, generationConfigPath, featureDefinitions, jobContext);
    }

    /**
     * Prepares the Spark session from the arguments and runs feature generation with it.
     *
     * @param strArr raw command-line arguments
     * @return the generated features as returned by {@code run}
     */
    public Map<TaggedFeatureName, SparkFeaturizedDataset> process(String[] strArr) {
        FeathrGenPreparationInfo prepared = prepareSparkSession(strArr);
        SparkSession session = prepared.sparkSession();
        String generationConfigPath = prepared.applicationConfigPath();
        FeatureDefinitionsInput featureDefinitions = prepared.featureDefs();
        FeatureGenJobContext jobContext = prepared.jobContext();
        return run(session, generationConfigPath, featureDefinitions, jobContext);
    }

    /**
     * Entry point for callers that supply preprocessed data frames: stores them in
     * {@code PreprocessedDataFrameManager} as an immutable Scala map, then runs {@code main}.
     *
     * @param strArr raw command-line arguments
     * @param map preprocessed data frames keyed by string
     */
    public void mainWithPreprocessedDataFrame(String[] strArr, java.util.Map<String, Dataset<Row>> map) {
        Map<String, Dataset<Row>> preprocessed =
            ((TraversableOnce) JavaConverters$.MODULE$.mapAsScalaMapConverter(map).asScala()).toMap(Predef$.MODULE$.$conforms());
        PreprocessedDataFrameManager$.MODULE$.preprocessedDfMap_$eq(preprocessed);
        main(strArr);
    }

    /**
     * Job entry point: runs {@code process} on the arguments and discards its result.
     *
     * @param strArr raw command-line arguments
     */
    public void main(String[] strArr) {
        this.process(strArr);
    }

    /**
     * Filter predicate used by {@code loadSourceDataframe}: tests whether an anchor's key, built
     * from its feature names sorted and joined with {@code ","}, is contained in {@code set}.
     *
     * @param set requested anchor keys
     * @param tuple2 pair whose first element is a {@code FeatureAnchorWithSource}
     * @return {@code true} if the anchor's key is in {@code set}
     */
    public static final /* synthetic */ boolean $anonfun$loadSourceDataframe$3(Set set, Tuple2 tuple2) {
        FeatureAnchorWithSource anchor = (FeatureAnchorWithSource) tuple2._1();
        String anchorKey = ((TraversableOnce) anchor.featureAnchor().features().toSeq().sorted(Ordering$String$.MODULE$)).mkString(",");
        return set.contains(anchorKey);
    }

    /** Private constructor, called once from the static initializer. */
    private FeatureGenJob$() {
        // Publish the singleton first, then create the logger for this class.
        MODULE$ = this;
        this.logger = Logger.getLogger(getClass());
    }
}
