package org.apache.flink.optimizer;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.class */
/**
 * Compiler tests for plans that contain a delta (workset) iteration.
 *
 * <p>Each test builds a plan with {@code getTestPlan(boolean, boolean)}, compiles it with
 * {@code compileNoStats}, asserts the ship strategies and keys of the named operators inside
 * the iteration, and finally passes the optimized plan to {@link JobGraphGenerator}.
 */
public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
    private static final long serialVersionUID = 1;
    private static final String ITERATION_NAME = "Test Workset Iteration";
    private static final String JOIN_WITH_INVARIANT_NAME = "Test Join Invariant";
    private static final String JOIN_WITH_SOLUTION_SET = "Test Join SolutionSet";
    private static final String NEXT_WORKSET_REDUCER_NAME = "Test Reduce Workset";
    private static final String SOLUTION_DELTA_MAPPER_NAME = "Test Map Delta";

    /** Key field list containing only field 0; every keyed operator in the test plan uses it. */
    private final FieldList list0 = new FieldList(0);

    /**
     * Plan variant: the solution set join carries no forwarded-fields annotation and an identity
     * mapper produces the solution set delta.
     *
     * <p>Besides the common join assertions, the reducer input must be hash partitioned, and
     * exactly one side of the mapper (its input or its first outgoing channel) must be hash
     * partitioned while the other one is forwarded.
     */
    @Test
    public void testRecordApiWithDeferredSoltionSetUpdateWithMapper() {
        try {
            OptimizedPlan optimizedPlan = compileNoStats(getTestPlan(false, true));
            CompilerTestBase.OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimizedPlan);

            DualInputPlanNode joinWithInvariant = resolver.getNode(JOIN_WITH_INVARIANT_NAME);
            DualInputPlanNode joinWithSolutionSet = resolver.getNode(JOIN_WITH_SOLUTION_SET);
            SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
            SingleInputPlanNode deltaMapper = resolver.getNode(SOLUTION_DELTA_MAPPER_NAME);

            assertJoinStrategies(joinWithInvariant, joinWithSolutionSet);

            Assert.assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
            Assert.assertEquals(this.list0, worksetReducer.getKeys(0));

            // The partitioning may happen either before or after the mapper, but not on both sides.
            ShipStrategyType mapperInput = deltaMapper.getInput().getShipStrategy();
            ShipStrategyType mapperOutput = ((Channel) deltaMapper.getOutgoingChannels().get(0)).getShipStrategy();
            boolean partitionedAfterMapper =
                    mapperInput == ShipStrategyType.FORWARD && mapperOutput == ShipStrategyType.PARTITION_HASH;
            boolean partitionedBeforeMapper =
                    mapperOutput == ShipStrategyType.FORWARD && mapperInput == ShipStrategyType.PARTITION_HASH;
            Assert.assertTrue(partitionedAfterMapper || partitionedBeforeMapper);

            new JobGraphGenerator().compileJobGraph(optimizedPlan);
        } catch (CompilerException e) {
            e.printStackTrace();
            Assert.fail("The pact compiler is unable to compile this plan correctly.");
        }
    }

    /**
     * Plan variant: the solution set join carries no forwarded-fields annotation and feeds both
     * the solution set delta and the workset reducer directly (no mapper).
     *
     * <p>Besides the common join assertions, the reducer input must be hash partitioned and the
     * solution set join must have exactly two outgoing channels, both hash partitioned.
     */
    @Test
    public void testRecordApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
        try {
            OptimizedPlan optimizedPlan = compileNoStats(getTestPlan(false, false));
            CompilerTestBase.OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimizedPlan);

            DualInputPlanNode joinWithInvariant = resolver.getNode(JOIN_WITH_INVARIANT_NAME);
            DualInputPlanNode joinWithSolutionSet = resolver.getNode(JOIN_WITH_SOLUTION_SET);
            SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME);

            assertJoinStrategies(joinWithInvariant, joinWithSolutionSet);

            Assert.assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
            Assert.assertEquals(this.list0, worksetReducer.getKeys(0));

            Assert.assertEquals(2, joinWithSolutionSet.getOutgoingChannels().size());
            Assert.assertEquals(
                    ShipStrategyType.PARTITION_HASH,
                    ((Channel) joinWithSolutionSet.getOutgoingChannels().get(0)).getShipStrategy());
            Assert.assertEquals(
                    ShipStrategyType.PARTITION_HASH,
                    ((Channel) joinWithSolutionSet.getOutgoingChannels().get(1)).getShipStrategy());

            new JobGraphGenerator().compileJobGraph(optimizedPlan);
        } catch (CompilerException e) {
            e.printStackTrace();
            Assert.fail("The pact compiler is unable to compile this plan correctly.");
        }
    }

    /**
     * Plan variant: the solution set join is annotated as forwarding all fields of its first
     * input and feeds the solution set delta directly (no mapper).
     *
     * <p>Besides the common join assertions, the reducer input must be forwarded and the solution
     * set join must have exactly one outgoing channel, which is forwarded as well.
     */
    @Test
    public void testRecordApiWithDirectSoltionSetUpdate() {
        try {
            OptimizedPlan optimizedPlan = compileNoStats(getTestPlan(true, false));
            CompilerTestBase.OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimizedPlan);

            DualInputPlanNode joinWithInvariant = resolver.getNode(JOIN_WITH_INVARIANT_NAME);
            DualInputPlanNode joinWithSolutionSet = resolver.getNode(JOIN_WITH_SOLUTION_SET);
            SingleInputPlanNode worksetReducer = resolver.getNode(NEXT_WORKSET_REDUCER_NAME);

            assertJoinStrategies(joinWithInvariant, joinWithSolutionSet);

            Assert.assertEquals(ShipStrategyType.FORWARD, worksetReducer.getInput().getShipStrategy());
            Assert.assertEquals(this.list0, worksetReducer.getKeys(0));

            // The expected channel count is a plain 1; it must not be expressed through
            // serialVersionUID, which is unrelated and only happens to have the same value.
            Assert.assertEquals(1, joinWithSolutionSet.getOutgoingChannels().size());
            Assert.assertEquals(
                    ShipStrategyType.FORWARD,
                    ((Channel) joinWithSolutionSet.getOutgoingChannels().get(0)).getShipStrategy());

            new JobGraphGenerator().compileJobGraph(optimizedPlan);
        } catch (CompilerException e) {
            e.printStackTrace();
            Assert.fail("The pact compiler is unable to compile this plan correctly.");
        }
    }

    /**
     * Asserts the expectations shared by all plan variants: the join with the invariant input
     * forwards its first input, hash partitions its second input and is keyed on field 0 on both
     * sides, and the join with the solution set forwards both of its inputs.
     *
     * @param joinWithInvariant the optimized node of the join named {@code JOIN_WITH_INVARIANT_NAME}
     * @param joinWithSolutionSet the optimized node of the join named {@code JOIN_WITH_SOLUTION_SET}
     */
    private void assertJoinStrategies(DualInputPlanNode joinWithInvariant, DualInputPlanNode joinWithSolutionSet) {
        Assert.assertEquals(ShipStrategyType.FORWARD, joinWithInvariant.getInput1().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, joinWithInvariant.getInput2().getShipStrategy());
        Assert.assertEquals(this.list0, joinWithInvariant.getKeysForInput1());
        Assert.assertEquals(this.list0, joinWithInvariant.getKeysForInput2());

        Assert.assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSet.getInput1().getShipStrategy());
        Assert.assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSet.getInput2().getShipStrategy());
    }

    /**
     * Builds the delta iteration program used by all tests.
     *
     * <p>Inside the iteration the workset is joined with an invariant input, the result is joined
     * with the solution set, and a group reduce on field 0 of that second join produces the next
     * workset. All joins and the grouping use field 0 as key.
     *
     * @param joinPreservesSolutionSet if {@code true}, the join with the solution set is annotated
     *     with {@code withForwardedFieldsFirst("*")}
     * @param mapBeforeSolutionDelta if {@code true}, an identity mapper (annotated as forwarding
     *     all fields) is applied to the solution set join and its result is used as the solution
     *     set delta; otherwise the join result itself is the delta
     * @return the program plan created by the execution environment
     */
    private Plan getTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        Operator solutionSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Solution Set");
        Operator worksetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Workset");
        Operator invariantInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Invariant Input");

        DeltaIteration iteration = solutionSetInput.iterateDelta(worksetInput, 100, new int[]{0}).name(ITERATION_NAME);

        // Solution set joined with (workset joined with the invariant input).
        JoinOperator joinWithSolutionSet = iteration.getSolutionSet()
                .join(iteration.getWorkset()
                        .join(invariantInput)
                        .where(new int[]{0})
                        .equalTo(new int[]{0})
                        .with(new IdentityJoiner())
                        .withForwardedFieldsFirst(new String[]{"*"})
                        .name(JOIN_WITH_INVARIANT_NAME))
                .where(new int[]{0})
                .equalTo(new int[]{0})
                .with(new IdentityJoiner())
                .name(JOIN_WITH_SOLUTION_SET);
        if (joinPreservesSolutionSet) {
            joinWithSolutionSet.withForwardedFieldsFirst(new String[]{"*"});
        }

        Operator nextWorkset = joinWithSolutionSet
                .groupBy(new int[]{0})
                .reduceGroup(new IdentityGroupReducer())
                .withForwardedFields(new String[]{"*"})
                .name(NEXT_WORKSET_REDUCER_NAME);

        if (mapBeforeSolutionDelta) {
            iteration.closeWith(
                            joinWithSolutionSet
                                    .map(new IdentityMapper())
                                    .withForwardedFields(new String[]{"*"})
                                    .name(SOLUTION_DELTA_MAPPER_NAME),
                            nextWorkset)
                    .output(new DiscardingOutputFormat());
        } else {
            iteration.closeWith(joinWithSolutionSet, nextWorkset).output(new DiscardingOutputFormat());
        }
        return env.createProgramPlan();
    }
}
