package com.linkedin.feathr.offline.evaluator.datasource;

import com.linkedin.feathr.common.AnchorExtractor;
import com.linkedin.feathr.common.DateTimeResolution$;
import com.linkedin.feathr.common.configObj.configbuilder.ConfigUtils;
import com.linkedin.feathr.common.exception.ErrorLabel;
import com.linkedin.feathr.common.exception.FeathrConfigException;
import com.linkedin.feathr.compute.AnyNode;
import com.linkedin.feathr.compute.DataSource;
import com.linkedin.feathr.compute.DataSourceType;
import com.linkedin.feathr.compute.KeyExpressionType;
import com.linkedin.feathr.core.config.producer.common.KeyListExtractor;
import com.linkedin.feathr.offline.client.plugins.AnchorExtractorAdaptor;
import com.linkedin.feathr.offline.client.plugins.FeathrUdfPluginContext$;
import com.linkedin.feathr.offline.client.plugins.SourceKeyExtractorAdaptor;
import com.linkedin.feathr.offline.client.plugins.UdfAdaptor;
import com.linkedin.feathr.offline.config.ConfigLoaderUtils$;
import com.linkedin.feathr.offline.config.JoinConfigSettings;
import com.linkedin.feathr.offline.config.JoinTimeSetting;
import com.linkedin.feathr.offline.evaluator.NodeEvaluator;
import com.linkedin.feathr.offline.graph.DataframeAndColumnMetadata;
import com.linkedin.feathr.offline.graph.DataframeAndColumnMetadata$;
import com.linkedin.feathr.offline.graph.FCMGraphTraverser;
import com.linkedin.feathr.offline.source.DataSource$;
import com.linkedin.feathr.offline.source.SourceFormatType$;
import com.linkedin.feathr.offline.source.TimeWindowParams;
import com.linkedin.feathr.offline.source.accessor.DataPathHandler;
import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor$;
import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler;
import com.linkedin.feathr.offline.source.pathutil.PathChecker$;
import com.linkedin.feathr.offline.source.pathutil.TimeBasedHdfsPathAnalyzer;
import com.linkedin.feathr.offline.swa.SlidingWindowFeatureUtils$;
import com.linkedin.feathr.offline.util.datetime.DateTimeInterval;
import com.linkedin.feathr.offline.util.datetime.OfflineDateTimeUtils$;
import com.linkedin.feathr.sparkcommon.SourceKeyExtractor;
import java.time.Duration;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Enumeration;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.HashMap$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: DataSourceNodeEvaluator.scala */
/* loaded from: input_file:com/linkedin/feathr/offline/evaluator/datasource/DataSourceNodeEvaluator$.class */
/**
 * Singleton {@link NodeEvaluator} for data-source nodes of the compute graph.
 *
 * <p>For each data-source node it builds a {@link DataframeAndColumnMetadata} and stores it in the
 * traverser's {@code nodeIdToDataframeAndColumnMetadataMap}, keyed by the node id. Three source
 * types are handled: {@code CONTEXT}, {@code UPDATE} (fixed-path table) and {@code EVENT}
 * (time-series path). The context dataframe passed in is always returned unchanged.
 *
 * <p>This class is the Java view of a Scala {@code object}; the single instance is published
 * through {@link #MODULE$} by the private constructor.
 */
public final class DataSourceNodeEvaluator$ implements NodeEvaluator {
    /** The singleton instance; assigned once by the private constructor during class initialisation. */
    public static DataSourceNodeEvaluator$ MODULE$;
    private final Logger log;

    static {
        // The constructor publishes the new instance through MODULE$.
        new DataSourceNodeEvaluator$();
    }

    /** Returns the logger of this evaluator. */
    public Logger log() {
        return this.log;
    }

    /**
     * Wraps the context dataframe for a CONTEXT node that has no concrete key.
     *
     * @param dataset    the context dataframe
     * @param dataSource the data-source node; its external source ref is used as the only key column
     * @return metadata holding {@code dataset} and a one-element key list
     */
    private DataframeAndColumnMetadata processContextNode(Dataset<Row> dataset, DataSource dataSource) {
        return new DataframeAndColumnMetadata(dataset, new $colon.colon(dataSource.getExternalSourceRef(), Nil$.MODULE$), DataframeAndColumnMetadata$.MODULE$.apply$default$3(), DataframeAndColumnMetadata$.MODULE$.apply$default$4(), DataframeAndColumnMetadata$.MODULE$.apply$default$5());
    }

