package eu.stratosphere.sopremo.operator;

import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.util.OperatorUtil;
import eu.stratosphere.pact.common.plan.PactModule;
import eu.stratosphere.sopremo.EvaluationContext;
import eu.stratosphere.sopremo.Schema;
import eu.stratosphere.sopremo.io.Sink;
import eu.stratosphere.sopremo.io.Source;
import eu.stratosphere.sopremo.operator.Operator;
import eu.stratosphere.sopremo.serialization.SopremoRecordLayout;
import eu.stratosphere.util.IdentityList;
import eu.stratosphere.util.dag.GraphTraverseListener;
import eu.stratosphere.util.dag.OneTimeTraverser;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/* loaded from: input_file:eu/stratosphere/sopremo/operator/ElementarySopremoModule.class */
/**
 * A {@link SopremoModule} whose reachable nodes are exposed as {@link ElementaryOperator}s and
 * which can therefore be translated into a PACT plan (see {@link #asPactModule} and
 * {@link #assemblePact}).
 */
public class ElementarySopremoModule extends SopremoModule {

    /**
     * Translates the operators of the enclosing module into PACT operators and wires them
     * together.
     * <p>
     * The work is done in three phases (see {@link #assemble}): every elementary operator is
     * converted into its own {@link PactModule}, the inputs of the PACT operators inside those
     * modules are redirected to the PACT operators that actually produce the data, and finally
     * the PACT operators that terminate the plan are collected.
     * <p>
     * NOTE(review): the decompiler reports that this class was originally declared
     * {@code private}; it is kept {@code public} here to match the decompiled output.
     */
    public class PactAssembler {
        /** The PACT module generated for each Sopremo operator (keyed by identity). */
        private final Map<Operator<?>, PactModule> modules =
            new IdentityHashMap<Operator<?>, PactModule>();

        /**
         * For each Sopremo operator, one entry per output of its PACT module holding the inputs
         * of the corresponding {@link GenericDataSink} (keyed by identity).
         */
        private final Map<Operator<?>, List<List<eu.stratosphere.api.common.operators.Operator>>> operatorOutputs =
            new IdentityHashMap<Operator<?>, List<List<eu.stratosphere.api.common.operators.Operator>>>();

        /** Context handed to every operator while it is converted into a PACT module. */
        private final EvaluationContext context;

        /**
         * @param evaluationContext the context passed to each operator during conversion
         */
        public PactAssembler(EvaluationContext evaluationContext) {
            this.context = evaluationContext;
        }

        /**
         * Runs the complete translation: conversion, wiring, and sink collection.
         *
         * @param sopremoRecordLayout the record layout forwarded to each operator's
         *        {@code asPactModule} call
         * @return the PACT operators found by {@link #findPACTSinks()}
         */
        public Collection<eu.stratosphere.api.common.operators.Operator> assemble(SopremoRecordLayout sopremoRecordLayout) {
            this.convertDAGToModules(sopremoRecordLayout);
            this.connectModules();
            return this.findPACTSinks();
        }

        /**
         * Resolves one PACT-level input of {@code owner}'s module to the PACT operator(s) that
         * really deliver the data and appends them to {@code resolved}.
         * <p>
         * If {@code pactInput} is not one of the module's inputs (or its position has no
         * matching Sopremo input), it is appended unchanged. Otherwise the Sopremo input at the
         * same position is followed to the output that feeds it, and the PACT operators recorded
         * for that output are appended. A recorded {@link FileDataSource} whose producing
         * operator is not a {@link Source} is resolved recursively — presumably because it is
         * only a placeholder for an input of the producing module (TODO confirm).
         *
         * @param owner the Sopremo operator whose module contains {@code pactInput}
         * @param pactInput the PACT operator currently used as an input
         * @param resolved receives the resolved PACT operators
         */
        private void addOutputtingPactInOperator(Operator<?> owner,
                eu.stratosphere.api.common.operators.Operator pactInput,
                List<eu.stratosphere.api.common.operators.Operator> resolved) {
            final PactModule ownerModule = this.modules.get(owner);
            final int inputIndex = new IdentityList(ownerModule.getInputs()).indexOf(pactInput);

            if (inputIndex >= owner.getInputs().size() || inputIndex == -1) {
                // not a module input that maps to a Sopremo input: keep the operator as it is
                resolved.add(pactInput);
            } else {
                final Operator.Output source = owner.getInputs().get(inputIndex).getSource();
                final Operator<?> producer = source.getOperator();
                final boolean producerIsSource = producer instanceof Source;
                final List<eu.stratosphere.api.common.operators.Operator> produced =
                    this.operatorOutputs.get(producer).get(source.getIndex());

                for (eu.stratosphere.api.common.operators.Operator candidate : produced) {
                    if (candidate instanceof FileDataSource && !producerIsSource) {
                        this.addOutputtingPactInOperator(producer, candidate, resolved);
                    } else {
                        resolved.add(candidate);
                    }
                }
            }
        }

