package co.cask.cdap.etl.spark.batch;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.data.batch.InputFormatProvider;
import co.cask.cdap.api.data.batch.OutputFormatProvider;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.api.spark.JavaSparkMain;
import co.cask.cdap.etl.api.JoinElement;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.batch.BatchPhaseSpec;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.SetMultimapCodec;
import co.cask.cdap.etl.planner.StageInfo;
import co.cask.cdap.etl.spark.SparkCollection;
import co.cask.cdap.etl.spark.SparkPairCollection;
import co.cask.cdap.etl.spark.SparkPipelineRunner;
import co.cask.cdap.etl.spark.function.BatchSourceFunction;
import co.cask.cdap.etl.spark.function.JoinMergeFunction;
import co.cask.cdap.etl.spark.function.JoinOnFunction;
import co.cask.cdap.etl.spark.function.PluginFunctionContext;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import com.google.common.collect.SetMultimap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.BufferedReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

/* loaded from: input_file:lib/hydrator-spark-core-4.1.2.jar:co/cask/cdap/etl/spark/batch/BatchSparkPipelineDriver.class */
/**
 * Spark driver that runs one batch pipeline phase.
 *
 * <p>Execution enters through {@link #run(JavaSparkExecutionContext)}, which creates the
 * {@link JavaSparkContext} and then hands this object to
 * {@code JavaSparkExecutionContext.execute(...)}; this class implements {@link TxRunnable}, so
 * that call is expected to invoke {@link #run(DatasetContext)}, where the phase specification
 * and the source/sink factories are loaded and the pipeline is started.
 *
 * <p>All per-run state is held in {@code transient} fields that are populated by the two
 * {@code run} methods; none of them is set at construction time.
 */
public class BatchSparkPipelineDriver extends SparkPipelineRunner implements JavaSparkMain, TxRunnable {
    /** Gson instance configured with the type adapters needed to decode the phase spec and factory info. */
    private static final Gson GSON = new GsonBuilder()
            .registerTypeAdapter(SetMultimap.class, new SetMultimapCodec())
            .registerTypeAdapter(Schema.class, new SchemaTypeAdapter())
            .registerTypeAdapter(DatasetInfo.class, new DatasetInfoTypeAdapter())
            .registerTypeAdapter(OutputFormatProvider.class, new OutputFormatProviderTypeAdapter())
            .registerTypeAdapter(InputFormatProvider.class, new InputFormatProviderTypeAdapter())
            .create();

    private transient JavaSparkContext jsc;
    private transient JavaSparkExecutionContext sec;
    private transient SparkBatchSourceFactory sourceFactory;
    private transient SparkBatchSinkFactory sinkFactory;
    private transient DatasetContext datasetContext;
    private transient Map<String, Integer> stagePartitions;
    private transient int numOfRecordsPreview;

    /**
     * Builds the collection for a source stage: the RDD created by the source factory for the
     * stage name, flat-mapped through a {@link BatchSourceFunction}.
     *
     * @param stageInfo the source stage to read from
     * @return the source records wrapped in an {@code RDDCollection}
     */
    @Override // co.cask.cdap.etl.spark.SparkPipelineRunner
    protected SparkCollection<Tuple2<Boolean, Object>> getSource(StageInfo stageInfo) {
        PluginFunctionContext pluginFunctionContext = new PluginFunctionContext(stageInfo, this.sec);
        // Generic signatures of the project types are not visible here, so the raw usage is kept.
        return new RDDCollection(
                this.sec,
                this.jsc,
                this.datasetContext,
                this.sinkFactory,
                this.sourceFactory
                        .createRDD(this.sec, this.jsc, stageInfo.getName(), Object.class, Object.class)
                        .flatMap(new BatchSourceFunction(pluginFunctionContext, this.numOfRecordsPreview)));
    }