    /**
     * Loads an EVENT (time-series) source and resolves its key columns and timestamp expression.
     *
     * <p>Keys are either plain expressions (each wrapped as {@code CAST (expr AS string)}) or are
     * produced by a UDF class named in the key expression. The UDF must be a
     * {@link SourceKeyExtractor} or have a registered {@link SourceKeyExtractorAdaptor}.
     *
     * @param sparkSession the Spark session used to load the source
     * @param dataSource   the data-source node; must have a non-empty concrete key
     * @param option       the time interval of fact data to load
     * @param list         data path handlers forwarded to the source accessor
     * @return metadata with the (possibly key-augmented) dataframe, the key columns, the source and
     *         the timestamp expression
     * @throws UnsupportedOperationException if the UDF class is neither a {@link SourceKeyExtractor}
     *                                       nor adaptable to one
     */
    private DataframeAndColumnMetadata processEventNode(SparkSession sparkSession, DataSource dataSource, Option<DateTimeInterval> option, List<DataPathHandler> list) {
        Predef$.MODULE$.assert(dataSource.hasConcreteKey());
        Predef$.MODULE$.assert(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(dataSource.getConcreteKey().getKey()).asScala()).nonEmpty());
        com.linkedin.feathr.offline.source.DataSource apply = DataSource$.MODULE$.apply(dataSource.getExternalSourceRef(), SourceFormatType$.MODULE$.TIME_SERIES_PATH(), (Option<TimeWindowParams>) (dataSource.hasTimestampColumnInfo() ? new Some(new TimeWindowParams(dataSource.getTimestampColumnInfo().getExpression(), dataSource.getTimestampColumnInfo().getFormat())) : None$.MODULE$), (Option<String>) (dataSource.hasFilePartitionFormat() ? new Some(dataSource.getFilePartitionFormat()) : None$.MODULE$));

        // Without explicit timestamp info, fall back to the partition timestamp column in epoch format.
        TimeWindowParams timeWindowParams = dataSource.hasTimestampColumnInfo()
            ? new TimeWindowParams(dataSource.getTimestampColumnInfo().getExpression(), dataSource.getTimestampColumnInfo().getFormat())
            : new TimeWindowParams(SlidingWindowFeatureUtils$.MODULE$.TIMESTAMP_PARTITION_COLUMN(), ConfigUtils.TIMESTAMP_FORMAT_EPOCH);
        String timestampExpr = SlidingWindowFeatureUtils$.MODULE$.constructTimeStampExpr(timeWindowParams.timestampColumn(), timeWindowParams.timestampColumnFormat(), SlidingWindowFeatureUtils$.MODULE$.constructTimeStampExpr$default$3());

        Dataset<Row> dataset = DataSourceAccessor$.MODULE$.apply(sparkSession, apply, option, None$.MODULE$, false, !dataSource.hasTimestampColumnInfo(), DataSourceAccessor$.MODULE$.apply$default$7(), list).get();