        /**
         * Rewrites the inputs of every PACT operator reachable inside every generated module so
         * that they point at the operators determined by
         * {@link #addOutputtingPactInOperator}.
         */
        private void connectModules() {
            for (Map.Entry<Operator<?>, PactModule> moduleEntry : this.modules.entrySet()) {
                final Operator<?> sopremoOperator = moduleEntry.getKey();
                final PactModule pactModule = moduleEntry.getValue();

                for (eu.stratosphere.api.common.operators.Operator pactOperator : pactModule.getReachableNodes()) {
                    // raw on purpose: the element type is dictated by OperatorUtil
                    final List allInputs = OperatorUtil.getInputs(pactOperator);
                    for (int slot = 0; slot < allInputs.size(); slot++) {
                        final List<eu.stratosphere.api.common.operators.Operator> rewired =
                            new ArrayList<eu.stratosphere.api.common.operators.Operator>();
                        for (Object currentInput : (List) allInputs.get(slot)) {
                            this.addOutputtingPactInOperator(sopremoOperator,
                                (eu.stratosphere.api.common.operators.Operator) currentInput, rewired);
                        }
                        allInputs.set(slot, rewired);
                    }
                    OperatorUtil.setInputs(pactOperator, allInputs);
                }
            }
        }

        /**
         * Converts every elementary operator reachable from the enclosing module's outputs into
         * a {@link PactModule}, remembers the inputs of each of that module's sinks, and
         * validates all generated modules afterwards.
         *
         * @param layout the record layout passed to each operator's {@code asPactModule} call
         */
        private void convertDAGToModules(final SopremoRecordLayout layout) {
            OneTimeTraverser.INSTANCE.traverse(ElementarySopremoModule.this.getAllOutputs(),
                OperatorNavigator.ELEMENTARY, new GraphTraverseListener<ElementaryOperator<?>>() {
                    @Override
                    public void nodeTraversed(ElementaryOperator<?> node) {
                        // the description is set before conversion so the context carries the operator name
                        PactAssembler.this.context.setOperatorDescription(node.getName());
                        final PactModule converted = node.asPactModule(PactAssembler.this.context, layout);
                        PactAssembler.this.modules.put(node, converted);

                        // raw on purpose: element types come from PactModule / GenericDataSink
                        final ArrayList sinkInputs = new ArrayList();
                        final List moduleSinks = converted.getOutputs();
                        for (Object sink : moduleSinks) {
                            sinkInputs.add(((GenericDataSink) sink).getInputs());
                        }
                        PactAssembler.this.operatorOutputs.put(node, sinkInputs);
                    }
                });

            for (PactModule generated : this.modules.values()) {
                generated.validate();
            }
        }

        /**
         * Collects the PACT operators that terminate the plan: for an output operator that is a
         * {@link Sink} the module's data sinks themselves, for any other output operator the
         * inputs of the module's data sinks.
         *
         * @return the collected PACT operators, in iteration order of the module outputs
         */
        private List<eu.stratosphere.api.common.operators.Operator> findPACTSinks() {
            // raw on purpose: element types come from PactModule / GenericDataSink
            final ArrayList pactSinks = new ArrayList();
            for (Operator<?> output : ElementarySopremoModule.this.getAllOutputs()) {
                final boolean realSink = output instanceof Sink;
                for (GenericDataSink moduleSink : this.modules.get(output).getAllOutputs()) {
                    if (realSink) {
                        pactSinks.add(moduleSink);
                    } else {
                        pactSinks.addAll(moduleSink.getInputs());
                    }
                }
            }
            return pactSinks;
        }
    }

