package org.apache.flink.optimizer;

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/optimizer/UnionReplacementTest.class */
public class UnionReplacementTest extends CompilerTestBase {
    @Test
    public void testUnionReplacement() {
        try {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            UnionOperator union = executionEnvironment.fromElements(new String[]{"test1"}).union(executionEnvironment.fromElements(new String[]{"test2"}));
            union.output(new DiscardingOutputFormat());
            union.output(new DiscardingOutputFormat());
            new JobGraphGenerator().compileJobGraph(compileNoStats(executionEnvironment.createProgramPlan()));
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testUnionWithTwoOutputs() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(8);
        DataSource fromElements = executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)});
        DataSource fromElements2 = executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)});
        DataSource fromElements3 = executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)});
        DataSource fromElements4 = executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)});
        UnionOperator union = fromElements2.union(fromElements3);
        UnionOperator union2 = fromElements.union(union);
        UnionOperator union3 = fromElements4.union(union);
        union2.groupBy(new int[]{0}).sum(1).name("1").output(new DiscardingOutputFormat());
        union3.groupBy(new int[]{1}).sum(0).name("2").output(new DiscardingOutputFormat());
        CompilerTestBase.OptimizerPlanNodeResolver optimizerPlanNodeResolver = getOptimizerPlanNodeResolver(compileNoStats(executionEnvironment.createProgramPlan()));
        SingleInputPlanNode node = optimizerPlanNodeResolver.getNode("1");
        SingleInputPlanNode node2 = optimizerPlanNodeResolver.getNode("2");
        Assert.assertTrue("Reduce input should be partitioned on 0.", node.getInput().getGlobalProperties().getPartitioningFields().isExactMatch(new FieldList(0)));
        Assert.assertTrue("Reduce input should be partitioned on 1.", node2.getInput().getGlobalProperties().getPartitioningFields().isExactMatch(new FieldList(1)));
        Assert.assertTrue("Reduce input should be n-ary union with three inputs.", (node.getInput().getSource() instanceof NAryUnionPlanNode) && node.getInput().getSource().getListOfInputs().size() == 3);
        Assert.assertTrue("Reduce input should be n-ary union with three inputs.", (node2.getInput().getSource() instanceof NAryUnionPlanNode) && node2.getInput().getSource().getListOfInputs().size() == 3);
        Assert.assertTrue("Channel between union and group reduce should be forwarding", node.getInput().getShipStrategy().equals(ShipStrategyType.FORWARD));
        Assert.assertTrue("Channel between union and group reduce should be forwarding", node2.getInput().getShipStrategy().equals(ShipStrategyType.FORWARD));
        for (Channel channel : node.getInput().getSource().getListOfInputs()) {
            Assert.assertTrue("Union input channel should hash partition on 0", channel.getShipStrategy().equals(ShipStrategyType.PARTITION_HASH) && channel.getShipStrategyKeys().isExactMatch(new FieldList(0)));
        }
        for (Channel channel2 : node2.getInput().getSource().getListOfInputs()) {
            Assert.assertTrue("Union input channel should hash partition on 0", channel2.getShipStrategy().equals(ShipStrategyType.PARTITION_HASH) && channel2.getShipStrategyKeys().isExactMatch(new FieldList(1)));
        }
    }

    @Test
    public void testConsecutiveUnionsWithHashPartitioning() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(8);
        executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)}).union(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)})).union(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)})).partitionByHash(new int[]{1}).output(new DiscardingOutputFormat()).name("out");
        SingleInputPlanNode node = getOptimizerPlanNodeResolver(compileNoStats(executionEnvironment.createProgramPlan())).getNode("out");
        Assert.assertEquals("Sink input should be hash partitioned.", PartitioningProperty.HASH_PARTITIONED, node.getInput().getGlobalProperties().getPartitioning());
        Assert.assertEquals("Sink input should be hash partitioned on 1.", new FieldList(1), node.getInput().getGlobalProperties().getPartitioningFields());
        SingleInputPlanNode source = node.getInput().getSource();
        Assert.assertTrue(source.getDriverStrategy() == DriverStrategy.UNARY_NO_OP);
        Assert.assertEquals("Partitioner input should be hash partitioned.", PartitioningProperty.HASH_PARTITIONED, source.getInput().getGlobalProperties().getPartitioning());
        Assert.assertEquals("Partitioner input should be hash partitioned on 1.", new FieldList(1), source.getInput().getGlobalProperties().getPartitioningFields());
        Assert.assertEquals("Partitioner input channel should be forwarding", ShipStrategyType.FORWARD, source.getInput().getShipStrategy());
        for (Channel channel : source.getInput().getSource().getInputs()) {
            Assert.assertEquals("Union input should be hash partitioned", PartitioningProperty.HASH_PARTITIONED, channel.getGlobalProperties().getPartitioning());
            Assert.assertEquals("Union input channel should be hash partitioning", ShipStrategyType.PARTITION_HASH, channel.getShipStrategy());
            Assert.assertTrue("Union input should be data source", channel.getSource() instanceof SourcePlanNode);
        }
    }

    @Test
    public void testConsecutiveUnionsWithRebalance() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(8);
        executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)}).union(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)})).union(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)})).rebalance().output(new DiscardingOutputFormat()).name("out");
        SingleInputPlanNode node = getOptimizerPlanNodeResolver(compileNoStats(executionEnvironment.createProgramPlan())).getNode("out");
        Assert.assertEquals("Sink input should be force rebalanced.", PartitioningProperty.FORCED_REBALANCED, node.getInput().getGlobalProperties().getPartitioning());
        SingleInputPlanNode source = node.getInput().getSource();
        Assert.assertTrue(source.getDriverStrategy() == DriverStrategy.UNARY_NO_OP);
        Assert.assertEquals("Partitioner input should be force rebalanced.", PartitioningProperty.FORCED_REBALANCED, source.getInput().getGlobalProperties().getPartitioning());
        Assert.assertEquals("Partitioner input channel should be forwarding", ShipStrategyType.FORWARD, source.getInput().getShipStrategy());
        for (Channel channel : source.getInput().getSource().getInputs()) {
            Assert.assertEquals("Union input should be force rebalanced", PartitioningProperty.FORCED_REBALANCED, channel.getGlobalProperties().getPartitioning());
            Assert.assertEquals("Union input channel should be rebalancing", ShipStrategyType.PARTITION_FORCED_REBALANCE, channel.getShipStrategy());
            Assert.assertTrue("Union input should be data source", channel.getSource() instanceof SourcePlanNode);
        }
    }

    @Test
    public void testConsecutiveUnionsWithRangePartitioning() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(8);
        executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)}).union(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)})).union(executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)})).partitionByRange(new int[]{1}).output(new DiscardingOutputFormat()).name("out");
        SingleInputPlanNode node = getOptimizerPlanNodeResolver(compileNoStats(executionEnvironment.createProgramPlan())).getNode("out");
        Assert.assertEquals("Sink input should be range partitioned.", PartitioningProperty.RANGE_PARTITIONED, node.getInput().getGlobalProperties().getPartitioning());
        Assert.assertEquals("Sink input should be range partitioned on 1", new Ordering(1, (Class) null, Order.ASCENDING), node.getInput().getGlobalProperties().getPartitioningOrdering());
        SingleInputPlanNode source = node.getInput().getSource();
        Assert.assertTrue(source.getDriverStrategy() == DriverStrategy.UNARY_NO_OP);
        Assert.assertEquals("Partitioner input should be range partitioned.", PartitioningProperty.RANGE_PARTITIONED, source.getInput().getGlobalProperties().getPartitioning());
        Assert.assertEquals("Partitioner input should be range partitioned on 1", new Ordering(1, (Class) null, Order.ASCENDING), source.getInput().getGlobalProperties().getPartitioningOrdering());
        Assert.assertEquals("Partitioner input channel should be forwarding", ShipStrategyType.FORWARD, source.getInput().getShipStrategy());
        for (Channel channel : source.getInput().getSource().getInputs()) {
            Assert.assertEquals("Union input should be force rebalanced", PartitioningProperty.RANGE_PARTITIONED, channel.getGlobalProperties().getPartitioning());
            Assert.assertEquals("Union input channel should be rebalancing", ShipStrategyType.FORWARD, channel.getShipStrategy());
            SingleInputPlanNode source2 = channel.getSource();
            Assert.assertEquals(DriverStrategy.MAP, source2.getDriverStrategy());
            Assert.assertEquals(ShipStrategyType.PARTITION_CUSTOM, source2.getInput().getShipStrategy());
        }
    }
}