        Dataset<Row> keyedDataset;
        Seq<String> keyColumns;
        if (KeyExpressionType.UDF.equals(dataSource.getKeyExpressionType())) {
            Class<?> cls = Class.forName(dataSource.getKeyExpression());
            Object newInstance = cls.newInstance();
            SourceKeyExtractor extractor;
            if (newInstance instanceof SourceKeyExtractor) {
                extractor = (SourceKeyExtractor) newInstance;
            } else {
                Option<UdfAdaptor<?>> registeredUdfAdaptor = FeathrUdfPluginContext$.MODULE$.getRegisteredUdfAdaptor(cls);
                UdfAdaptor<?> udfAdaptor = registeredUdfAdaptor.isDefined() ? registeredUdfAdaptor.get() : null;
                // Only reject the class when no suitable adaptor exists; previously the adapted
                // extractor was built and then the "unknown extractor" error was thrown anyway.
                if (!(udfAdaptor instanceof SourceKeyExtractorAdaptor)) {
                    throw new UnsupportedOperationException("Unknown extractor type: " + cls);
                }
                extractor = ((SourceKeyExtractorAdaptor) udfAdaptor).adaptUdf(cls.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]));
            }
            keyedDataset = extractor.appendKeyColumns(dataset);
            keyColumns = extractor.getKeyColumnNames(extractor.getKeyColumnNames$default$1());
        } else {
            keyedDataset = dataset;
            keyColumns = (Seq) ConfigLoaderUtils$.MODULE$.javaListToSeqWithDeepCopy(KeyListExtractor.getInstance().extractFromHocon(dataSource.getKeyExpression())).map(str -> {
                return "CAST (" + str + " AS string)";
            }, Seq$.MODULE$.canBuildFrom());
        }
        return new DataframeAndColumnMetadata(keyedDataset, keyColumns, None$.MODULE$, new Some(apply), new Some(timestampExpr));
    }

    /**
     * Loads an UPDATE (fixed-path table) source and resolves its key columns.
     *
     * <p>For UDF key expressions the named class may be a {@link SourceKeyExtractor} (keys are
     * appended to the dataframe), an {@link AnchorExtractor} (dataframe is used as is, with no key
     * columns), or a class with a registered adaptor to either of those.
     *
     * @param sparkSession the Spark session used to load the source
     * @param dataSource   the data-source node; must have a non-empty concrete key
     * @param list         data path handlers forwarded to the source accessor
     * @return metadata with the (possibly key-augmented) dataframe, the key columns and the source
     * @throws UnsupportedOperationException if the UDF class is of no supported extractor type and
     *                                       no suitable adaptor is registered
     */
    private DataframeAndColumnMetadata processTableNode(SparkSession sparkSession, DataSource dataSource, List<DataPathHandler> list) {
        Predef$.MODULE$.assert(dataSource.hasConcreteKey());
        Predef$.MODULE$.assert(((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(dataSource.getConcreteKey().getKey()).asScala()).nonEmpty());
        com.linkedin.feathr.offline.source.DataSource apply = DataSource$.MODULE$.apply(dataSource.getExternalSourceRef(), SourceFormatType$.MODULE$.FIXED_PATH(), DataSource$.MODULE$.apply$default$3(), DataSource$.MODULE$.apply$default$4());
        Dataset<Row> dataset = DataSourceAccessor$.MODULE$.apply(sparkSession, apply, None$.MODULE$, None$.MODULE$, false, DataSourceAccessor$.MODULE$.apply$default$6(), DataSourceAccessor$.MODULE$.apply$default$7(), list).get();

        Dataset<Row> keyedDataset;
        Seq<String> keyColumns;
        if (KeyExpressionType.UDF.equals(dataSource.getKeyExpressionType())) {
            Class<?> cls = Class.forName(dataSource.getKeyExpression());
            Object newInstance = cls.newInstance();
            if (newInstance instanceof SourceKeyExtractor) {
                SourceKeyExtractor sourceKeyExtractor = (SourceKeyExtractor) newInstance;
                keyedDataset = sourceKeyExtractor.appendKeyColumns(dataset);
                keyColumns = sourceKeyExtractor.getKeyColumnNames(sourceKeyExtractor.getKeyColumnNames$default$1());
            } else if (newInstance instanceof AnchorExtractor) {
                keyedDataset = dataset;
                keyColumns = emptyKeyColumns();
            } else {
                Option<UdfAdaptor<?>> registeredUdfAdaptor = FeathrUdfPluginContext$.MODULE$.getRegisteredUdfAdaptor(cls);
                log().info("x is " + registeredUdfAdaptor + " and x type is " + registeredUdfAdaptor.getClass());
                UdfAdaptor<?> udfAdaptor = registeredUdfAdaptor.isDefined() ? registeredUdfAdaptor.get() : null;
                if (udfAdaptor instanceof SourceKeyExtractorAdaptor) {
                    // Previously this result was discarded and the error below was thrown instead.
                    SourceKeyExtractor adaptUdf = ((SourceKeyExtractorAdaptor) udfAdaptor).adaptUdf(cls.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]));
                    keyedDataset = adaptUdf.appendKeyColumns(dataset);
                    keyColumns = adaptUdf.getKeyColumnNames(adaptUdf.getKeyColumnNames$default$1());
                } else if (udfAdaptor instanceof AnchorExtractorAdaptor) {
                    keyedDataset = dataset;
                    keyColumns = emptyKeyColumns();
                } else {
                    // The instanceof is evaluated on a possibly-null adaptor so that a missing
                    // registration reports this error rather than failing on Option.get().
                    throw new UnsupportedOperationException("Unknown extractor type: " + cls + " FeathrUdfPluginContext" + ".getRegisteredUdfAdaptor(className) is " + registeredUdfAdaptor + "and type is " + (udfAdaptor instanceof AnchorExtractorAdaptor));
                }
            }
        } else {
            keyedDataset = dataset;
            keyColumns = ConfigLoaderUtils$.MODULE$.javaListToSeqWithDeepCopy(KeyListExtractor.getInstance().extractFromHocon(dataSource.getKeyExpression()));
        }
        return new DataframeAndColumnMetadata(keyedDataset, keyColumns, DataframeAndColumnMetadata$.MODULE$.apply$default$3(), new Some(apply), DataframeAndColumnMetadata$.MODULE$.apply$default$5());
    }

    /** Returns the empty Scala list typed as a sequence of key-column names. */
    @SuppressWarnings("unchecked")
    private static Seq<String> emptyKeyColumns() {
        // Nil is an empty immutable list, so viewing it as Seq<String> is safe.
        return (Seq<String>) (Seq<?>) Nil$.MODULE$;
    }

    /**
     * Computes, for every EVENT source path, the longest aggregation window applied to it.
     *
     * <p>Aggregation nodes contribute {@code input node id -> window_size}; EVENT data-source nodes
     * are then grouped by external source ref, keeping the largest window seen for each ref.
     * NOTE(review): an EVENT data source that is not the input of any aggregation node makes the
     * window lookup fail with {@code NoSuchElementException} — confirm callers guarantee this.
     *
     * @param seq all nodes of the compute graph
     * @return immutable map from external source ref to the longest window for that source
     */
    private Map<String, Duration> getOptimizedDurationMap(Seq<AnyNode> seq) {
        Map windowByInputId = ((TraversableOnce) ((Seq) seq.filter(anyNode -> {
            return BoxesRunTime.boxToBoolean($anonfun$getOptimizedDurationMap$1(anyNode));
        })).map(anyNode2 -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(anyNode2.getAggregation().getInput().getId()), Duration.parse((CharSequence) anyNode2.getAggregation().getFunction().getParameters().get("window_size")));
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        Seq eventSources = (Seq) seq.filter(anyNode3 -> {
            return BoxesRunTime.boxToBoolean($anonfun$getOptimizedDurationMap$3(anyNode3));
        });
        HashMap empty = HashMap$.MODULE$.empty();
        eventSources.foreach(node -> {
            AnyNode eventNode = (AnyNode) node;
            String externalSourceRef = eventNode.getDataSource().getExternalSourceRef();
            Duration candidate = (Duration) windowByInputId.apply(eventNode.getDataSource().getId());
            // Compare full durations: comparing truncated hour counts would let a shorter window
            // win over a longer one that falls within the same hour.
            boolean alreadyCovered = empty.contains(externalSourceRef)
                && ((Duration) empty.apply(externalSourceRef)).compareTo(candidate) >= 0;
            if (!alreadyCovered) {
                empty.put(externalSourceRef, candidate);
            }
            return BoxedUnit.UNIT;
        });
        return empty.toMap(Predef$.MODULE$.$conforms());
    }

    /**
     * Evaluates one data-source node and records its dataframe metadata in the traverser.
     *
     * <ul>
     *   <li>CONTEXT: uses the context dataframe; the key column is the key expression when the node
     *       has a concrete key, otherwise the external source ref.</li>
     *   <li>UPDATE: loads the source as a fixed-path table.</li>
     *   <li>EVENT: loads the source as time-series data over the observation time range, widened by
     *       the longest aggregation window on the source and the configured simulate-time delay.</li>
     * </ul>
     *
     * @param anyNode           the data-source node to evaluate
     * @param fCMGraphTraverser traverser holding the Spark session, time settings and result map
     * @param dataset           the context dataframe
     * @param list              data path handlers
     * @return {@code dataset}, unchanged
     * @throws FeathrConfigException if an EVENT node is evaluated without join time settings
     * @throws MatchError            if the source type is none of CONTEXT, UPDATE or EVENT
     */
    @Override // com.linkedin.feathr.offline.evaluator.NodeEvaluator
    public Dataset<Row> evaluate(AnyNode anyNode, FCMGraphTraverser fCMGraphTraverser, Dataset<Row> dataset, List<DataPathHandler> list) {
        DataSource dataSource = anyNode.getDataSource();
        Integer id = anyNode.getDataSource().getId();
        DataSourceType sourceType = dataSource.getSourceType();
        if (DataSourceType.CONTEXT.equals(sourceType)) {
            if (dataSource.hasConcreteKey()) {
                fCMGraphTraverser.nodeIdToDataframeAndColumnMetadataMap().update(BoxesRunTime.boxToInteger(Predef$.MODULE$.Integer2int(id)), new DataframeAndColumnMetadata(dataset, new $colon.colon(dataSource.getKeyExpression(), Nil$.MODULE$), DataframeAndColumnMetadata$.MODULE$.apply$default$3(), DataframeAndColumnMetadata$.MODULE$.apply$default$4(), DataframeAndColumnMetadata$.MODULE$.apply$default$5()));
            } else {
                fCMGraphTraverser.nodeIdToDataframeAndColumnMetadataMap().update(BoxesRunTime.boxToInteger(Predef$.MODULE$.Integer2int(id)), processContextNode(dataset, dataSource));
            }
        } else if (DataSourceType.UPDATE.equals(sourceType)) {
            fCMGraphTraverser.nodeIdToDataframeAndColumnMetadataMap().update(BoxesRunTime.boxToInteger(Predef$.MODULE$.Integer2int(id)), processTableNode(fCMGraphTraverser.ss(), dataSource, list));
        } else {
            if (!DataSourceType.EVENT.equals(sourceType)) {
                throw new MatchError(sourceType);
            }
            List<DataLoaderHandler> list2 = (List) list.map(dataPathHandler -> {
                return dataPathHandler.dataLoaderHandler();
            }, List$.MODULE$.canBuildFrom());
            Enumeration.Value dateTimeResolution = new TimeBasedHdfsPathAnalyzer(PathChecker$.MODULE$.apply(fCMGraphTraverser.ss(), list2), list2).analyze(anyNode.getDataSource().getExternalSourceRef()).dateTimeResolution();
            Enumeration.Value DAILY = DateTimeResolution$.MODULE$.DAILY();
            // Only daily-partitioned sources get their observation range aligned to the daily resolution.
            boolean isDaily = dateTimeResolution != null ? dateTimeResolution.equals(DAILY) : DAILY == null;
            DateTimeInterval obsTimeRange = isDaily
                ? fCMGraphTraverser.timeConfigSettings().obsTimeRange().adjustWithDateTimeResolution(DateTimeResolution$.MODULE$.DAILY())
                : fCMGraphTraverser.timeConfigSettings().obsTimeRange();
            Duration duration = (Duration) getOptimizedDurationMap(fCMGraphTraverser.nodes()).apply(anyNode.getDataSource().getExternalSourceRef());
            if (fCMGraphTraverser.timeConfigSettings().timeConfigSettings().isEmpty() || ((JoinConfigSettings) fCMGraphTraverser.timeConfigSettings().timeConfigSettings().get()).joinTimeSetting().isEmpty()) {
                throw new FeathrConfigException(ErrorLabel.FEATHR_USER_ERROR, "joinTimeSettings section is not defined in join config, cannot perform window aggregation operation");
            }
            fCMGraphTraverser.nodeIdToDataframeAndColumnMetadataMap().update(BoxesRunTime.boxToInteger(Predef$.MODULE$.Integer2int(anyNode.getDataSource().getId())), processEventNode(fCMGraphTraverser.ss(), anyNode.getDataSource(), new Some(OfflineDateTimeUtils$.MODULE$.getFactDataTimeRange(obsTimeRange, duration, new Duration[]{(Duration) ((JoinTimeSetting) ((JoinConfigSettings) fCMGraphTraverser.timeConfigSettings().timeConfigSettings().get()).joinTimeSetting().get()).simulateTimeDelay().getOrElse(() -> {
                return Duration.ZERO;
            })})), list));
        }
        return dataset;
    }

    /**
     * Evaluates every node in {@code seq} with {@link #evaluate}, in sequence order.
     *
     * @param seq               the data-source nodes to evaluate
     * @param fCMGraphTraverser traverser that receives the per-node metadata
     * @param dataset           the context dataframe
     * @param list              data path handlers
     * @return {@code dataset}, unchanged
     */
    @Override // com.linkedin.feathr.offline.evaluator.NodeEvaluator
    public Dataset<Row> batchEvaluate(Seq<AnyNode> seq, FCMGraphTraverser fCMGraphTraverser, Dataset<Row> dataset, List<DataPathHandler> list) {
        seq.foreach(anyNode -> {
            return MODULE$.evaluate(anyNode, fCMGraphTraverser, dataset, list);
        });
        return dataset;
    }

    /** Predicate: the node's {@code getAggregation()} is non-null. */
    public static final /* synthetic */ boolean $anonfun$getOptimizedDurationMap$1(AnyNode anyNode) {
        return anyNode.getAggregation() != null;
    }

    /** Predicate: the node is a data source whose source type is {@code EVENT}. */
    public static final /* synthetic */ boolean $anonfun$getOptimizedDurationMap$3(AnyNode anyNode) {
        if (anyNode.isDataSource()) {
            DataSourceType sourceType = anyNode.getDataSource().getSourceType();
            DataSourceType dataSourceType = DataSourceType.EVENT;
            if (sourceType != null ? sourceType.equals(dataSourceType) : dataSourceType == null) {
                return true;
            }
        }
        return false;
    }

    /** Creates the singleton and publishes it through {@link #MODULE$}. */
    private DataSourceNodeEvaluator$() {
        MODULE$ = this;
        this.log = Logger.getLogger(getClass());
    }
}
