package eu.stratosphere.spargel.java;

import eu.stratosphere.api.common.aggregators.Aggregator;
import eu.stratosphere.api.common.operators.BinaryOperatorInformation;
import eu.stratosphere.api.common.operators.DualInputOperator;
import eu.stratosphere.api.common.operators.DualInputSemanticProperties;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.common.operators.base.CoGroupOperatorBase;
import eu.stratosphere.api.common.operators.base.DeltaIterationBase;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.functions.CoGroupFunction;
import eu.stratosphere.api.java.operators.CustomUnaryOperation;
import eu.stratosphere.api.java.operators.TwoInputOperator;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.types.TypeInformation;
import eu.stratosphere.util.Collector;
import java.lang.Comparable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.lang3.Validate;

/* loaded from: input_file:eu/stratosphere/spargel/java/VertexCentricIteration.class */
public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> implements CustomUnaryOperation<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> {
    private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
    private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
    private final DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue;
    private final DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue;
    private final TypeInformation<Message> messageType;
    private final Map<String, Class<? extends Aggregator<?>>> aggregators;
    private final int maximumNumberOfIterations;
    private DataSet<Tuple2<VertexKey, VertexValue>> initialVertices;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:eu/stratosphere/spargel/java/VertexCentricIteration$GraphIterationOperator.class */
    /**
     * Two-input operator (first input: vertices, second input: edges) that is translated
     * into a delta iteration. Each step of that iteration runs a "Messaging" co-group
     * followed by a "Vertex State Updates" co-group.
     *
     * @param <VertexKey> type of the vertex identifier (field 0 of the vertex tuples)
     * @param <VertexValue> type of the vertex state (field 1 of the vertex tuples)
     * @param <Message> type of the messages exchanged between vertices
     * @param <EdgeType> tuple type of the edge data set
     */
    public static final class GraphIterationOperator<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeType extends Tuple> extends TwoInputOperator<Tuple2<VertexKey, VertexValue>, EdgeType, Tuple2<VertexKey, VertexValue>, GraphIterationOperator<VertexKey, VertexValue, Message, EdgeType>> {
        /** Edge data set; its type is used as the first input type of the messaging co-group. */
        private final DataSet<EdgeType> edges;
        /** Co-group of (messages, vertex state) producing vertex state. */
        private final CoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> updateFunction;
        /** Co-group of (edges, vertex state) producing (vertex key, message) pairs. */
        private final CoGroupFunction<EdgeType, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> messagingFunction;
        /** Tuple type of the (vertex key, message) pairs flowing from messaging to update. */
        private final TypeInformation<Tuple2<VertexKey, Message>> messageType;
        /** Aggregators (name to class) registered on the iteration during translation. */
        private final Map<String, Class<? extends Aggregator<?>>> aggregators;
        /** Upper bound on the number of iterations handed to the delta iteration. */
        private final int maximumNumberOfIterations;

        /**
         * @param dataSet the initial vertices (first input)
         * @param dataSet2 the edges (second input)
         * @param vertexUpdateUdf co-group function performing the vertex updates
         * @param coGroupFunction co-group function producing the messages
         * @param typeInformation type information of the message payload
         * @param map aggregators to register, keyed by name (the reference is stored, not copied)
         * @param i maximum number of iterations
         */
        private GraphIterationOperator(DataSet<Tuple2<VertexKey, VertexValue>> dataSet, DataSet<EdgeType> dataSet2, VertexUpdateUdf<VertexKey, VertexValue, Message> vertexUpdateUdf, CoGroupFunction<EdgeType, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> coGroupFunction, TypeInformation<Message> typeInformation, Map<String, Class<? extends Aggregator<?>>> map, int i) {
            super(dataSet, dataSet2, dataSet.getType());
            this.maximumNumberOfIterations = i;
            this.aggregators = map;
            this.edges = dataSet2;
            this.messagingFunction = coGroupFunction;
            this.updateFunction = vertexUpdateUdf;
            // Message tuples pair the vertex key type (field 0 of the vertex type) with the message type.
            this.messageType = new TupleTypeInfo(new TypeInformation[]{dataSet.getType().getTypeAt(0), typeInformation});
        }

        /**
         * Builds the delta iteration for this operator and wires it into the data flow.
         *
         * @param operator the translated vertex input; used as both inputs of the iteration
         * @param operator2 the translated edge input; used as first input of the messaging co-group
         * @return the delta iteration operator
         */
        protected DualInputOperator<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>, ?> translateToDataFlow(Operator<Tuple2<VertexKey, VertexValue>> operator, Operator<EdgeType> operator2) {
            // Field 0 is the key position for the iteration and for both co-groups.
            final int[] keyPositions = {0};

            // An explicitly set operator name wins; otherwise derive one from the two functions.
            final String iterationName;
            if (getName() != null) {
                iterationName = getName();
            } else {
                iterationName = "Vertex-centric iteration (" + this.updateFunction + " | " + this.messagingFunction + ")";
            }

            DeltaIterationBase iteration = new DeltaIterationBase(new BinaryOperatorInformation(getInput1Type(), getInput1Type(), getInput1Type()), keyPositions, iterationName);
            iteration.setMaximumNumberOfIterations(this.maximumNumberOfIterations);
            for (Map.Entry<String, Class<? extends Aggregator<?>>> registration : this.aggregators.entrySet()) {
                iteration.getAggregators().registerAggregator(registration.getKey(), registration.getValue());
            }

            // Messaging step: edges co-grouped with the workset.
            CoGroupOperatorBase messenger = new CoGroupOperatorBase(this.messagingFunction, new BinaryOperatorInformation(this.edges.getType(), getInput1Type(), this.messageType), keyPositions, keyPositions, "Messaging");
            messenger.setSecondInput(iteration.getWorkset());

            // Update step: messages co-grouped with the solution set.
            CoGroupOperatorBase updater = new CoGroupOperatorBase(this.updateFunction, new BinaryOperatorInformation(this.messageType, getInput1Type(), getInput1Type()), keyPositions, keyPositions, "Vertex State Updates");
            updater.setFirstInput(messenger);
            updater.setSecondInput(iteration.getSolutionSet());

            // Register forwarded-field entries (0, 0) for both inputs of the update step
            // (presumably: the key field is preserved by the update - confirm against the API).
            DualInputSemanticProperties forwardedFields = new DualInputSemanticProperties();
            forwardedFields.addForwardedField1(0, 0);
            forwardedFields.addForwardedField2(0, 0);
            updater.setSemanticProperties(forwardedFields);

            // The update step's output is both the solution set delta and the next workset.
            iteration.setSolutionSetDelta(updater);
            iteration.setNextWorkset(updater);

            // Place the iteration in the data flow: the vertices feed both iteration inputs.
            iteration.setFirstInput(operator);
            iteration.setSecondInput(operator);
            messenger.setFirstInput(operator2);
            return iteration;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:eu/stratosphere/spargel/java/VertexCentricIteration$MessagingUdfNoEdgeValues.class */
    /**
     * Co-group function that adapts a {@code MessagingFunction} to edge data sets made of
     * 2-tuples (edges without values). The first co-group input is the edges, the second
     * the vertex state.
     *
     * @param <VertexKey> type of the vertex identifier
     * @param <VertexValue> type of the vertex state
     * @param <Message> type of the messages produced
     */
    public static final class MessagingUdfNoEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message> extends CoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> {
        private static final long serialVersionUID = 1L;

        /** The wrapped messaging function all calls are delegated to. */
        private final MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction;

        private MessagingUdfNoEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction) {
            this.messagingFunction = messagingFunction;
        }

        /**
         * Hands the edges and the collector to the messaging function and lets it send
         * messages for the vertex. Does nothing when the vertex side is empty.
         *
         * @param it the edges of this group
         * @param it2 the vertex state of this group; only the first element is read
         * @param collector receives the (vertex key, message) pairs
         */
        public void coGroup(Iterator<Tuple2<VertexKey, VertexKey>> it, Iterator<Tuple2<VertexKey, VertexValue>> it2, Collector<Tuple2<VertexKey, Message>> collector) throws Exception {
            if (!it2.hasNext()) {
                return;
            }
            final Tuple2<VertexKey, VertexValue> vertexState = it2.next();
            this.messagingFunction.set(it, collector);
            // The raw Comparable cast is kept exactly as in the original call.
            this.messagingFunction.sendMessages((Comparable) vertexState.f0, vertexState.f1);
        }

        /**
         * Initializes the messaging function when the superstep number is 1 (passing
         * {@code false} as the second {@code init} argument), then runs its pre-superstep hook.
         */
        public void open(Configuration configuration) throws Exception {
            final boolean firstSuperstep = getIterationRuntimeContext().getSuperstepNumber() == 1;
            if (firstSuperstep) {
                this.messagingFunction.init(getIterationRuntimeContext(), false);
            }
            this.messagingFunction.preSuperstep();
        }

        /** Runs the messaging function's post-superstep hook. */
        public void close() throws Exception {
            this.messagingFunction.postSuperstep();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:eu/stratosphere/spargel/java/VertexCentricIteration$MessagingUdfWithEdgeValues.class */
    /**
     * Co-group function that adapts a {@code MessagingFunction} to edge data sets made of
     * 3-tuples (edges carrying a value). The first co-group input is the edges, the second
     * the vertex state.
     *
     * @param <VertexKey> type of the vertex identifier
     * @param <VertexValue> type of the vertex state
     * @param <Message> type of the messages produced
     * @param <EdgeValue> type of the value attached to each edge
     */
    public static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> extends CoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> {
        private static final long serialVersionUID = 1L;

        /** The wrapped messaging function all calls are delegated to. */
        private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;

        private MessagingUdfWithEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction) {
            this.messagingFunction = messagingFunction;
        }

        /**
         * Hands the edges and the collector to the messaging function and lets it send
         * messages for the vertex. Does nothing when the vertex side is empty.
         *
         * @param it the edges of this group
         * @param it2 the vertex state of this group; only the first element is read
         * @param collector receives the (vertex key, message) pairs
         */
        public void coGroup(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> it, Iterator<Tuple2<VertexKey, VertexValue>> it2, Collector<Tuple2<VertexKey, Message>> collector) throws Exception {
            if (!it2.hasNext()) {
                return;
            }
            final Tuple2<VertexKey, VertexValue> vertexState = it2.next();
            this.messagingFunction.set(it, collector);
            // The raw Comparable cast is kept exactly as in the original call.
            this.messagingFunction.sendMessages((Comparable) vertexState.f0, vertexState.f1);
        }

        /**
         * Initializes the messaging function when the superstep number is 1 (passing
         * {@code true} as the second {@code init} argument), then runs its pre-superstep hook.
         */
        public void open(Configuration configuration) throws Exception {
            final boolean firstSuperstep = getIterationRuntimeContext().getSuperstepNumber() == 1;
            if (firstSuperstep) {
                this.messagingFunction.init(getIterationRuntimeContext(), true);
            }
            this.messagingFunction.preSuperstep();
        }

        /** Runs the messaging function's post-superstep hook. */
        public void close() throws Exception {
            this.messagingFunction.postSuperstep();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:eu/stratosphere/spargel/java/VertexCentricIteration$VertexUpdateUdf.class */
    /**
     * Co-group function that adapts a {@code VertexUpdateFunction}. The first co-group
     * input is the messages addressed to a vertex, the second the vertex state.
     *
     * @param <VertexKey> type of the vertex identifier
     * @param <VertexValue> type of the vertex state
     * @param <Message> type of the messages consumed
     */
    public static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey>, VertexValue, Message> extends CoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> {
        private static final long serialVersionUID = 1L;

        /** The wrapped update function all calls are delegated to. */
        private final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;
        /** Reusable iterator wrapper; re-pointed at the current group's messages on each call. */
        private final MessageIterator<Message> messageIter;

        private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction) {
            this.messageIter = new MessageIterator<>();
            this.vertexUpdateFunction = vertexUpdateFunction;
        }

        /**
         * Passes the messages of one vertex to the update function.
         *
         * @param it the messages addressed to the vertex
         * @param it2 the vertex state; only the first element is read
         * @param collector receives the updated vertex state
         * @throws Exception if there is no vertex state for this key: either messages were
         *         sent to a vertex that does not exist, or the group was empty on both sides
         */
        public void coGroup(Iterator<Tuple2<VertexKey, Message>> it, Iterator<Tuple2<VertexKey, VertexValue>> it2, Collector<Tuple2<VertexKey, VertexValue>> collector) throws Exception {
            if (it2.hasNext()) {
                Tuple2<VertexKey, VertexValue> vertexState = it2.next();
                this.messageIter.setSource(it);
                this.vertexUpdateFunction.setOutput(vertexState, collector);
                // The raw Comparable cast is kept exactly as in the original call.
                this.vertexUpdateFunction.updateVertex((Comparable) vertexState.f0, vertexState.f1, this.messageIter);
                return;
            }

            if (!it.hasNext()) {
                // Previously a bare Exception without any message, which made the failure undiagnosable.
                throw new Exception("The vertex update co-group was invoked with neither a vertex state nor any messages.");
            }

            String description = "Target vertex does not exist!.";
            try {
                description = "Target vertex '" + it.next().f0 + "' does not exist!.";
            } catch (RuntimeException ignored) {
                // Best effort only: if reading or printing the key fails, fall back to the
                // generic description. Errors (e.g. OutOfMemoryError) are no longer masked.
            }
            throw new Exception(description);
        }

        /**
         * Initializes the update function when the superstep number is 1, then runs its
         * pre-superstep hook.
         */
        public void open(Configuration configuration) throws Exception {
            if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
                this.vertexUpdateFunction.init(getIterationRuntimeContext());
            }
            this.vertexUpdateFunction.preSuperstep();
        }

        /** Runs the update function's post-superstep hook. */
        public void close() throws Exception {
            this.vertexUpdateFunction.postSuperstep();
        }
    }

    /**
     * Creates an iteration over edges that carry no value. The edge data set must consist
     * of 2-tuples whose two fields have the same type, and that type must implement
     * {@link Comparable}.
     *
     * @param vertexUpdateFunction function that updates the vertex state
     * @param messagingFunction function that produces the messages; also used to derive the message type
     * @param dataSet the edges as (source vertex id, target vertex id) pairs
     * @param i maximum number of iterations
     */
    private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction, DataSet<Tuple2<VertexKey, VertexKey>> dataSet, int i) {
        // Check the tuple shape first; only then is the downcast to TupleTypeInfo performed.
        final TypeInformation<?> edgeType = dataSet.getType();
        final boolean isPair = edgeType.isTupleType() && edgeType.getArity() == 2;
        Validate.isTrue(isPair, "The edges data set (for edges without edge values) must consist of 2-tuples.");

        final TupleTypeInfo<?> edgeTupleType = (TupleTypeInfo<?>) edgeType;
        final boolean keyFieldsValid = edgeTupleType.getTypeAt(0).equals(edgeTupleType.getTypeAt(1))
                && Comparable.class.isAssignableFrom(edgeTupleType.getTypeAt(0).getTypeClass());
        Validate.isTrue(keyFieldsValid, "Both tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");

        this.updateFunction = vertexUpdateFunction;
        this.messagingFunction = messagingFunction;
        // Exactly one of the two edge data sets is non-null; this is the plain-edges variant.
        this.edgesWithoutValue = dataSet;
        this.edgesWithValue = null;
        this.maximumNumberOfIterations = i;
        this.aggregators = new HashMap<>();
        this.messageType = getMessageType(messagingFunction);
    }

    /**
     * Creates an iteration over edges that carry a value. The edge data set must consist
     * of 3-tuples whose first two fields have the same type, and that type must implement
     * {@link Comparable}.
     *
     * @param vertexUpdateFunction function that updates the vertex state
     * @param messagingFunction function that produces the messages; also used to derive the message type
     * @param dataSet the edges as (source vertex id, target vertex id, edge value) triples
     * @param i maximum number of iterations
     * @param z not read by this constructor; it only distinguishes this overload from the plain-edges one
     */
    private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction, DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> dataSet, int i, boolean z) {
        // Check the tuple shape first; only then is the downcast to TupleTypeInfo performed.
        final TypeInformation<?> edgeType = dataSet.getType();
        final boolean isTriple = edgeType.isTupleType() && edgeType.getArity() == 3;
        Validate.isTrue(isTriple, "The edges data set (for edges with edge values) must consist of 3-tuples.");

        final TupleTypeInfo<?> edgeTupleType = (TupleTypeInfo<?>) edgeType;
        final boolean keyFieldsValid = edgeTupleType.getTypeAt(0).equals(edgeTupleType.getTypeAt(1))
                && Comparable.class.isAssignableFrom(edgeTupleType.getTypeAt(0).getTypeClass());
        Validate.isTrue(keyFieldsValid, "The first two tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");

        this.updateFunction = vertexUpdateFunction;
        this.messagingFunction = messagingFunction;
        // Exactly one of the two edge data sets is non-null; this is the valued-edges variant.
        this.edgesWithoutValue = null;
        this.edgesWithValue = dataSet;
        this.maximumNumberOfIterations = i;
        this.aggregators = new HashMap<>();
        this.messageType = getMessageType(messagingFunction);
    }

    /**
     * Derives the message type information from the concrete class of the given messaging
     * function, asking the type extractor for type parameter position 2 of
     * {@code MessagingFunction}.
     *
     * @param messagingFunction the function whose runtime class is inspected
     * @return the type information the extractor reports for the message type
     */
    private TypeInformation<Message> getMessageType(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction) {
        final Class<?> functionClass = messagingFunction.getClass();
        // No input type information is supplied; the typed nulls select the intended overload.
        final TypeInformation<Message> extracted = TypeExtractor.createTypeInfo(MessagingFunction.class, functionClass, 2, (TypeInformation) null, (TypeInformation) null);
        return extracted;
    }

    /**
     * Records an aggregator class under the given name. The registrations are handed to
     * the iteration operator when it is created. Registering the same name again replaces
     * the earlier entry.
     *
     * @param str the name under which the aggregator is registered
     * @param cls the aggregator class
     */
    public void registerAggregator(String str, Class<? extends Aggregator<?>> cls) {
        final Map<String, Class<? extends Aggregator<?>>> registry = this.aggregators;
        registry.put(str, cls);
    }

    /**
     * Sets the initial vertices. The data set must consist of 2-tuples, and the type of
     * its first field must equal the type of the first field of the edge data set.
     *
     * @param dataSet the initial vertices as (vertex id, vertex value) pairs
     */
    public void setInput(DataSet<Tuple2<VertexKey, VertexValue>> dataSet) {
        final TypeInformation<?> vertexType = dataSet.getType();
        final boolean isPair = vertexType.isTupleType() && vertexType.getArity() == 2;
        Validate.isTrue(isPair, "The input data set (the initial vertices) must consist of 2-tuples.");

        final TypeInformation<?> vertexKeyType = ((TupleTypeInfo<?>) vertexType).getTypeAt(0);

        // Exactly one of the two edge data sets is set, depending on the constructor used.
        final TypeInformation<?> edgeType;
        if (this.edgesWithoutValue != null) {
            edgeType = this.edgesWithoutValue.getType();
        } else {
            edgeType = this.edgesWithValue.getType();
        }
        final TypeInformation<?> edgeKeyType = ((TupleTypeInfo<?>) edgeType).getTypeAt(0);

        Validate.isTrue(vertexKeyType.equals(edgeKeyType), "The first tuple field (the vertex id) of the input data set (the initial vertices) must be the same data type as the first fields of the edge data set (the source vertex id). Here, the key type for the vertex ids is '%s' and the key type  for the edges is '%s'.", vertexKeyType, edgeKeyType);
        this.initialVertices = dataSet;
    }

    /* renamed from: createOperator, reason: merged with bridge method [inline-methods] */
    /**
     * Creates the operator that represents this vertex-centric iteration, wrapping the
     * update and messaging functions into their co-group adapters. The messaging adapter
     * is chosen by which edge data set (plain or valued) this instance was built with.
     *
     * @return the iteration operator over the initial vertices and the edges
     * @throws IllegalStateException if the initial vertices have not been set via {@code setInput}
     */
    public GraphIterationOperator<VertexKey, VertexValue, Message, ?> m0createOperator() {
        // Fail early with a clear message instead of a NullPointerException raised from
        // inside the operator constructor when the input was never set.
        if (this.initialVertices == null) {
            throw new IllegalStateException("The input data set (the initial vertices) has not been set.");
        }

        VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(this.updateFunction);

        if (this.edgesWithoutValue != null) {
            MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message> messenger = new MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message>(this.messagingFunction);
            return new GraphIterationOperator<VertexKey, VertexValue, Message, Tuple2<VertexKey, VertexKey>>(this.initialVertices, this.edgesWithoutValue, updateUdf, messenger, this.messageType, this.aggregators, this.maximumNumberOfIterations);
        }

        MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(this.messagingFunction);
        return new GraphIterationOperator<VertexKey, VertexValue, Message, Tuple3<VertexKey, VertexKey, EdgeValue>>(this.initialVertices, this.edgesWithValue, updateUdf, messenger, this.messageType, this.aggregators, this.maximumNumberOfIterations);
    }

    /**
     * Creates a vertex-centric iteration for a graph whose edges carry no value.
     *
     * @param dataSet the edges as (source vertex id, target vertex id) pairs
     * @param vertexUpdateFunction function that updates the vertex state
     * @param messagingFunction function that produces the messages
     * @param i maximum number of iterations
     * @return the iteration; the initial vertices still have to be supplied via {@code setInput}
     */
    public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message> VertexCentricIteration<VertexKey, VertexValue, Message, ?> withPlainEdges(DataSet<Tuple2<VertexKey, VertexKey>> dataSet, VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction, int i) {
        // There are no edge values here, so the edge value type parameter is pinned to Object.
        // The cast is unchecked and has no effect at runtime.
        @SuppressWarnings("unchecked")
        final MessagingFunction<VertexKey, VertexValue, Message, Object> typedMessaging = (MessagingFunction<VertexKey, VertexValue, Message, Object>) messagingFunction;
        return new VertexCentricIteration<VertexKey, VertexValue, Message, Object>(vertexUpdateFunction, typedMessaging, dataSet, i);
    }

    /**
     * Creates a vertex-centric iteration for a graph whose edges carry a value.
     *
     * @param dataSet the edges as (source vertex id, target vertex id, edge value) triples
     * @param vertexUpdateFunction function that updates the vertex state
     * @param messagingFunction function that produces the messages
     * @param i maximum number of iterations
     * @return the iteration; the initial vertices still have to be supplied via {@code setInput}
     */
    public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withValuedEdges(DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> dataSet, VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction, int i) {
        // The trailing boolean only selects the valued-edges constructor overload.
        final VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> iteration =
                new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(vertexUpdateFunction, messagingFunction, dataSet, i, true);
        return iteration;
    }
}
