package co.cask.cdap.etl.spark.batch;

import co.cask.cdap.api.ProgramStatus;
import co.cask.cdap.api.data.batch.InputFormatProvider;
import co.cask.cdap.api.data.batch.OutputFormatProvider;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.spark.SparkClientContext;
import co.cask.cdap.etl.api.batch.BatchAggregator;
import co.cask.cdap.etl.api.batch.BatchAggregatorContext;
import co.cask.cdap.etl.api.batch.BatchConfigurable;
import co.cask.cdap.etl.api.batch.BatchJoiner;
import co.cask.cdap.etl.api.batch.BatchJoinerContext;
import co.cask.cdap.etl.api.batch.BatchSink;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.api.batch.MultiInputBatchConfigurable;
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.batch.BatchPhaseSpec;
import co.cask.cdap.etl.common.CompositeFinisher;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DatasetContextLookupProvider;
import co.cask.cdap.etl.common.DefaultMacroEvaluator;
import co.cask.cdap.etl.common.Finisher;
import co.cask.cdap.etl.common.SetMultimapCodec;
import co.cask.cdap.etl.planner.StageInfo;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import com.google.common.collect.SetMultimap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.BufferedWriter;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/hydrator-spark-core-4.1.2.jar:co/cask/cdap/etl/spark/batch/ETLSpark.class */
public class ETLSpark extends AbstractSpark {
    private static final Logger LOG = LoggerFactory.getLogger(ETLSpark.class);
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).registerTypeAdapter(SetMultimap.class, new SetMultimapCodec()).registerTypeAdapter(DatasetInfo.class, new DatasetInfoTypeAdapter()).registerTypeAdapter(OutputFormatProvider.class, new OutputFormatProviderTypeAdapter()).registerTypeAdapter(InputFormatProvider.class, new InputFormatProviderTypeAdapter()).create();
    private final BatchPhaseSpec phaseSpec;
    private Finisher finisher;
    private List<File> cleanupFiles;

    public ETLSpark(BatchPhaseSpec batchPhaseSpec) {
        this.phaseSpec = batchPhaseSpec;
    }

    protected void configure() {
        setName(this.phaseSpec.getPhaseName());
        setDescription(this.phaseSpec.getDescription());
        setMainClass(BatchSparkPipelineDriver.class);
        setExecutorResources(this.phaseSpec.getResources());
        setDriverResources(this.phaseSpec.getDriverResources());
        setClientResources(this.phaseSpec.getClientResources());
        HashMap hashMap = new HashMap();
        hashMap.put(Constants.PIPELINEID, GSON.toJson(this.phaseSpec, BatchPhaseSpec.class));
        setProperties(hashMap);
    }

    public void initialize() throws Exception {
        SparkClientContext context = getContext();
        this.cleanupFiles = new ArrayList();
        CompositeFinisher.Builder builder = CompositeFinisher.builder();
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.driver.extraJavaOptions", "-XX:MaxPermSize=256m");
        sparkConf.set("spark.executor.extraJavaOptions", "-XX:MaxPermSize=256m");
        context.setSparkConf(sparkConf);
        BatchPhaseSpec batchPhaseSpec = (BatchPhaseSpec) GSON.fromJson((String) context.getSpecification().getProperties().get(Constants.PIPELINEID), BatchPhaseSpec.class);
        DatasetContextLookupProvider datasetContextLookupProvider = new DatasetContextLookupProvider(context);
        DefaultMacroEvaluator defaultMacroEvaluator = new DefaultMacroEvaluator(context.getWorkflowToken(), context.getRuntimeArguments(), context.getLogicalStartTime(), context, context.getNamespace());
        SparkBatchSourceFactory sparkBatchSourceFactory = new SparkBatchSourceFactory();
        SparkBatchSinkFactory sparkBatchSinkFactory = new SparkBatchSinkFactory();
        HashMap hashMap = new HashMap();
        Iterator<StageInfo> it = batchPhaseSpec.getPhase().iterator();
        while (it.hasNext()) {
            StageInfo next = it.next();
            String name = next.getName();
            String pluginType = next.getPluginType();
            if (BatchSource.PLUGIN_TYPE.equals(pluginType)) {
                BatchConfigurable batchConfigurable = (BatchConfigurable) context.newPluginInstance(name, defaultMacroEvaluator);
                SparkBatchSourceContext sparkBatchSourceContext = new SparkBatchSourceContext(sparkBatchSourceFactory, context, datasetContextLookupProvider, next, context.getDataTracer(name).isEnabled());
                batchConfigurable.prepareRun(sparkBatchSourceContext);
                builder.add((BatchConfigurable<BatchConfigurable>) batchConfigurable, (BatchConfigurable) sparkBatchSourceContext);
            } else if (BatchSink.PLUGIN_TYPE.equals(pluginType)) {
                BatchConfigurable batchConfigurable2 = (BatchConfigurable) context.newPluginInstance(name, defaultMacroEvaluator);
                SparkBatchSinkContext sparkBatchSinkContext = new SparkBatchSinkContext(sparkBatchSinkFactory, context, null, next, context.getDataTracer(name).isEnabled());
                batchConfigurable2.prepareRun(sparkBatchSinkContext);
                builder.add((BatchConfigurable<BatchConfigurable>) batchConfigurable2, (BatchConfigurable) sparkBatchSinkContext);
            } else if (SparkSink.PLUGIN_TYPE.equals(pluginType)) {
                BatchConfigurable batchConfigurable3 = (BatchConfigurable) context.newPluginInstance(name, defaultMacroEvaluator);
                BasicSparkPluginContext basicSparkPluginContext = new BasicSparkPluginContext(context, datasetContextLookupProvider, next);
                batchConfigurable3.prepareRun(basicSparkPluginContext);
                builder.add((BatchConfigurable<BatchConfigurable>) batchConfigurable3, (BatchConfigurable) basicSparkPluginContext);
            } else if (BatchAggregator.PLUGIN_TYPE.equals(pluginType)) {
                BatchAggregator batchAggregator = (BatchAggregator) context.newPluginInstance(name, defaultMacroEvaluator);
                SparkAggregatorContext sparkAggregatorContext = new SparkAggregatorContext(context, new DatasetContextLookupProvider(context), next);
                batchAggregator.prepareRun((BatchAggregatorContext) sparkAggregatorContext);
                builder.add((BatchConfigurable<BatchAggregator>) batchAggregator, (BatchAggregator) sparkAggregatorContext);
                hashMap.put(name, sparkAggregatorContext.getNumPartitions());
            } else if (BatchJoiner.PLUGIN_TYPE.equals(pluginType)) {
                BatchJoiner batchJoiner = (BatchJoiner) context.newPluginInstance(name, defaultMacroEvaluator);
                SparkJoinerContext sparkJoinerContext = new SparkJoinerContext(next, context);
                batchJoiner.prepareRun((BatchJoinerContext) sparkJoinerContext);
                builder.add((MultiInputBatchConfigurable<BatchJoiner>) batchJoiner, (BatchJoiner) sparkJoinerContext);
                hashMap.put(name, sparkJoinerContext.getNumPartitions());
            }
        }
        File createTempFile = File.createTempFile("HydratorSpark", ".config");
        this.cleanupFiles.add(createTempFile);
        BufferedWriter newBufferedWriter = Files.newBufferedWriter(createTempFile.toPath(), StandardCharsets.UTF_8, new OpenOption[0]);
        Throwable th = null;
        try {
            try {
                newBufferedWriter.write(GSON.toJson(new SparkBatchSourceSinkFactoryInfo(sparkBatchSourceFactory, sparkBatchSinkFactory, hashMap)));
                if (newBufferedWriter != null) {
                    if (0 != 0) {
                        try {
                            newBufferedWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        newBufferedWriter.close();
                    }
                }
                this.finisher = builder.build();
                context.localize("HydratorSpark.config", createTempFile.toURI());
            } finally {
            }
        } catch (Throwable th3) {
            if (newBufferedWriter != null) {
                if (th != null) {
                    try {
                        newBufferedWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newBufferedWriter.close();
                }
            }
            throw th3;
        }
    }

    public void destroy() {
        this.finisher.onFinish(getContext().getState().getStatus() == ProgramStatus.COMPLETED);
        for (File file : this.cleanupFiles) {
            if (!file.delete()) {
                LOG.warn("Failed to clean up resource {} ", file);
            }
        }
    }
}
