package co.cask.cdap.etl.spark.batch;

import co.cask.cdap.api.data.batch.Input;
import co.cask.cdap.api.data.batch.InputFormatProvider;
import co.cask.cdap.api.data.batch.Split;
import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

/* loaded from: input_file:lib/hydrator-spark-core-4.3.2.jar:co/cask/cdap/etl/spark/batch/SparkBatchSourceFactory.class */
final class SparkBatchSourceFactory {
    private final Map<String, Input.StreamInput> streams = new HashMap();
    private final Map<String, InputFormatProvider> inputFormatProviders = new HashMap();
    private final Map<String, DatasetInfo> datasetInfos = new HashMap();
    private final Map<String, Set<String>> sourceInputs = new HashMap();

    /* loaded from: input_file:lib/hydrator-spark-core-4.3.2.jar:co/cask/cdap/etl/spark/batch/SparkBatchSourceFactory$BasicInputFormatProvider.class */
    static final class BasicInputFormatProvider implements InputFormatProvider {
        private final String inputFormatClassName;
        private final Map<String, String> configuration;

        /* JADX INFO: Access modifiers changed from: package-private */
        public BasicInputFormatProvider(String str, Map<String, String> map) {
            this.inputFormatClassName = str;
            this.configuration = ImmutableMap.copyOf((Map) map);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public BasicInputFormatProvider() {
            this.inputFormatClassName = "";
            this.configuration = new HashMap();
        }

        @Override // co.cask.cdap.api.data.batch.InputFormatProvider
        public String getInputFormatClassName() {
            return this.inputFormatClassName;
        }

        @Override // co.cask.cdap.api.data.batch.InputFormatProvider
        public Map<String, String> getInputFormatConfiguration() {
            return this.configuration;
        }
    }

    public void addInput(String str, Input input) {
        if (input instanceof Input.DatasetInput) {
            Input.DatasetInput datasetInput = (Input.DatasetInput) input;
            addInput(str, datasetInput.getName(), datasetInput.getAlias(), datasetInput.getArguments(), datasetInput.getSplits());
        } else if (input instanceof Input.InputFormatProviderInput) {
            Input.InputFormatProviderInput inputFormatProviderInput = (Input.InputFormatProviderInput) input;
            addInput(str, inputFormatProviderInput.getAlias(), new BasicInputFormatProvider(inputFormatProviderInput.getInputFormatProvider().getInputFormatClassName(), inputFormatProviderInput.getInputFormatProvider().getInputFormatConfiguration()));
        } else if (input instanceof Input.StreamInput) {
            Input.StreamInput streamInput = (Input.StreamInput) input;
            addInput(str, streamInput.getAlias(), streamInput);
        }
    }

    private void addInput(String str, String str2, Input.StreamInput streamInput) {
        duplicateAliasCheck(str2);
        this.streams.put(str2, streamInput);
        addStageInput(str, str2);
    }

    private void addInput(String str, String str2, String str3, Map<String, String> map, List<Split> list) {
        duplicateAliasCheck(str3);
        this.datasetInfos.put(str3, new DatasetInfo(str2, map, list));
        addStageInput(str, str3);
    }

    private void addInput(String str, String str2, BasicInputFormatProvider basicInputFormatProvider) {
        duplicateAliasCheck(str2);
        this.inputFormatProviders.put(str2, basicInputFormatProvider);
        addStageInput(str, str2);
    }

    private void duplicateAliasCheck(String str) {
        if (this.inputFormatProviders.containsKey(str) || this.datasetInfos.containsKey(str) || this.streams.containsKey(str)) {
            throw new IllegalStateException(str + " has already been added. Can't add an input with the same alias.");
        }
    }

    public <K, V> JavaPairRDD<K, V> createRDD(JavaSparkExecutionContext javaSparkExecutionContext, JavaSparkContext javaSparkContext, String str, Class<K> cls, Class<V> cls2) {
        Set<String> set = this.sourceInputs.get(str);
        if (set == null || set.isEmpty()) {
            throw new IllegalArgumentException(str + " has no input. Please check that the source calls setInput at some input.");
        }
        JavaPairRDD<K, V> fromJavaRDD = JavaPairRDD.fromJavaRDD(javaSparkContext.emptyRDD());
        Iterator<String> it = set.iterator();
        while (it.hasNext()) {
            fromJavaRDD = fromJavaRDD.union(createInputRDD(javaSparkExecutionContext, javaSparkContext, it.next(), cls, cls2));
        }
        return fromJavaRDD;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <K, V> JavaPairRDD<K, V> createInputRDD(JavaSparkExecutionContext javaSparkExecutionContext, JavaSparkContext javaSparkContext, String str, Class<K> cls, Class<V> cls2) {
        if (this.streams.containsKey(str)) {
            Input.StreamInput streamInput = this.streams.get(str);
            FormatSpecification bodyFormatSpec = streamInput.getBodyFormatSpec();
            if (bodyFormatSpec != null) {
                return javaSparkExecutionContext.fromStream(streamInput.getName(), bodyFormatSpec, streamInput.getStartTime(), streamInput.getEndTime(), StructuredRecord.class);
            }
            String decoderType = streamInput.getDecoderType();
            if (decoderType == null) {
                return javaSparkExecutionContext.fromStream(streamInput.getName(), streamInput.getStartTime(), streamInput.getEndTime(), cls2);
            }
            try {
                return javaSparkExecutionContext.fromStream(streamInput.getName(), streamInput.getStartTime(), streamInput.getEndTime(), Thread.currentThread().getContextClassLoader().loadClass(decoderType), cls, cls2);
            } catch (Exception e) {
                throw Throwables.propagate(e);
            }
        }
        if (!this.inputFormatProviders.containsKey(str)) {
            if (!this.datasetInfos.containsKey(str)) {
                throw new IllegalStateException("Unknown source type");
            }
            DatasetInfo datasetInfo = this.datasetInfos.get(str);
            return javaSparkExecutionContext.fromDataset(datasetInfo.getDatasetName(), datasetInfo.getDatasetArgs());
        }
        InputFormatProvider inputFormatProvider = this.inputFormatProviders.get(str);
        Configuration configuration = new Configuration();
        configuration.clear();
        for (Map.Entry<String, String> entry : inputFormatProvider.getInputFormatConfiguration().entrySet()) {
            configuration.set(entry.getKey(), entry.getValue());
        }
        try {
            return javaSparkContext.newAPIHadoopRDD(configuration, ((ClassLoader) Objects.firstNonNull(Thread.currentThread().getContextClassLoader(), getClass().getClassLoader())).loadClass(inputFormatProvider.getInputFormatClassName()), cls, cls2);
        } catch (ClassNotFoundException e2) {
            throw Throwables.propagate(e2);
        }
    }

    private void addStageInput(String str, String str2) {
        Set<String> set = this.sourceInputs.get(str);
        if (set == null) {
            set = new HashSet();
        }
        set.add(str2);
        this.sourceInputs.put(str, set);
    }
}
