package eu.stratosphere.test.iterative;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.operators.DeltaIteration;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents;
import eu.stratosphere.test.testdata.ConnectedComponentsData;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.Collector;
import java.io.BufferedReader;
import java.io.Serializable;
import java.util.Collection;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:eu/stratosphere/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.class */
public class ConnectedComponentsWithDeferredUpdateITCase extends RecordAPITestBase {
    private static final long SEED = 3287269182979823L;
    private static final int NUM_VERTICES = 1000;
    private static final int NUM_EDGES = 10000;
    protected String verticesPath;
    protected String edgesPath;
    protected String resultPath;

    /* loaded from: input_file:eu/stratosphere/test/iterative/ConnectedComponentsWithDeferredUpdateITCase$IdentityMap.class */
    public static final class IdentityMap extends MapFunction {
        private static final long serialVersionUID = 1;

        public void map(Record record, Collector<Record> collector) throws Exception {
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Collector collector) throws Exception {
            map((Record) obj, (Collector<Record>) collector);
        }
    }

    /* loaded from: input_file:eu/stratosphere/test/iterative/ConnectedComponentsWithDeferredUpdateITCase$UpdateComponentIdMatchNonPreserving.class */
    public static final class UpdateComponentIdMatchNonPreserving extends JoinFunction implements Serializable {
        private static final long serialVersionUID = 1;

        public void join(Record record, Record record2, Collector<Record> collector) {
            if (record.getField(1, LongValue.class).getValue() < record2.getField(1, LongValue.class).getValue()) {
                collector.collect(record);
            }
        }

        public /* bridge */ /* synthetic */ void join(Object obj, Object obj2, Collector collector) throws Exception {
            join((Record) obj, (Record) obj2, (Collector<Record>) collector);
        }
    }

    public ConnectedComponentsWithDeferredUpdateITCase(Configuration configuration) {
        super(configuration);
    }

    protected void preSubmit() throws Exception {
        this.verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
        this.edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
        this.resultPath = getTempFilePath("results");
    }

    protected Plan getTestJob() {
        return getPlan(4, this.verticesPath, this.edgesPath, this.resultPath, 100, this.config.getBoolean("ExtraMapper", false));
    }

    protected void postSubmit() throws Exception {
        for (BufferedReader bufferedReader : getResultReader(this.resultPath)) {
            ConnectedComponentsData.checkOddEvenResult(bufferedReader);
        }
    }

    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() {
        Configuration configuration = new Configuration();
        configuration.setBoolean("ExtraMapper", false);
        Configuration configuration2 = new Configuration();
        configuration2.setBoolean("ExtraMapper", true);
        return toParameterList(new Configuration[]{configuration, configuration2});
    }

    public static Plan getPlan(int i, String str, String str2, String str3, int i2, boolean z) {
        MapOperator build = MapOperator.builder(WorksetConnectedComponents.DuplicateLongMap.class).input(new Operator[]{new FileDataSource(new CsvInputFormat(' ', new Class[]{LongValue.class}), str, "Vertices")}).name("Assign Vertex Ids").build();
        DeltaIteration deltaIteration = new DeltaIteration(0, "Connected Components Iteration");
        deltaIteration.setInitialSolutionSet(build);
        deltaIteration.setInitialWorkset(build);
        deltaIteration.setMaximumNumberOfIterations(i2);
        Operator build2 = JoinOperator.builder(new UpdateComponentIdMatchNonPreserving(), LongValue.class, 0, 0).input1(new Operator[]{ReduceOperator.builder(new WorksetConnectedComponents.MinimumComponentIDReduce(), LongValue.class, 0).input(new Operator[]{JoinOperator.builder(new WorksetConnectedComponents.NeighborWithComponentIDJoin(), LongValue.class, 0, 0).input1(new Operator[]{deltaIteration.getWorkset()}).input2(new Operator[]{new FileDataSource(new CsvInputFormat(' ', new Class[]{LongValue.class, LongValue.class}), str2, "Edges")}).name("Join Candidate Id With Neighbor").build()}).name("Find Minimum Candidate Id").build()}).input2(new Operator[]{deltaIteration.getSolutionSet()}).name("Update Component Id").build();
        if (z) {
            deltaIteration.setSolutionSetDelta(MapOperator.builder(IdentityMap.class).input(new Operator[]{build2}).name("idmap").build());
        } else {
            deltaIteration.setSolutionSetDelta(build2);
        }
        deltaIteration.setNextWorkset(build2);
        Plan plan = new Plan(new FileDataSink(new CsvOutputFormat("\n", " ", new Class[]{LongValue.class, LongValue.class}), str3, deltaIteration, "Result"), "Workset Connected Components");
        plan.setDefaultParallelism(i);
        return plan;
    }
}