    /**
     * Creates a module; both arguments are forwarded unchanged to the {@link SopremoModule}
     * constructor.
     *
     * @param i the number of inputs (this class passes {@code getNumInputs()} here when cloning)
     * @param i2 the number of outputs (this class passes {@code getNumOutputs()} here when cloning)
     */
    public ElementarySopremoModule(int i, int i2) {
        super(i, i2);
    }

    /** Package-private no-argument constructor; relies on the superclass default constructor. */
    ElementarySopremoModule() {
    }

    /**
     * @return this instance
     */
    @Override // eu.stratosphere.sopremo.operator.SopremoModule
    public ElementarySopremoModule asElementary() {
        return this;
    }

    /**
     * Assembles the PACT plan of this module and wraps the result in a {@link PactModule}.
     *
     * @param evaluationContext the context used while converting the operators
     * @param sopremoRecordLayout the record layout used while converting the operators
     * @return the module created by {@code PactModule.valueOf} from the assembled operators
     */
    public PactModule asPactModule(EvaluationContext evaluationContext, SopremoRecordLayout sopremoRecordLayout) {
        final Collection<eu.stratosphere.api.common.operators.Operator> pactSinks =
            this.assemblePact(evaluationContext, sopremoRecordLayout);
        return PactModule.valueOf((Collection<? extends eu.stratosphere.api.common.operators.Operator>) pactSinks);
    }

    /**
     * Translates this module into connected PACT operators using a fresh {@link PactAssembler}.
     *
     * @param evaluationContext the context used while converting the operators
     * @param sopremoRecordLayout the record layout used while converting the operators
     * @return the PACT operators returned by {@link PactAssembler#assemble}
     */
    public Collection<eu.stratosphere.api.common.operators.Operator> assemblePact(EvaluationContext evaluationContext, SopremoRecordLayout sopremoRecordLayout) {
        final PactAssembler assembler = new PactAssembler(evaluationContext);
        return assembler.assemble(sopremoRecordLayout);
    }

    /**
     * Creates a new module with the same number of inputs and outputs and copies this module's
     * properties into it.
     * <p>
     * Decompiler note: renamed from {@code clone} because it was merged with a bridge method.
     *
     * @return the copy
     */
    @Override // eu.stratosphere.sopremo.operator.SopremoModule
    public ElementarySopremoModule mo47clone() {
        final ElementarySopremoModule copy = new ElementarySopremoModule(this.getNumInputs(), this.getNumOutputs());
        copy.copyPropertiesFrom(this);
        return copy;
    }

    /**
     * @return the reachable nodes as reported by the superclass, typed as elementary operators
     */
    public Iterable<? extends ElementaryOperator<?>> getReachableNodes() {
        return super.getReachableNodes();
    }

    /**
     * Builds a {@link Schema} from the distinct key expressions of all reachable operators.
     * The expressions pass through a {@link HashSet}, so duplicates are dropped and the order
     * handed to the schema is the set's iteration order.
     *
     * @return the schema over all key expressions of this module
     */
    public Schema getSchema() {
        // raw on purpose: the element type is dictated by getAllKeyExpressions()
        final HashSet distinctKeys = new HashSet();
        for (ElementaryOperator<?> node : this.getReachableNodes()) {
            distinctKeys.addAll(node.getAllKeyExpressions());
        }
        return new Schema(new ArrayList(distinctKeys));
    }

    /**
     * Creates a module whose outputs are the given operators and whose inputs are the operators
     * returned by {@code findInputs} for them.
     *
     * @param collection the operators that form the outputs of the new module
     * @return the new module, with outputs connected before inputs
     */
    public static ElementarySopremoModule valueOf(Collection<? extends Operator<?>> collection) {
        final List<Operator<?>> moduleInputs = findInputs(collection);
        final ElementarySopremoModule created =
            new ElementarySopremoModule(moduleInputs.size(), collection.size());
        connectOutputs(created, collection);
        connectInputs(created, moduleInputs);
        return created;
    }

    /**
     * Varargs convenience for {@link #valueOf(Collection)}.
     *
     * @param operatorArr the operators that form the outputs of the new module
     * @return the new module
     */
    public static ElementarySopremoModule valueOf(Operator<?>... operatorArr) {
        // the cast selects the Collection overload explicitly
        return valueOf((Collection<? extends Operator<?>>) Arrays.asList(operatorArr));
    }
}
