package org.apache.flink.optimizer;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.util.Visitor;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/optimizer/ParallelismChangeTest.class */
public class ParallelismChangeTest extends CompilerTestBase {
    @Test
    public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(8);
        executionEnvironment.generateSequence(0L, 1L).setParallelism(8).map(new IdentityMapper()).withForwardedFields(new String[]{"*"}).setParallelism(8).name("Map1").groupBy(new String[]{"*"}).reduceGroup(new IdentityGroupReducer()).withForwardedFields(new String[]{"*"}).setParallelism(8).name("Reduce1").map(new IdentityMapper()).withForwardedFields(new String[]{"*"}).setParallelism(16).name("Map2").groupBy(new String[]{"*"}).reduceGroup(new IdentityGroupReducer()).withForwardedFields(new String[]{"*"}).setParallelism(16).name("Reduce2").output(new DiscardingOutputFormat()).setParallelism(16).name("Sink");
        SingleInputPlanNode predecessor = ((SinkPlanNode) compileNoStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getPredecessor();
        ShipStrategyType shipStrategy = predecessor.getPredecessor().getInput().getShipStrategy();
        ShipStrategyType shipStrategy2 = predecessor.getInput().getShipStrategy();
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, shipStrategy);
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy2);
    }

    @Test
    public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(8);
        executionEnvironment.generateSequence(0L, 1L).setParallelism(8).map(new IdentityMapper()).withForwardedFields(new String[]{"*"}).setParallelism(8).name("Map1").groupBy(new String[]{"*"}).reduceGroup(new IdentityGroupReducer()).withForwardedFields(new String[]{"*"}).setParallelism(8).name("Reduce1").map(new IdentityMapper()).withForwardedFields(new String[]{"*"}).setParallelism(8).name("Map2").groupBy(new String[]{"*"}).reduceGroup(new IdentityGroupReducer()).withForwardedFields(new String[]{"*"}).setParallelism(16).name("Reduce2").output(new DiscardingOutputFormat()).setParallelism(16).name("Sink");
        SingleInputPlanNode predecessor = ((SinkPlanNode) compileNoStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getPredecessor();
        ShipStrategyType shipStrategy = predecessor.getPredecessor().getInput().getShipStrategy();
        ShipStrategyType shipStrategy2 = predecessor.getInput().getShipStrategy();
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, shipStrategy);
        Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, shipStrategy2);
    }

    @Test
    public void checkPropertyHandlingWithIncreasingLocalParallelism() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(16);
        executionEnvironment.generateSequence(0L, 1L).setParallelism(16).map(new IdentityMapper()).withForwardedFields(new String[]{"*"}).setParallelism(16).name("Map1").groupBy(new String[]{"*"}).reduceGroup(new IdentityGroupReducer()).withForwardedFields(new String[]{"*"}).setParallelism(16).name("Reduce1").map(new IdentityMapper()).withForwardedFields(new String[]{"*"}).setParallelism(32).name("Map2").groupBy(new String[]{"*"}).reduceGroup(new IdentityGroupReducer()).withForwardedFields(new String[]{"*"}).setParallelism(32).name("Reduce2").output(new DiscardingOutputFormat()).setParallelism(32).name("Sink");
        SingleInputPlanNode predecessor = ((SinkPlanNode) compileNoStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getPredecessor();
        ShipStrategyType shipStrategy = predecessor.getPredecessor().getInput().getShipStrategy();
        ShipStrategyType shipStrategy2 = predecessor.getInput().getShipStrategy();
        Assert.assertTrue("Invalid ship strategy for an operator.", (ShipStrategyType.PARTITION_RANDOM == shipStrategy && ShipStrategyType.PARTITION_HASH == shipStrategy2) || (ShipStrategyType.PARTITION_HASH == shipStrategy && ShipStrategyType.FORWARD == shipStrategy2));
    }

    @Test
    public void checkPropertyHandlingWithDecreasingParallelism() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(8);
        executionEnvironment.generateSequence(0L, 1L).setParallelism(16).map(new IdentityMapper()).withForwardedFields(new String[]{"*"}).setParallelism(16).name("Map1").groupBy(new String[]{"*"}).reduceGroup(new IdentityGroupReducer()).withForwardedFields(new String[]{"*"}).setParallelism(16).name("Reduce1").map(new IdentityMapper()).withForwardedFields(new String[]{"*"}).setParallelism(8).name("Map2").groupBy(new String[]{"*"}).reduceGroup(new IdentityGroupReducer()).withForwardedFields(new String[]{"*"}).setParallelism(8).name("Reduce2").output(new DiscardingOutputFormat()).setParallelism(8).name("Sink");
        SingleInputPlanNode predecessor = ((SinkPlanNode) compileNoStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getPredecessor();
        SingleInputPlanNode predecessor2 = predecessor.getPredecessor();
        Assert.assertTrue("The no sorting local strategy.", LocalStrategy.SORT == predecessor.getInput().getLocalStrategy() || LocalStrategy.SORT == predecessor2.getInput().getLocalStrategy());
        Assert.assertTrue("The no partitioning ship strategy.", ShipStrategyType.PARTITION_HASH == predecessor.getInput().getShipStrategy() || ShipStrategyType.PARTITION_HASH == predecessor2.getInput().getShipStrategy());
    }

    @Test
    public void checkPropertyHandlingWithTwoInputs() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(8);
        executionEnvironment.generateSequence(0L, 1L).setParallelism(5).groupBy(new String[]{"*"}).reduceGroup(new IdentityGroupReducer()).withForwardedFields(new String[]{"*"}).setParallelism(5).join(executionEnvironment.generateSequence(0L, 1L).setParallelism(7).groupBy(new String[]{"*"}).reduceGroup(new IdentityGroupReducer()).withForwardedFields(new String[]{"*"}).setParallelism(7)).where(new String[]{"*"}).equalTo(new String[]{"*"}).with(new IdentityJoiner()).setParallelism(5).output(new DiscardingOutputFormat()).setParallelism(5);
        OptimizedPlan compileNoStats = compileNoStats(executionEnvironment.createProgramPlan());
        new JobGraphGenerator().compileJobGraph(compileNoStats);
        compileNoStats.accept(new Visitor<PlanNode>() { // from class: org.apache.flink.optimizer.ParallelismChangeTest.1
            public boolean preVisit(PlanNode planNode) {
                if (!(planNode instanceof DualInputPlanNode)) {
                    return true;
                }
                DualInputPlanNode dualInputPlanNode = (DualInputPlanNode) planNode;
                Channel input1 = dualInputPlanNode.getInput1();
                Channel input2 = dualInputPlanNode.getInput2();
                Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.FORWARD, input1.getShipStrategy());
                Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.PARTITION_HASH, input2.getShipStrategy());
                return false;
            }

            public void postVisit(PlanNode planNode) {
            }
        });
    }
}
