package eu.stratosphere.test.iterative.nephele;

import eu.stratosphere.api.common.aggregators.LongSumAggregator;
import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypePairComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.DistributionPattern;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
import eu.stratosphere.pact.runtime.iterative.task.IterationHeadPactTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationIntermediatePactTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationTailPactTask;
import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordComparatorFactory;
import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordPairComparatorFactory;
import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordSerializerFactory;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.BuildSecondCachedMatchDriver;
import eu.stratosphere.pact.runtime.task.CollectorMapDriver;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
import eu.stratosphere.pact.runtime.task.GroupReduceDriver;
import eu.stratosphere.pact.runtime.task.JoinWithSolutionSetSecondDriver;
import eu.stratosphere.pact.runtime.task.chaining.ChainedCollectorMapDriver;
import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents;
import eu.stratosphere.test.testdata.ConnectedComponentsData;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.Collector;
import java.io.BufferedReader;
import java.util.Collection;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:eu/stratosphere/test/iterative/nephele/ConnectedComponentsNepheleITCase.class */
public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
    /** Fixed seed passed to the random odd/even edge generator in {@code preSubmit()}. */
    private static final long SEED = 3287269182979823L;
    /** Number of vertices requested for the generated vertices file (also passed to the edge generator). */
    private static final int NUM_VERTICES = 1000;
    /** Number of edges requested from the edge generator. */
    private static final int NUM_EDGES = 10000;
    /** Iteration id set on the head, intermediate, tail and sync task configurations. */
    private static final int ITERATION_ID = 1;
    /**
     * Not referenced by name in the visible code.
     * NOTE(review): presumably the per-consumer memory budget in megabytes - the task configs
     * use the literal 3145728 (3 * 1024 * 1024) - confirm.
     */
    private static final long MEM_PER_CONSUMER = 3;
    /** Path of the temp file with the generated vertices; assigned in {@code preSubmit()}. */
    protected String verticesPath;
    /** Path of the temp file with the generated edges; assigned in {@code preSubmit()}. */
    protected String edgesPath;
    /** Path the job writes its result to and {@code postSubmit()} reads from; assigned in {@code preSubmit()}. */
    protected String resultPath;

    /**
     * Identity map function: forwards every incoming record to the collector unchanged.
     *
     * <p>No erased {@code map(Object, Collector)} overload is declared here; the compiler
     * generates that bridge method itself, and spelling it out in source causes a name clash.
     */
    public static final class DummyMapper extends MapFunction {
        private static final long serialVersionUID = 1;

        /**
         * Emits the given record as-is.
         *
         * @param record the incoming record
         * @param collector the collector that receives the record
         */
        public void map(Record record, Collector<Record> collector) {
            collector.collect(record);
        }
    }

    /**
     * Map function that copies field 0 of each record into field 1 and emits the record.
     *
     * <p>No erased {@code map(Object, Collector)} overload is declared here; the compiler
     * generates that bridge method itself, and spelling it out in source causes a name clash.
     */
    public static final class IdDuplicator extends MapFunction {
        private static final long serialVersionUID = 1;

        /**
         * Duplicates the value at position 0 into position 1, then forwards the record.
         *
         * @param record the incoming record; modified in place
         * @param collector the collector that receives the modified record
         * @throws Exception declared by the overridden contract
         */
        public void map(Record record, Collector<Record> collector) throws Exception {
            // The target is record field index 1; it is unrelated to the iteration id constant.
            record.setField(1, record.getField(0, LongValue.class));
            collector.collect(record);
        }
    }

    /**
     * Creates one parameterized test instance.
     *
     * @param configuration the test configuration, handed to the base class; its
     *        {@code "testcase"} entry is read in {@code getJobGraph()} to pick the job graph variant
     */
    public ConnectedComponentsNepheleITCase(Configuration configuration) {
        super(configuration);
    }

    /**
     * Supplies the parameter sets for the parameterized runner: one configuration per job graph
     * variant, with the {@code "testcase"} entry set to 1 through 4.
     *
     * @return the configurations wrapped by {@code toParameterList}
     */
    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() {
        // Must match the number of cases handled by the switch in getJobGraph().
        final int numTestCases = 4;
        Configuration[] configurations = new Configuration[numTestCases];
        for (int testCase = 1; testCase <= numTestCases; testCase++) {
            Configuration configuration = new Configuration();
            configuration.setInteger("testcase", testCase);
            configurations[testCase - 1] = configuration;
        }
        return toParameterList(configurations);
    }

    /**
     * Prepares the test inputs before the job is submitted: writes the generated vertices and
     * edges to temp files and determines the path the result will be written to.
     *
     * @throws Exception if one of the helper calls fails
     */
    protected void preSubmit() throws Exception {
        this.verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
        this.edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
        this.resultPath = getTempFilePath("results");
    }

    /**
     * Builds the job graph variant selected by the {@code "testcase"} entry of the test
     * configuration (1 to 4, as produced by {@code getConfigurations()}).
     *
     * @return the job graph for the configured test case
     * @throws RuntimeException if the configuration holds no known test case number
     * @throws Exception if building the job graph fails
     */
    protected JobGraph getJobGraph() throws Exception {
        // Passed to every vertex factory call; presumably the degree of parallelism - confirm.
        final int degreeOfParallelism = 4;
        // Ends up in TaskConfig.setNumberOfIterations on the sync vertex.
        final int maxIterations = 100;

        int testCase = this.config.getInteger("testcase", 0);
        // The case labels are plain test case numbers, not the iteration id.
        switch (testCase) {
            case 1:
                return createJobGraphUnifiedTails(this.verticesPath, this.edgesPath, this.resultPath, degreeOfParallelism, maxIterations);
            case 2:
                return createJobGraphSeparateTails(this.verticesPath, this.edgesPath, this.resultPath, degreeOfParallelism, maxIterations);
            case 3:
                return createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(this.verticesPath, this.edgesPath, this.resultPath, degreeOfParallelism, maxIterations);
            case 4:
                return createJobGraphSolutionSetUpdateAndWorksetTail(this.verticesPath, this.edgesPath, this.resultPath, degreeOfParallelism, maxIterations);
            default:
                throw new RuntimeException("Broken test configuration");
        }
    }

    /**
     * Verifies the job result: every reader returned for the result path is checked with
     * {@code ConnectedComponentsData.checkOddEvenResult}. All readers are closed afterwards,
     * whether or not the check succeeded.
     *
     * @throws Exception if reading or checking the result fails
     */
    protected void postSubmit() throws Exception {
        BufferedReader[] readers = getResultReader(this.resultPath);
        try {
            for (BufferedReader reader : readers) {
                ConnectedComponentsData.checkOddEvenResult(reader);
            }
        } finally {
            for (BufferedReader reader : readers) {
                try {
                    reader.close();
                } catch (java.io.IOException ignored) {
                    // Best-effort cleanup: a failure to close must not mask a verification failure.
                }
            }
        }
    }

    /**
     * Creates the input vertex that reads single-column {@code LongValue} records from a
     * space-delimited file and chains an {@link IdDuplicator} map task to it. The chained task
     * has two hash-partitioned outputs, each with its own output comparator.
     *
     * @param jobGraph the job graph the vertex is added to
     * @param str path of the vertices file
     * @param i passed twice to {@code JobGraphUtils.createInput} (presumably the degree of parallelism - confirm)
     * @param typeSerializerFactory serializer used for the outputs and the chained task's input
     * @param typeComparatorFactory comparator used for both hash-partitioned outputs
     * @return the configured input vertex
     */
    private static JobInputVertex createVerticesInput(JobGraph jobGraph, String str, int i, TypeSerializerFactory<?> typeSerializerFactory, TypeComparatorFactory<?> typeComparatorFactory) {
        JobInputVertex verticesInput = JobGraphUtils.createInput(new CsvInputFormat(' ', new Class[]{LongValue.class}), str, "VerticesInput", jobGraph, i, i);
        TaskConfig inputConfig = new TaskConfig(verticesInput.getConfiguration());
        inputConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        inputConfig.setOutputSerializer(typeSerializerFactory);

        // Configuration of the chained mapper that duplicates the vertex id.
        TaskConfig chainedMapperConfig = new TaskConfig(new Configuration());
        chainedMapperConfig.setStubWrapper(new UserCodeClassWrapper(IdDuplicator.class));
        chainedMapperConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        chainedMapperConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
        chainedMapperConfig.setInputSerializer(typeSerializerFactory, 0);
        chainedMapperConfig.setOutputSerializer(typeSerializerFactory);
        // Two outputs: the job graph builders connect this vertex to the iteration head twice.
        chainedMapperConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        chainedMapperConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        chainedMapperConfig.setOutputComparator(typeComparatorFactory, 0);
        // Output index 1 (the second output); unrelated to the iteration id constant.
        chainedMapperConfig.setOutputComparator(typeComparatorFactory, 1);

        inputConfig.addChainedTask(ChainedCollectorMapDriver.class, chainedMapperConfig, "ID Duplicator");
        return verticesInput;
    }

    /**
     * Creates the input vertex that reads two-column {@code LongValue} records from a
     * space-delimited file and ships them hash-partitioned.
     *
     * @param jobGraph the job graph the vertex is added to
     * @param str path of the edges file
     * @param i passed twice to {@code JobGraphUtils.createInput} (presumably the degree of parallelism - confirm)
     * @param typeSerializerFactory serializer for the vertex output
     * @param typeComparatorFactory comparator for the hash-partitioned output
     * @return the configured input vertex
     */
    private static JobInputVertex createEdgesInput(JobGraph jobGraph, String str, int i, TypeSerializerFactory<?> typeSerializerFactory, TypeComparatorFactory<?> typeComparatorFactory) {
        CsvInputFormat edgeFormat = new CsvInputFormat(' ', new Class[]{LongValue.class, LongValue.class});
        JobInputVertex edgesInput = JobGraphUtils.createInput(edgeFormat, str, "EdgesInput", jobGraph, i, i);

        TaskConfig edgesConfig = new TaskConfig(edgesInput.getConfiguration());
        edgesConfig.setOutputSerializer(typeSerializerFactory);
        edgesConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        edgesConfig.setOutputComparator(typeComparatorFactory, 0);
        return edgesInput;
    }

    /**
     * Creates the iteration head vertex ("Join With Edges"). It has three inputs: input 0 is
     * registered as the partial-solution/workset input, input 1 is cached, and input 2 is
     * registered as the solution set input. The driver is a {@code BuildSecondCachedMatchDriver}
     * running {@code NeighborWithComponentIDJoin}.
     *
     * @param jobGraph the job graph the vertex is added to
     * @param i passed twice to {@code JobGraphUtils.createTask} (presumably the degree of parallelism - confirm)
     * @param typeSerializerFactory serializer for all inputs, outputs and the solution set
     * @param typeComparatorFactory comparator for all inputs, the output, the solution set and the driver
     * @param typePairComparatorFactory pair comparator for the join driver
     * @return the configured iteration head vertex
     */
    private static JobTaskVertex createIterationHead(JobGraph jobGraph, int i, TypeSerializerFactory<?> typeSerializerFactory, TypeComparatorFactory<?> typeComparatorFactory, TypePairComparatorFactory<?, ?> typePairComparatorFactory) {
        // 3 MiB; the same budget is used for every memory consumer of this vertex.
        final long memoryPerConsumer = 3L * 1024L * 1024L;

        JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Join With Edges (Iteration Head)", jobGraph, i, i);
        TaskConfig headConfig = new TaskConfig(head.getConfiguration());
        headConfig.setIterationId(ITERATION_ID);

        // Input 0: registered as the partial solution / workset input.
        headConfig.addInputToGroup(0);
        headConfig.setInputSerializer(typeSerializerFactory, 0);
        headConfig.setInputComparator(typeComparatorFactory, 0);
        headConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
        headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);

        // Input 1: cached input with its own materialization memory (index 1, not the iteration id).
        headConfig.addInputToGroup(1);
        headConfig.setInputSerializer(typeSerializerFactory, 1);
        headConfig.setInputComparator(typeComparatorFactory, 1);
        headConfig.setInputLocalStrategy(1, LocalStrategy.NONE);
        headConfig.setInputCached(1, true);
        headConfig.setInputMaterializationMemory(1, memoryPerConsumer);

        // Input 2: registered as the solution set input.
        headConfig.addInputToGroup(2);
        headConfig.setInputSerializer(typeSerializerFactory, 2);
        headConfig.setInputComparator(typeComparatorFactory, 2);
        headConfig.setInputLocalStrategy(2, LocalStrategy.NONE);
        headConfig.setIterationHeadSolutionSetInputIndex(2);
        headConfig.setSolutionSetSerializer(typeSerializerFactory);
        headConfig.setSolutionSetComparator(typeComparatorFactory);

        // Workset iteration settings and memory for back channel and solution set.
        headConfig.setIsWorksetIteration();
        headConfig.setBackChannelMemory(memoryPerConsumer);
        headConfig.setSolutionSetMemory(memoryPerConsumer);

        // Regular output of the head.
        headConfig.setOutputSerializer(typeSerializerFactory);
        headConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        headConfig.setOutputComparator(typeComparatorFactory, 0);

        // Separate config for the final output that the head forwards.
        TaskConfig finalOutputConfig = new TaskConfig(new Configuration());
        finalOutputConfig.setOutputSerializer(typeSerializerFactory);
        finalOutputConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        headConfig.setIterationHeadFinalOutputConfig(finalOutputConfig);
        headConfig.setIterationHeadIndexOfSyncOutput(2);

        // Join driver.
        headConfig.setDriver(BuildSecondCachedMatchDriver.class);
        headConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        headConfig.setStubWrapper(new UserCodeClassWrapper(WorksetConnectedComponents.NeighborWithComponentIDJoin.class));
        headConfig.setDriverComparator(typeComparatorFactory, 0);
        headConfig.setDriverComparator(typeComparatorFactory, 1);
        headConfig.setDriverPairComparator(typePairComparatorFactory);
        headConfig.setMemoryDriver(memoryPerConsumer);

        // Same aggregator name as registered on the sync vertex in createSync.
        headConfig.addIterationAggregator("pact.runtime.workset-empty-aggregator", LongSumAggregator.class);
        return head;
    }

    /**
     * Creates the intermediate iteration vertex ("Find Min Component-ID"): a sorted group
     * reduce running {@code MinimumComponentIDReduce} on a single sorted input, with one
     * forwarded output.
     *
     * @param jobGraph the job graph the vertex is added to
     * @param i passed twice to {@code JobGraphUtils.createTask} (presumably the degree of parallelism - confirm)
     * @param typeSerializerFactory serializer for the input and the output
     * @param typeComparatorFactory comparator for the input sort and the driver
     * @return the configured intermediate vertex
     */
    private static JobTaskVertex createIterationIntermediate(JobGraph jobGraph, int i, TypeSerializerFactory<?> typeSerializerFactory, TypeComparatorFactory<?> typeComparatorFactory) {
        // 3 MiB of memory for the input sort.
        final long sortMemory = 3L * 1024L * 1024L;

        JobTaskVertex minReduce = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "Find Min Component-ID", jobGraph, i, i);
        TaskConfig reduceConfig = new TaskConfig(minReduce.getConfiguration());
        reduceConfig.setIterationId(ITERATION_ID);

        // Single input, sorted locally.
        reduceConfig.addInputToGroup(0);
        reduceConfig.setInputSerializer(typeSerializerFactory, 0);
        reduceConfig.setInputComparator(typeComparatorFactory, 0);
        reduceConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
        reduceConfig.setMemoryInput(0, sortMemory);
        reduceConfig.setFilehandlesInput(0, 64);
        reduceConfig.setSpillingThresholdInput(0, 0.85f);

        // Single forwarded output.
        reduceConfig.setOutputSerializer(typeSerializerFactory);
        reduceConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);

        // Group-reduce driver and user function.
        reduceConfig.setDriver(GroupReduceDriver.class);
        reduceConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
        reduceConfig.setDriverComparator(typeComparatorFactory, 0);
        reduceConfig.setStubWrapper(new UserCodeClassWrapper(WorksetConnectedComponents.MinimumComponentIDReduce.class));
        return minReduce;
    }

    /**
     * Creates the file output vertex ("Final Output") that writes records with a
     * {@code CsvOutputFormat}: two {@code LongValue} fields (record positions 0 and 1),
     * separated by a space, one record per line.
     *
     * @param jobGraph the job graph the vertex is added to
     * @param str path of the output file
     * @param i passed twice to {@code JobGraphUtils.createFileOutput} (presumably the degree of parallelism - confirm)
     * @param typeSerializerFactory serializer for the vertex input
     * @return the configured output vertex
     */
    private static JobOutputVertex createOutput(JobGraph jobGraph, String str, int i, TypeSerializerFactory<?> typeSerializerFactory) {
        JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Final Output", i, i);
        TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
        outputConfig.addInputToGroup(0);
        outputConfig.setInputSerializer(typeSerializerFactory, 0);
        outputConfig.setStubWrapper(new UserCodeClassWrapper(CsvOutputFormat.class));
        outputConfig.setStubParameter("stratosphere.output.file", str);

        // Parameters for the CSV output format.
        Configuration formatParameters = outputConfig.getStubParameters();
        formatParameters.setString("output.record.delimiter", "\n");
        formatParameters.setString("output.record.field-delimiter", " ");
        formatParameters.setClass("output.record.type_0", LongValue.class);
        formatParameters.setInteger("output.record.position_0", 0);
        formatParameters.setClass("output.record.type_1", LongValue.class);
        // Record field position 1; unrelated to the iteration id constant.
        formatParameters.setInteger("output.record.position_1", 1);
        formatParameters.setInteger("output.record.num-fields", 2);
        return output;
    }

    /**
     * Creates a fake output vertex named "FakeTailOutput"; the job graph builders connect an
     * iteration tail to it.
     *
     * @param jobGraph the job graph the vertex is added to
     * @param i passed twice to {@code JobGraphUtils.createFakeOutput} (presumably the degree of parallelism - confirm)
     * @return the fake output vertex
     */
    private static JobOutputVertex createFakeTail(JobGraph jobGraph, int i) {
        JobOutputVertex fakeTail = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", i, i);
        return fakeTail;
    }

    /**
     * Creates the iteration synchronization vertex and configures its iteration count,
     * iteration id, the workset-empty aggregator and the matching convergence criterion.
     *
     * @param jobGraph the job graph the vertex is added to
     * @param i passed to {@code JobGraphUtils.createSync} (presumably the degree of parallelism - confirm)
     * @param i2 value set as the number of iterations
     * @return the configured sync vertex
     */
    private static JobOutputVertex createSync(JobGraph jobGraph, int i, int i2) {
        // The aggregator and the convergence criterion are registered under the same name.
        final String aggregatorName = "pact.runtime.workset-empty-aggregator";

        JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, i);
        TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
        syncConfig.setNumberOfIterations(i2);
        syncConfig.setIterationId(ITERATION_ID);
        syncConfig.addIterationAggregator(aggregatorName, LongSumAggregator.class);
        syncConfig.setConvergenceCriterion(aggregatorName, WorksetEmptyConvergenceCriterion.class);
        return sync;
    }

    /**
     * Builds the "unified tails" variant of the connected components job: a single iteration
     * tail is flagged as both workset update and solution set update.
     *
     * @param str path of the vertices file
     * @param str2 path of the edges file
     * @param str3 path of the result file
     * @param i passed to every vertex factory call (presumably the degree of parallelism - confirm);
     *        also the number of events the intermediate's input gate waits for
     * @param i2 number of iterations set on the sync vertex
     * @return the assembled job graph
     * @throws JobGraphDefinitionException if connecting the vertices fails
     */
    public JobGraph createJobGraphUnifiedTails(String str, String str2, String str3, int i, int i2) throws JobGraphDefinitionException {
        RecordSerializerFactory serializer = RecordSerializerFactory.get();
        // Compare on field 0 (LongValue); the third array is a boolean flag per key field.
        RecordComparatorFactory comparator = new RecordComparatorFactory(new int[]{0}, new Class[]{LongValue.class}, new boolean[]{true});
        RecordPairComparatorFactory pairComparator = RecordPairComparatorFactory.get();
        JobGraph jobGraph = new JobGraph("Connected Components (Unified Tails)");

        // Vertices shared by all job graph variants.
        JobInputVertex vertices = createVerticesInput(jobGraph, str, i, serializer, comparator);
        JobInputVertex edges = createEdgesInput(jobGraph, str2, i, serializer, comparator);
        JobTaskVertex head = createIterationHead(jobGraph, i, serializer, comparator, pairComparator);
        JobTaskVertex intermediate = createIterationIntermediate(jobGraph, i, serializer, comparator);
        TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
        JobOutputVertex output = createOutput(jobGraph, str3, i, serializer);
        JobOutputVertex fakeTail = createFakeTail(jobGraph, i);
        JobOutputVertex sync = createSync(jobGraph, i, i2);

        // The single tail: joins with the solution set and updates both workset and solution set.
        JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph, i, i);
        TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
        tailConfig.setIterationId(ITERATION_ID);
        tailConfig.setIsWorksetIteration();
        tailConfig.setIsWorksetUpdate();
        tailConfig.setIsSolutionSetUpdate();
        tailConfig.setIsSolutionSetUpdateWithoutReprobe();
        tailConfig.addInputToGroup(0);
        tailConfig.setInputSerializer(serializer, 0);
        tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        tailConfig.setOutputSerializer(serializer);
        tailConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
        tailConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        tailConfig.setDriverComparator(comparator, 0);
        tailConfig.setDriverPairComparator(pairComparator);
        tailConfig.setStubWrapper(new UserCodeClassWrapper(WorksetConnectedComponents.UpdateComponentIdMatch.class));

        // Wiring. The vertices input is connected to the head twice (it has two outputs).
        JobGraphUtils.connect(vertices, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect(edges, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect(vertices, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect(head, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, i);
        JobGraphUtils.connect(intermediate, tail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        // Event count 1 for the pointwise connection; unrelated to the iteration id constant.
        tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(tail, fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);

        // Instance sharing.
        vertices.setVertexToShareInstancesWith(head);
        edges.setVertexToShareInstancesWith(head);
        intermediate.setVertexToShareInstancesWith(head);
        tail.setVertexToShareInstancesWith(head);
        output.setVertexToShareInstancesWith(head);
        sync.setVertexToShareInstancesWith(head);
        fakeTail.setVertexToShareInstancesWith(tail);
        return jobGraph;
    }

    /**
     * Builds the "separate tails" variant of the connected components job: an intermediate
     * "Solution Set Join" task feeds two distinct iteration tails, one flagged as solution set
     * update and one flagged as workset update. The head is told to wait for the solution set
     * update.
     *
     * @param str path of the vertices file
     * @param str2 path of the edges file
     * @param str3 path of the result file
     * @param i passed to every vertex factory call (presumably the degree of parallelism - confirm);
     *        also the number of events the first intermediate's input gate waits for
     * @param i2 number of iterations set on the sync vertex
     * @return the assembled job graph
     * @throws JobGraphDefinitionException if connecting the vertices fails
     */
    public JobGraph createJobGraphSeparateTails(String str, String str2, String str3, int i, int i2) throws JobGraphDefinitionException {
        // 3 MiB for materializing the solution set tail's input.
        final long materializationMemory = 3L * 1024L * 1024L;

        RecordSerializerFactory serializer = RecordSerializerFactory.get();
        // Compare on field 0 (LongValue); the third array is a boolean flag per key field.
        RecordComparatorFactory comparator = new RecordComparatorFactory(new int[]{0}, new Class[]{LongValue.class}, new boolean[]{true});
        RecordPairComparatorFactory pairComparator = RecordPairComparatorFactory.get();
        JobGraph jobGraph = new JobGraph("Connected Components (Separate Tails)");

        // Vertices shared by all job graph variants.
        JobInputVertex vertices = createVerticesInput(jobGraph, str, i, serializer, comparator);
        JobInputVertex edges = createEdgesInput(jobGraph, str2, i, serializer, comparator);
        JobTaskVertex head = createIterationHead(jobGraph, i, serializer, comparator, pairComparator);
        new TaskConfig(head.getConfiguration()).setWaitForSolutionSetUpdate();
        JobTaskVertex intermediate = createIterationIntermediate(jobGraph, i, serializer, comparator);
        TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
        JobOutputVertex output = createOutput(jobGraph, str3, i, serializer);
        JobOutputVertex ssFakeTail = createFakeTail(jobGraph, i);
        JobOutputVertex wsFakeTail = createFakeTail(jobGraph, i);
        JobOutputVertex sync = createSync(jobGraph, i, i2);

        // Intermediate join with the solution set; it has two forwarded outputs, one per tail.
        JobTaskVertex ssJoin = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "Solution Set Join", jobGraph, i, i);
        TaskConfig ssJoinConfig = new TaskConfig(ssJoin.getConfiguration());
        ssJoinConfig.setIterationId(ITERATION_ID);
        ssJoinConfig.addInputToGroup(0);
        ssJoinConfig.setInputSerializer(serializer, 0);
        ssJoinConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        ssJoinConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        ssJoinConfig.setOutputComparator(comparator, 0);
        // Output index 1 (the second output); unrelated to the iteration id constant.
        ssJoinConfig.setOutputComparator(comparator, 1);
        ssJoinConfig.setOutputSerializer(serializer);
        ssJoinConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
        ssJoinConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        ssJoinConfig.setDriverComparator(comparator, 0);
        ssJoinConfig.setDriverPairComparator(pairComparator);
        ssJoinConfig.setStubWrapper(new UserCodeClassWrapper(WorksetConnectedComponents.UpdateComponentIdMatch.class));

        // Tail flagged as solution set update; its input is materialized asynchronously.
        JobTaskVertex ssTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationSolutionSetTail", jobGraph, i, i);
        TaskConfig ssTailConfig = new TaskConfig(ssTail.getConfiguration());
        ssTailConfig.setIterationId(ITERATION_ID);
        ssTailConfig.setIsSolutionSetUpdate();
        ssTailConfig.setIsWorksetIteration();
        ssTailConfig.addInputToGroup(0);
        ssTailConfig.setInputSerializer(serializer, 0);
        ssTailConfig.setInputAsynchronouslyMaterialized(0, true);
        ssTailConfig.setInputMaterializationMemory(0, materializationMemory);
        ssTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        ssTailConfig.setOutputSerializer(serializer);
        ssTailConfig.setDriver(CollectorMapDriver.class);
        ssTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        ssTailConfig.setStubWrapper(new UserCodeClassWrapper(DummyMapper.class));

        // Tail flagged as workset update.
        JobTaskVertex wsTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationWorksetTail", jobGraph, i, i);
        TaskConfig wsTailConfig = new TaskConfig(wsTail.getConfiguration());
        wsTailConfig.setIterationId(ITERATION_ID);
        wsTailConfig.setIsWorksetIteration();
        wsTailConfig.setIsWorksetUpdate();
        wsTailConfig.addInputToGroup(0);
        wsTailConfig.setInputSerializer(serializer, 0);
        wsTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        wsTailConfig.setOutputSerializer(serializer);
        wsTailConfig.setDriver(CollectorMapDriver.class);
        wsTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        wsTailConfig.setStubWrapper(new UserCodeClassWrapper(DummyMapper.class));

        // Wiring. The vertices input is connected to the head twice (it has two outputs).
        JobGraphUtils.connect(vertices, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect(edges, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect(vertices, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect(head, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, i);
        JobGraphUtils.connect(intermediate, ssJoin, ChannelType.NETWORK, DistributionPattern.POINTWISE);
        // Event count 1 for each pointwise connection; unrelated to the iteration id constant.
        ssJoinConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        JobGraphUtils.connect(ssJoin, ssTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        ssTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        JobGraphUtils.connect(ssJoin, wsTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        wsTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(ssTail, ssFakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(wsTail, wsFakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);

        // Instance sharing.
        vertices.setVertexToShareInstancesWith(head);
        edges.setVertexToShareInstancesWith(head);
        intermediate.setVertexToShareInstancesWith(head);
        ssJoin.setVertexToShareInstancesWith(head);
        wsTail.setVertexToShareInstancesWith(head);
        output.setVertexToShareInstancesWith(head);
        sync.setVertexToShareInstancesWith(head);
        ssTail.setVertexToShareInstancesWith(wsTail);
        ssFakeTail.setVertexToShareInstancesWith(ssTail);
        wsFakeTail.setVertexToShareInstancesWith(wsTail);
        return jobGraph;
    }

    /**
     * Builds the variant in which an intermediate task is flagged as workset update and a
     * separate iteration tail is flagged as solution set update. The head is told to wait for
     * the solution set update.
     *
     * @param str path of the vertices file
     * @param str2 path of the edges file
     * @param str3 path of the result file
     * @param i passed to every vertex factory call (presumably the degree of parallelism - confirm);
     *        also the number of events the first intermediate's input gate waits for
     * @param i2 number of iterations set on the sync vertex
     * @return the assembled job graph
     * @throws JobGraphDefinitionException if connecting the vertices fails
     */
    public JobGraph createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(String str, String str2, String str3, int i, int i2) throws JobGraphDefinitionException {
        RecordSerializerFactory serializer = RecordSerializerFactory.get();
        // Compare on field 0 (LongValue); the third array is a boolean flag per key field.
        RecordComparatorFactory comparator = new RecordComparatorFactory(new int[]{0}, new Class[]{LongValue.class}, new boolean[]{true});
        RecordPairComparatorFactory pairComparator = RecordPairComparatorFactory.get();
        JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Workset Update, Solution Set Tail)");

        // Vertices shared by all job graph variants.
        JobInputVertex vertices = createVerticesInput(jobGraph, str, i, serializer, comparator);
        JobInputVertex edges = createEdgesInput(jobGraph, str2, i, serializer, comparator);
        JobTaskVertex head = createIterationHead(jobGraph, i, serializer, comparator, pairComparator);
        new TaskConfig(head.getConfiguration()).setWaitForSolutionSetUpdate();
        JobTaskVertex intermediate = createIterationIntermediate(jobGraph, i, serializer, comparator);
        TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
        JobOutputVertex output = createOutput(jobGraph, str3, i, serializer);
        JobOutputVertex fakeTail = createFakeTail(jobGraph, i);
        JobOutputVertex sync = createSync(jobGraph, i, i2);

        // Intermediate task flagged as workset update; joins with the solution set.
        JobTaskVertex wsUpdate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "WorksetUpdate", jobGraph, i, i);
        TaskConfig wsUpdateConfig = new TaskConfig(wsUpdate.getConfiguration());
        wsUpdateConfig.setIterationId(ITERATION_ID);
        wsUpdateConfig.setIsWorksetIteration();
        wsUpdateConfig.setIsWorksetUpdate();
        wsUpdateConfig.addInputToGroup(0);
        wsUpdateConfig.setInputSerializer(serializer, 0);
        wsUpdateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        wsUpdateConfig.setOutputComparator(comparator, 0);
        wsUpdateConfig.setOutputSerializer(serializer);
        wsUpdateConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
        wsUpdateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        wsUpdateConfig.setDriverComparator(comparator, 0);
        wsUpdateConfig.setDriverPairComparator(pairComparator);
        wsUpdateConfig.setStubWrapper(new UserCodeClassWrapper(WorksetConnectedComponents.UpdateComponentIdMatch.class));

        // Tail flagged as solution set update.
        JobTaskVertex ssTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationSolutionSetTail", jobGraph, i, i);
        TaskConfig ssTailConfig = new TaskConfig(ssTail.getConfiguration());
        ssTailConfig.setIterationId(ITERATION_ID);
        ssTailConfig.setIsSolutionSetUpdate();
        ssTailConfig.setIsWorksetIteration();
        ssTailConfig.addInputToGroup(0);
        ssTailConfig.setInputSerializer(serializer, 0);
        ssTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        ssTailConfig.setOutputSerializer(serializer);
        ssTailConfig.setDriver(CollectorMapDriver.class);
        ssTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        ssTailConfig.setStubWrapper(new UserCodeClassWrapper(DummyMapper.class));

        // Wiring. The vertices input is connected to the head twice (it has two outputs).
        JobGraphUtils.connect(vertices, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect(edges, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect(vertices, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect(head, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, i);
        JobGraphUtils.connect(intermediate, wsUpdate, ChannelType.NETWORK, DistributionPattern.POINTWISE);
        // Event count 1 for each pointwise connection; unrelated to the iteration id constant.
        wsUpdateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        JobGraphUtils.connect(wsUpdate, ssTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        ssTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(ssTail, fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);

        // Instance sharing.
        vertices.setVertexToShareInstancesWith(head);
        edges.setVertexToShareInstancesWith(head);
        intermediate.setVertexToShareInstancesWith(head);
        wsUpdate.setVertexToShareInstancesWith(head);
        ssTail.setVertexToShareInstancesWith(head);
        output.setVertexToShareInstancesWith(head);
        sync.setVertexToShareInstancesWith(head);
        fakeTail.setVertexToShareInstancesWith(ssTail);
        return jobGraph;
    }

    /**
     * Builds the job graph named
     * "Connected Components (Intermediate Solution Set Update, Workset Tail)".
     *
     * <p>The graph wires two inputs into an iteration head, followed by an intermediate task,
     * a "Solution Set Update" task (an {@link IterationIntermediatePactTask} flagged as the
     * solution set update) and an "IterationWorksetTail" task (an {@link IterationTailPactTask}
     * flagged as the workset update). The head additionally feeds the output and the sync
     * vertex; the workset tail feeds a fake tail output.
     *
     * @param str  forwarded to {@code createVerticesInput} (presumably the vertices input path)
     * @param str2 forwarded to {@code createEdgesInput} (presumably the edges input path)
     * @param str3 forwarded to {@code createOutput} (presumably the result path)
     * @param i    forwarded to every vertex factory and used as the event count of the
     *             intermediate task's iterative gate (presumably the degree of parallelism)
     * @param i2   forwarded to {@code createSync} (presumably the maximum number of iterations)
     * @return the fully connected job graph
     * @throws JobGraphDefinitionException declared for the graph-building calls made here
     */
    public JobGraph createJobGraphSolutionSetUpdateAndWorksetTail(String str, String str2, String str3, int i, int i2) throws JobGraphDefinitionException {
        // -- type utilities shared by all vertices ------------------------------------------------
        final RecordSerializerFactory serializer = RecordSerializerFactory.get();
        // Key on field 0 of type LongValue. The third array must contain booleans; the int-typed
        // ITERATION_ID that used to sit here cannot be stored in a boolean[] element.
        // NOTE(review): presumably this is the ascending-order flag — confirm.
        final RecordComparatorFactory comparator =
                new RecordComparatorFactory(new int[]{0}, new Class[]{LongValue.class}, new boolean[]{true});
        final RecordPairComparatorFactory pairComparator = RecordPairComparatorFactory.get();

        final JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Solution Set Update, Workset Tail)");

        // -- vertices built by the shared factory methods -----------------------------------------
        final JobInputVertex verticesInput = createVerticesInput(jobGraph, str, i, serializer, comparator);
        final JobInputVertex edgesInput = createEdgesInput(jobGraph, str2, i, serializer, comparator);
        final JobTaskVertex head = createIterationHead(jobGraph, i, serializer, comparator, pairComparator);
        final JobTaskVertex intermediate = createIterationIntermediate(jobGraph, i, serializer, comparator);
        final TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
        final JobOutputVertex output = createOutput(jobGraph, str3, i, serializer);
        final JobOutputVertex fakeTail = createFakeTail(jobGraph, i);
        final JobOutputVertex sync = createSync(jobGraph, i, i2);

        // -- solution set update: joins its single input against the solution set -----------------
        final JobTaskVertex solutionSetUpdate =
                JobGraphUtils.createTask(IterationIntermediatePactTask.class, "Solution Set Update", jobGraph, i, i);
        final TaskConfig solutionSetUpdateConfig = new TaskConfig(solutionSetUpdate.getConfiguration());
        solutionSetUpdateConfig.setIterationId(ITERATION_ID);
        solutionSetUpdateConfig.setIsSolutionSetUpdate();
        solutionSetUpdateConfig.setIsSolutionSetUpdateWithoutReprobe();
        solutionSetUpdateConfig.addInputToGroup(0);
        solutionSetUpdateConfig.setInputSerializer(serializer, 0);
        solutionSetUpdateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        solutionSetUpdateConfig.setOutputComparator(comparator, 0);
        solutionSetUpdateConfig.setOutputSerializer(serializer);
        solutionSetUpdateConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
        solutionSetUpdateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        solutionSetUpdateConfig.setDriverComparator(comparator, 0);
        solutionSetUpdateConfig.setDriverPairComparator(pairComparator);
        solutionSetUpdateConfig.setStubWrapper(new UserCodeClassWrapper(WorksetConnectedComponents.UpdateComponentIdMatch.class));

        // -- workset tail: runs DummyMapper over its input and performs the workset update --------
        final JobTaskVertex worksetTail =
                JobGraphUtils.createTask(IterationTailPactTask.class, "IterationWorksetTail", jobGraph, i, i);
        final TaskConfig worksetTailConfig = new TaskConfig(worksetTail.getConfiguration());
        worksetTailConfig.setIterationId(ITERATION_ID);
        worksetTailConfig.setIsWorksetIteration();
        worksetTailConfig.setIsWorksetUpdate();
        worksetTailConfig.addInputToGroup(0);
        worksetTailConfig.setInputSerializer(serializer, 0);
        worksetTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        worksetTailConfig.setOutputSerializer(serializer);
        worksetTailConfig.setDriver(CollectorMapDriver.class);
        worksetTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        worksetTailConfig.setStubWrapper(new UserCodeClassWrapper(DummyMapper.class));

        // -- edges --------------------------------------------------------------------------------
        // The vertices input is connected to the head twice, with the edges input in between.
        // NOTE(review): presumably it supplies both the initial solution set and the initial
        // workset — confirm against createIterationHead before removing either connection.
        JobGraphUtils.connect(verticesInput, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect(edgesInput, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect(verticesInput, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);

        JobGraphUtils.connect(head, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, i);

        // NOTE(review): ITERATION_ID is passed as the event count of the next two gates;
        // presumably a plain literal was intended — confirm the constant's value.
        JobGraphUtils.connect(intermediate, solutionSetUpdate, ChannelType.NETWORK, DistributionPattern.POINTWISE);
        solutionSetUpdateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, ITERATION_ID);

        JobGraphUtils.connect(solutionSetUpdate, worksetTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        worksetTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, ITERATION_ID);

        JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(worksetTail, fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);

        // -- instance sharing: everything shares with the head, except the fake tail, which
        //    shares with the workset tail ---------------------------------------------------------
        verticesInput.setVertexToShareInstancesWith(head);
        edgesInput.setVertexToShareInstancesWith(head);
        intermediate.setVertexToShareInstancesWith(head);
        solutionSetUpdate.setVertexToShareInstancesWith(head);
        worksetTail.setVertexToShareInstancesWith(head);
        output.setVertexToShareInstancesWith(head);
        sync.setVertexToShareInstancesWith(head);
        fakeTail.setVertexToShareInstancesWith(worksetTail);

        return jobGraph;
    }
}
