package com.linkedin.feathr.offline.config.location;

import com.linkedin.feathr.common.Header;
import com.linkedin.feathr.common.exception.FeathrException;
import com.linkedin.feathr.offline.generation.FeatureGenUtils$;
import com.linkedin.feathr.offline.join.DataFrameKeyCombiner;
import com.linkedin.feathr.offline.join.DataFrameKeyCombiner$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.mutable.ArrayOps;
import scala.runtime.BoxedUnit;

/* compiled from: GenericLocation.scala */
/* loaded from: input_file:com/linkedin/feathr/offline/config/location/GenericLocationAdHocPatches$.class */
public final class GenericLocationAdHocPatches$ {
    public static GenericLocationAdHocPatches$ MODULE$;

    static {
        new GenericLocationAdHocPatches$();
    }

    public Dataset<Row> readDf(SparkSession sparkSession, GenericLocation genericLocation) {
        genericLocation.conf().foreach(tuple2 -> {
            $anonfun$readDf$1(sparkSession, tuple2);
            return BoxedUnit.UNIT;
        });
        return "org.elasticsearch.spark.sql".equals(genericLocation.format().toLowerCase()) ? sparkSession.read().format(genericLocation.format()).option("es.nodes.wan.only", "true").option("pushdown", true).options(genericLocation.options()).load() : sparkSession.read().format(genericLocation.format()).options(genericLocation.options()).load();
    }

    public void writeDf(SparkSession sparkSession, Dataset<Row> dataset, Option<Header> option, GenericLocation genericLocation) {
        Dataset withColumn;
        Dataset<Row> dataset2;
        Dataset<Row> withColumn2;
        genericLocation.conf().foreach(tuple2 -> {
            $anonfun$writeDf$1(sparkSession, tuple2);
            return BoxedUnit.UNIT;
        });
        String lowerCase = genericLocation.format().toLowerCase();
        if (!"cosmos.oltp".equals(lowerCase)) {
            if (!"org.elasticsearch.spark.sql".equals(lowerCase)) {
                if ("aerospike".equals(lowerCase)) {
                    (!new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(dataset.columns())).contains("__key") ? dataset.withColumn("__key", functions$.MODULE$.monotonically_increasing_id().cast("string")) : dataset).write().format(genericLocation.format()).option("aerospike.updatebykey", "__key").options(genericLocation.options()).mode((String) genericLocation.mode().getOrElse(() -> {
                        return "append";
                    })).save();
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    return;
                } else {
                    dataset.write().format(genericLocation.format()).options(genericLocation.options()).mode((String) genericLocation.mode().getOrElse(() -> {
                        return "default";
                    })).save();
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    return;
                }
            }
            if (option instanceof Some) {
                Header header = (Header) ((Some) option).value();
                DataFrameKeyCombiner apply = DataFrameKeyCombiner$.MODULE$.apply();
                Tuple2<String, Dataset<Row>> combine = apply.combine(dataset, FeatureGenUtils$.MODULE$.getKeyColumnsFromHeader(header), apply.combine$default$3());
                if (combine == null) {
                    throw new MatchError(combine);
                }
                Tuple2 tuple22 = new Tuple2((String) combine._1(), (Dataset) combine._2());
                withColumn = ((Dataset) tuple22._2()).withColumnRenamed((String) tuple22._1(), "_id");
            } else {
                if (!None$.MODULE$.equals(option)) {
                    throw new MatchError(option);
                }
                withColumn = dataset.withColumn("_id", functions$.MODULE$.monotonically_increasing_id().cast("string"));
            }
            withColumn.write().format(genericLocation.format()).option("es.nodes.wan.only", "true").option("es.mapping.id", "_id").option("es.mapping.exclude", "_id").option("es.write.operation", "upsert").options(genericLocation.options()).mode((String) genericLocation.mode().getOrElse(() -> {
                return "overwrite";
            })).save();
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            return;
        }
        String str = (String) genericLocation.options().getOrElse("spark.cosmos.accountEndpoint", () -> {
            throw new FeathrException("Missing spark__cosmos__accountEndpoint");
        });
        String str2 = (String) genericLocation.options().getOrElse("spark.cosmos.accountKey", () -> {
            throw new FeathrException("Missing spark__cosmos__accountKey");
        });
        String str3 = (String) genericLocation.options().getOrElse("spark.cosmos.database", () -> {
            throw new FeathrException("Missing spark__cosmos__database");
        });
        String str4 = (String) genericLocation.options().getOrElse("spark.cosmos.container", () -> {
            throw new FeathrException("Missing spark__cosmos__container");
        });
        sparkSession.conf().set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog");
        sparkSession.conf().set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", str);
        sparkSession.conf().set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", str2);
        sparkSession.sql(new StringBuilder(45).append("CREATE DATABASE IF NOT EXISTS cosmosCatalog.").append(str3).append(";").toString());
        sparkSession.sql(new StringBuilder(100).append("CREATE TABLE IF NOT EXISTS cosmosCatalog.").append(str3).append(".").append(str4).append(" using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id')").toString());
        if (new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(dataset.columns())).contains("id")) {
            dataset2 = dataset;
        } else {
            if (option instanceof Some) {
                Header header2 = (Header) ((Some) option).value();
                DataFrameKeyCombiner apply2 = DataFrameKeyCombiner$.MODULE$.apply();
                Tuple2<String, Dataset<Row>> combine2 = apply2.combine(dataset, FeatureGenUtils$.MODULE$.getKeyColumnsFromHeader(header2), apply2.combine$default$3());
                if (combine2 == null) {
                    throw new MatchError(combine2);
                }
                Tuple2 tuple23 = new Tuple2((String) combine2._1(), (Dataset) combine2._2());
                withColumn2 = ((Dataset) tuple23._2()).withColumnRenamed((String) tuple23._1(), "id");
            } else {
                if (!None$.MODULE$.equals(option)) {
                    throw new MatchError(option);
                }
                withColumn2 = dataset.withColumn("id", functions$.MODULE$.monotonically_increasing_id().cast("string"));
            }
            dataset2 = withColumn2;
        }
        dataset2.write().format(genericLocation.format()).options(genericLocation.options()).mode((String) genericLocation.mode().getOrElse(() -> {
            return "append";
        })).save();
        BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
    }

    public static final /* synthetic */ void $anonfun$readDf$1(SparkSession sparkSession, Tuple2 tuple2) {
        sparkSession.conf().set((String) tuple2._1(), (String) tuple2._2());
    }

    public static final /* synthetic */ void $anonfun$writeDf$1(SparkSession sparkSession, Tuple2 tuple2) {
        sparkSession.conf().set((String) tuple2._1(), (String) tuple2._2());
    }

    private GenericLocationAdHocPatches$() {
        MODULE$ = this;
    }
}