    /**
     * Keys the given collection for a join by flat-mapping it to pairs with a {@link JoinOnFunction}.
     *
     * @param stageInfo        the joiner stage
     * @param str              passed through to {@link JoinOnFunction} together with the plugin
     *                         context (presumably the name of the input stage being keyed; verify
     *                         against {@code SparkPipelineRunner})
     * @param sparkCollection  the records to key
     * @return the keyed pair collection
     * @throws Exception declared by the overridden method
     */
    @Override // co.cask.cdap.etl.spark.SparkPipelineRunner
    protected SparkPairCollection<Object, Object> addJoinKey(StageInfo stageInfo, String str, SparkCollection<Object> sparkCollection) throws Exception {
        PluginFunctionContext pluginFunctionContext = new PluginFunctionContext(stageInfo, this.sec);
        return sparkCollection.flatMapToPair(new JoinOnFunction(pluginFunctionContext, str));
    }

    /**
     * Turns joined pairs into output records by flat-mapping them through a {@link JoinMergeFunction}.
     *
     * @param stageInfo            the joiner stage
     * @param sparkPairCollection  join key paired with the list of joined elements
     * @return the merged records
     * @throws Exception declared by the overridden method
     */
    @Override // co.cask.cdap.etl.spark.SparkPipelineRunner
    protected SparkCollection<Object> mergeJoinResults(StageInfo stageInfo, SparkPairCollection<Object, List<JoinElement<Object>>> sparkPairCollection) throws Exception {
        PluginFunctionContext pluginFunctionContext = new PluginFunctionContext(stageInfo, this.sec);
        return sparkPairCollection.flatMap(new JoinMergeFunction(pluginFunctionContext));
    }

    /**
     * Program entry point: creates the Spark context, stores the execution context, and passes
     * this object to {@code javaSparkExecutionContext.execute(...)}.
     *
     * @param javaSparkExecutionContext the execution context supplied by the runtime
     * @throws Exception if the delegated execution fails
     */
    @Override
    public void run(JavaSparkExecutionContext javaSparkExecutionContext) throws Exception {
        this.jsc = new JavaSparkContext();
        this.sec = javaSparkExecutionContext;
        javaSparkExecutionContext.execute(this);
    }

    /**
     * Loads the phase configuration and runs the pipeline.
     *
     * <p>The phase spec is decoded from the {@code Constants.PIPELINEID} property of the program
     * specification. The source/sink factory info is decoded from the first line of the localized
     * file {@code HydratorSpark.config}; only that first line is read.
     *
     * @param datasetContext the dataset context to use for this run
     * @throws IllegalStateException if no factory info could be decoded from the config file
     * @throws Exception if reading the config file or running the pipeline fails
     */
    @Override
    public void run(DatasetContext datasetContext) throws Exception {
        BatchPhaseSpec batchPhaseSpec = GSON.fromJson(
                this.sec.getSpecification().getProperty(Constants.PIPELINEID), BatchPhaseSpec.class);

        // The reader is only needed while decoding the config, so it is closed before the
        // pipeline runs. try-with-resources closes it exactly once and keeps the primary
        // exception if close() also fails.
        SparkBatchSourceSinkFactoryInfo sparkBatchSourceSinkFactoryInfo;
        try (BufferedReader reader = Files.newBufferedReader(
                this.sec.getLocalizationContext().getLocalFile("HydratorSpark.config").toPath(),
                StandardCharsets.UTF_8)) {
            sparkBatchSourceSinkFactoryInfo =
                    GSON.fromJson(reader.readLine(), SparkBatchSourceSinkFactoryInfo.class);
        }

        // An empty config file yields no object; fail with a clear message instead of a bare NPE.
        if (sparkBatchSourceSinkFactoryInfo == null) {
            throw new IllegalStateException(
                    "No source/sink factory information found in HydratorSpark.config");
        }

        this.sourceFactory = sparkBatchSourceSinkFactoryInfo.getSparkBatchSourceFactory();
        this.sinkFactory = sparkBatchSourceSinkFactoryInfo.getSparkBatchSinkFactory();
        this.stagePartitions = sparkBatchSourceSinkFactoryInfo.getStagePartitions();
        this.datasetContext = datasetContext;
        this.numOfRecordsPreview = batchPhaseSpec.getNumOfRecordsPreview();

        runPipeline(batchPhaseSpec.getPhase(), BatchSource.PLUGIN_TYPE, this.sec, this.stagePartitions);
    }
}
