package eu.stratosphere.test.compiler.iterations;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.operators.util.FieldList;
import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.operators.DeltaIteration;
import eu.stratosphere.api.java.record.operators.FileDataSink;
import eu.stratosphere.api.java.record.operators.FileDataSource;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.compiler.dag.TempMode;
import eu.stratosphere.compiler.plan.DualInputPlanNode;
import eu.stratosphere.compiler.plan.OptimizedPlan;
import eu.stratosphere.compiler.plan.SingleInputPlanNode;
import eu.stratosphere.compiler.plan.SinkPlanNode;
import eu.stratosphere.compiler.plan.SourcePlanNode;
import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
import eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.test.compiler.util.CompilerTestBase;
import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.Collector;
import java.io.Serializable;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.class */
public class ConnectedComponentsTest extends CompilerTestBase {
    private static final String VERTEX_SOURCE = "Vertices";
    private static final String ITERATION_NAME = "Connected Components Iteration";
    private static final String EDGES_SOURCE = "Edges";
    private static final String JOIN_NEIGHBORS_MATCH = "Join Candidate Id With Neighbor";
    private static final String MIN_ID_REDUCER = "Find Minimum Candidate Id";
    private static final String UPDATE_ID_MATCH = "Update Component Id";
    private static final String SINK = "Result";
    private static final boolean PRINT_PLAN = false;
    private final FieldList set0 = new FieldList(PRINT_PLAN);

    @FunctionAnnotation.ConstantFieldsSecond({ConnectedComponentsTest.PRINT_PLAN})
    /* loaded from: input_file:eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest$UpdateComponentIdMatchMirrored.class */
    public static final class UpdateComponentIdMatchMirrored extends JoinFunction implements Serializable {
        private static final long serialVersionUID = 1;

        public void join(Record record, Record record2, Collector<Record> collector) {
            if (record2.getField(1, LongValue.class).getValue() < record.getField(1, LongValue.class).getValue()) {
                collector.collect(record2);
            }
        }

        public /* bridge */ /* synthetic */ void join(Object obj, Object obj2, Collector collector) throws Exception {
            join((Record) obj, (Record) obj2, (Collector<Record>) collector);
        }
    }

