package co.cask.cdap.etl.batch;

import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.macro.MacroEvaluator;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.preview.DataTracer;
import co.cask.cdap.etl.api.ErrorTransform;
import co.cask.cdap.etl.api.StageLifecycle;
import co.cask.cdap.etl.api.StageMetrics;
import co.cask.cdap.etl.api.Transformation;
import co.cask.cdap.etl.api.batch.BatchJoiner;
import co.cask.cdap.etl.api.batch.BatchRuntimeContext;
import co.cask.cdap.etl.batch.mapreduce.ConnectorSourceEmitter;
import co.cask.cdap.etl.batch.mapreduce.ErrorOutputWriter;
import co.cask.cdap.etl.batch.mapreduce.OutputWriter;
import co.cask.cdap.etl.batch.mapreduce.PipeTransformExecutor;
import co.cask.cdap.etl.batch.mapreduce.SinkEmitter;
import co.cask.cdap.etl.batch.mapreduce.TransformEmitter;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.PipelinePhase;
import co.cask.cdap.etl.common.TrackedTransform;
import co.cask.cdap.etl.planner.StageInfo;
import com.google.common.collect.Sets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;

/* loaded from: input_file:co/cask/cdap/etl/batch/TransformExecutorFactory.class */
public abstract class TransformExecutorFactory<T> {
    private final String sourceStageName;
    private final MacroEvaluator macroEvaluator;
    protected final PipelinePluginInstantiator pluginInstantiator;
    protected final Metrics metrics;
    protected boolean isMapPhase;
    protected final Map<String, Map<String, Schema>> perStageInputSchemas = new HashMap();
    protected final Map<String, Schema> outputSchemas = new HashMap();

    public TransformExecutorFactory(JobContext jobContext, PipelinePluginInstantiator pipelinePluginInstantiator, Metrics metrics, @Nullable String str, MacroEvaluator macroEvaluator) {
        this.pluginInstantiator = pipelinePluginInstantiator;
        this.metrics = metrics;
        this.sourceStageName = str;
        this.macroEvaluator = macroEvaluator;
        this.isMapPhase = jobContext instanceof Mapper.Context;
    }

    protected abstract BatchRuntimeContext createRuntimeContext(StageInfo stageInfo);

    protected abstract TrackedTransform getTransformation(StageInfo stageInfo) throws Exception;

    public <KEY_OUT, VAL_OUT> PipeTransformExecutor<T> create(PipelinePhase pipelinePhase, OutputWriter<KEY_OUT, VAL_OUT> outputWriter, Map<String, ErrorOutputWriter<Object, Object>> map) throws Exception {
        HashMap hashMap = new HashMap();
        Set<String> sources = pipelinePhase.getSources();
        Iterator<String> it = pipelinePhase.getPluginTypes().iterator();
        while (it.hasNext()) {
            for (StageInfo stageInfo : pipelinePhase.getStagesOfType(it.next())) {
                String name = stageInfo.getName();
                this.outputSchemas.put(name, stageInfo.getOutputSchema());
                this.perStageInputSchemas.put(name, stageInfo.getInputSchemas());
            }
        }
        Iterator<String> it2 = sources.iterator();
        while (it2.hasNext()) {
            setPipeTransformDetail(pipelinePhase, it2.next(), hashMap, map, outputWriter);
        }
        return new PipeTransformExecutor<>(hashMap, this.sourceStageName == null ? pipelinePhase.getSources() : Sets.newHashSet(this.sourceStageName));
    }

    private <KEY_OUT, VAL_OUT> void setPipeTransformDetail(PipelinePhase pipelinePhase, String str, Map<String, PipeTransformDetail> map, Map<String, ErrorOutputWriter<Object, Object>> map2, OutputWriter<KEY_OUT, VAL_OUT> outputWriter) throws Exception {
        if (pipelinePhase.getSinks().contains(str)) {
            StageInfo stage = pipelinePhase.getStage(str);
            String pluginType = stage.getPluginType();
            map.put(str, new PipeTransformDetail(str, (pluginType.equals(Constants.CONNECTOR_TYPE) || pluginType.equals(BatchJoiner.PLUGIN_TYPE)) ? false : true, pluginType.equals(ErrorTransform.PLUGIN_TYPE), getTransformation(stage), new SinkEmitter(str, outputWriter)));
        } else {
            addTransformation(pipelinePhase, str, map, map2);
            for (String str2 : pipelinePhase.getDag().getNodeOutputs(str)) {
                setPipeTransformDetail(pipelinePhase, str2, map, map2, outputWriter);
                map.get(str).addTransformation(str2, map.get(str2));
            }
        }
    }

    private void addTransformation(PipelinePhase pipelinePhase, String str, Map<String, PipeTransformDetail> map, Map<String, ErrorOutputWriter<Object, Object>> map2) throws Exception {
        StageInfo stage = pipelinePhase.getStage(str);
        String pluginType = stage.getPluginType();
        ErrorOutputWriter<Object, Object> errorOutputWriter = map2.containsKey(str) ? map2.get(str) : null;
        if (pipelinePhase.getSources().contains(str) && pluginType.equals(Constants.CONNECTOR_TYPE)) {
            map.put(str, new PipeTransformDetail(str, true, false, getTransformation(stage), new ConnectorSourceEmitter(str)));
        } else if (pluginType.equals(BatchJoiner.PLUGIN_TYPE) && this.isMapPhase) {
            map.put(str, new PipeTransformDetail(str, false, false, getTransformation(stage), new TransformEmitter(str, errorOutputWriter)));
        } else {
            map.put(str, new PipeTransformDetail(str, true, ErrorTransform.PLUGIN_TYPE.equals(pluginType), getTransformation(stage), new TransformEmitter(str, errorOutputWriter)));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <T extends Transformation & StageLifecycle<BatchRuntimeContext>> Transformation getInitializedTransformation(StageInfo stageInfo) throws Exception {
        BatchRuntimeContext createRuntimeContext = createRuntimeContext(stageInfo);
        Transformation transformation = (Transformation) this.pluginInstantiator.newPluginInstance(stageInfo.getName(), this.macroEvaluator);
        ((StageLifecycle) transformation).initialize(createRuntimeContext);
        return transformation;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static <IN, OUT> TrackedTransform<IN, OUT> getTrackedEmitKeyStep(Transformation<IN, OUT> transformation, StageMetrics stageMetrics, DataTracer dataTracer) {
        return new TrackedTransform<>(transformation, stageMetrics, TrackedTransform.RECORDS_IN, null, dataTracer);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static <IN, OUT> TrackedTransform<IN, OUT> getTrackedAggregateStep(Transformation<IN, OUT> transformation, StageMetrics stageMetrics, DataTracer dataTracer) {
        return new TrackedTransform<>(transformation, stageMetrics, "aggregator.groups", TrackedTransform.RECORDS_OUT, dataTracer);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static <IN, OUT> TrackedTransform<IN, OUT> getTrackedMergeStep(Transformation<IN, OUT> transformation, StageMetrics stageMetrics, DataTracer dataTracer) {
        return new TrackedTransform<>(transformation, stageMetrics, null, TrackedTransform.RECORDS_OUT, dataTracer);
    }
}