    @Test
    public void testWorksetConnectedComponents() {
        OptimizedPlan compileNoStats = compileNoStats(new WorksetConnectedComponents().getPlan(String.valueOf(8), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(100)));
        CompilerTestBase.OptimizerPlanNodeResolver optimizerPlanNodeResolver = getOptimizerPlanNodeResolver(compileNoStats);
        SourcePlanNode node = optimizerPlanNodeResolver.getNode(VERTEX_SOURCE);
        SourcePlanNode node2 = optimizerPlanNodeResolver.getNode(EDGES_SOURCE);
        SinkPlanNode node3 = optimizerPlanNodeResolver.getNode(SINK);
        WorksetIterationPlanNode node4 = optimizerPlanNodeResolver.getNode(ITERATION_NAME);
        DualInputPlanNode node5 = optimizerPlanNodeResolver.getNode(JOIN_NEIGHBORS_MATCH);
        SingleInputPlanNode node6 = optimizerPlanNodeResolver.getNode(MIN_ID_REDUCER);
        SingleInputPlanNode predecessor = node6.getPredecessor();
        DualInputPlanNode node7 = optimizerPlanNodeResolver.getNode(UPDATE_ID_MATCH);
        Assert.assertEquals(DriverStrategy.NONE, node3.getDriverStrategy());
        Assert.assertEquals(DriverStrategy.NONE, node.getDriverStrategy());
        Assert.assertEquals(DriverStrategy.NONE, node2.getDriverStrategy());
        Assert.assertEquals(this.set0, node5.getKeysForInput1());
        Assert.assertEquals(this.set0, node5.getKeysForInput2());
        Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, node7.getDriverStrategy());
        Assert.assertEquals(this.set0, node7.getKeysForInput1());
        Assert.assertEquals(this.set0, node7.getKeysForInput2());
        Assert.assertEquals(ShipStrategyType.FORWARD, node3.getInput().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, node4.getInitialSolutionSetInput().getShipStrategy());
        Assert.assertEquals(this.set0, node4.getInitialSolutionSetInput().getShipStrategyKeys());
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, node4.getInitialWorksetInput().getShipStrategy());
        Assert.assertEquals(this.set0, node4.getInitialWorksetInput().getShipStrategyKeys());
        Assert.assertEquals(ShipStrategyType.FORWARD, node5.getInput1().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, node5.getInput2().getShipStrategy());
        Assert.assertEquals(this.set0, node5.getInput2().getShipStrategyKeys());
        Assert.assertTrue(node5.getInput2().getTempMode().isCached());
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, node6.getInput().getShipStrategy());
        Assert.assertEquals(this.set0, node6.getInput().getShipStrategyKeys());
        Assert.assertEquals(ShipStrategyType.FORWARD, predecessor.getInput().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.FORWARD, node7.getInput1().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.FORWARD, node7.getInput2().getShipStrategy());
        Assert.assertEquals(LocalStrategy.NONE, node3.getInput().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, node4.getInitialSolutionSetInput().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, node5.getInput1().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, node5.getInput2().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.COMBININGSORT, node6.getInput().getLocalStrategy());
        Assert.assertEquals(this.set0, node6.getInput().getLocalStrategyKeys());
        Assert.assertEquals(LocalStrategy.NONE, predecessor.getInput().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, node7.getInput1().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, node7.getInput2().getLocalStrategy());
        Assert.assertTrue(TempMode.PIPELINE_BREAKER == node4.getInitialWorksetInput().getTempMode() || LocalStrategy.SORT == node4.getInitialWorksetInput().getLocalStrategy());
        new NepheleJobGraphGenerator().compileJobGraph(compileNoStats);
    }

    @Test
    public void testWorksetConnectedComponentsWithSolutionSetAsFirstInput() {
        OptimizedPlan compileNoStats = compileNoStats(getPlanForWorksetConnectedComponentsWithSolutionSetAsFirstInput(8, IN_FILE, IN_FILE, OUT_FILE, 100));
        CompilerTestBase.OptimizerPlanNodeResolver optimizerPlanNodeResolver = getOptimizerPlanNodeResolver(compileNoStats);
        SourcePlanNode node = optimizerPlanNodeResolver.getNode(VERTEX_SOURCE);
        SourcePlanNode node2 = optimizerPlanNodeResolver.getNode(EDGES_SOURCE);
        SinkPlanNode node3 = optimizerPlanNodeResolver.getNode(SINK);
        WorksetIterationPlanNode node4 = optimizerPlanNodeResolver.getNode(ITERATION_NAME);
        DualInputPlanNode node5 = optimizerPlanNodeResolver.getNode(JOIN_NEIGHBORS_MATCH);
        SingleInputPlanNode node6 = optimizerPlanNodeResolver.getNode(MIN_ID_REDUCER);
        SingleInputPlanNode predecessor = node6.getPredecessor();
        DualInputPlanNode node7 = optimizerPlanNodeResolver.getNode(UPDATE_ID_MATCH);
        Assert.assertEquals(DriverStrategy.NONE, node3.getDriverStrategy());
        Assert.assertEquals(DriverStrategy.NONE, node.getDriverStrategy());
        Assert.assertEquals(DriverStrategy.NONE, node2.getDriverStrategy());
        Assert.assertEquals(this.set0, node5.getKeysForInput1());
        Assert.assertEquals(this.set0, node5.getKeysForInput2());
        Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, node7.getDriverStrategy());
        Assert.assertEquals(this.set0, node7.getKeysForInput1());
        Assert.assertEquals(this.set0, node7.getKeysForInput2());
        Assert.assertEquals(ShipStrategyType.FORWARD, node3.getInput().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, node4.getInitialSolutionSetInput().getShipStrategy());
        Assert.assertEquals(this.set0, node4.getInitialSolutionSetInput().getShipStrategyKeys());
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, node4.getInitialWorksetInput().getShipStrategy());
        Assert.assertEquals(this.set0, node4.getInitialWorksetInput().getShipStrategyKeys());
        Assert.assertEquals(ShipStrategyType.FORWARD, node5.getInput1().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, node5.getInput2().getShipStrategy());
        Assert.assertEquals(this.set0, node5.getInput2().getShipStrategyKeys());
        Assert.assertTrue(node5.getInput2().getTempMode().isCached());
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, node6.getInput().getShipStrategy());
        Assert.assertEquals(this.set0, node6.getInput().getShipStrategyKeys());
        Assert.assertEquals(ShipStrategyType.FORWARD, predecessor.getInput().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.FORWARD, node7.getInput1().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.FORWARD, node7.getInput2().getShipStrategy());
        Assert.assertEquals(LocalStrategy.NONE, node3.getInput().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, node4.getInitialSolutionSetInput().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, node5.getInput1().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, node5.getInput2().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.COMBININGSORT, node6.getInput().getLocalStrategy());
        Assert.assertEquals(this.set0, node6.getInput().getLocalStrategyKeys());
        Assert.assertEquals(LocalStrategy.NONE, predecessor.getInput().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, node7.getInput1().getLocalStrategy());
        Assert.assertEquals(LocalStrategy.NONE, node7.getInput2().getLocalStrategy());
        Assert.assertTrue(TempMode.PIPELINE_BREAKER == node4.getInitialWorksetInput().getTempMode() || LocalStrategy.SORT == node4.getInitialWorksetInput().getLocalStrategy());
        new NepheleJobGraphGenerator().compileJobGraph(compileNoStats);
    }

    private static Plan getPlanForWorksetConnectedComponentsWithSolutionSetAsFirstInput(int i, String str, String str2, String str3, int i2) {
        MapOperator build = MapOperator.builder(WorksetConnectedComponents.DuplicateLongMap.class).input(new FileDataSource(new CsvInputFormat(' ', new Class[]{LongValue.class}), str, VERTEX_SOURCE)).name("Assign Vertex Ids").build();
        DeltaIteration deltaIteration = new DeltaIteration(PRINT_PLAN, ITERATION_NAME);
        deltaIteration.setInitialSolutionSet(build);
        deltaIteration.setInitialWorkset(build);
        deltaIteration.setMaximumNumberOfIterations(i2);
        JoinOperator build2 = JoinOperator.builder(new UpdateComponentIdMatchMirrored(), LongValue.class, PRINT_PLAN, PRINT_PLAN).input1(deltaIteration.getSolutionSet()).input2(ReduceOperator.builder(new WorksetConnectedComponents.MinimumComponentIDReduce(), LongValue.class, PRINT_PLAN).input(JoinOperator.builder(new WorksetConnectedComponents.NeighborWithComponentIDJoin(), LongValue.class, PRINT_PLAN, PRINT_PLAN).input1(deltaIteration.getWorkset()).input2(new FileDataSource(new CsvInputFormat(' ', new Class[]{LongValue.class, LongValue.class}), str2, EDGES_SOURCE)).name(JOIN_NEIGHBORS_MATCH).build()).name(MIN_ID_REDUCER).build()).name(UPDATE_ID_MATCH).build();
        deltaIteration.setNextWorkset(build2);
        deltaIteration.setSolutionSetDelta(build2);
        FileDataSink fileDataSink = new FileDataSink(new CsvOutputFormat(), str3, deltaIteration, SINK);
        ((CsvOutputFormat.ConfigBuilder) ((CsvOutputFormat.ConfigBuilder) ((CsvOutputFormat.ConfigBuilder) CsvOutputFormat.configureRecordFormat(fileDataSink).recordDelimiter('\n')).fieldDelimiter(' ')).field(LongValue.class, PRINT_PLAN)).field(LongValue.class, 1);
        Plan plan = new Plan(fileDataSink, "Workset Connected Components");
        plan.setDefaultParallelism(i);
        return plan;
    }
}
