package org.apache.flink.optimizer;

import java.util.Iterator;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Visitor;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/optimizer/UnionPropertyPropagationTest.class */
public class UnionPropertyPropagationTest extends CompilerTestBase {

    /* loaded from: input_file:org/apache/flink/optimizer/UnionPropertyPropagationTest$DummyFlatMap.class */
    public static final class DummyFlatMap extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1;

        public void flatMap(String str, Collector<Tuple2<String, Integer>> collector) {
            collector.collect(new Tuple2(str, 0));
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((String) obj, (Collector<Tuple2<String, Integer>>) collector);
        }
    }

    @Test
    public void testUnion1() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(8);
        executionEnvironment.generateSequence(0L, 1L).groupBy(new String[]{"*"}).reduceGroup(new IdentityGroupReducer()).union(executionEnvironment.generateSequence(0L, 1L).groupBy(new String[]{"*"}).reduceGroup(new IdentityGroupReducer())).groupBy(new String[]{"*"}).reduceGroup(new IdentityGroupReducer()).output(new DiscardingOutputFormat());
        OptimizedPlan compileNoStats = compileNoStats(executionEnvironment.createProgramPlan());
        new JobGraphGenerator().compileJobGraph(compileNoStats);
        compileNoStats.accept(new Visitor<PlanNode>() { // from class: org.apache.flink.optimizer.UnionPropertyPropagationTest.1
            public boolean preVisit(PlanNode planNode) {
                if (!(planNode instanceof SingleInputPlanNode) || !(planNode.getProgramOperator() instanceof GroupReduceOperatorBase)) {
                    return true;
                }
                Iterator it = planNode.getInputs().iterator();
                while (it.hasNext()) {
                    Assert.assertTrue("Reduce should just forward the input if it is already partitioned", ((Channel) it.next()).getShipStrategy() == ShipStrategyType.FORWARD);
                }
                return false;
            }

            public void postVisit(PlanNode planNode) {
            }
        });
    }

    @Test
    public void testUnion2() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource readTextFile = executionEnvironment.readTextFile(IN_FILE);
        UnionOperator flatMap = readTextFile.flatMap(new DummyFlatMap());
        for (int i = 1; i < 4; i++) {
            flatMap = flatMap.union(readTextFile.flatMap(new DummyFlatMap()));
        }
        flatMap.groupBy(new int[]{0}).aggregate(Aggregations.SUM, 1).writeAsText(OUT_FILE);
        OptimizedPlan compileNoStats = compileNoStats(executionEnvironment.createProgramPlan("Test union on new java-api"));
        new JobGraphGenerator().compileJobGraph(compileNoStats);
        compileNoStats.accept(new Visitor<PlanNode>() { // from class: org.apache.flink.optimizer.UnionPropertyPropagationTest.2
            public boolean preVisit(PlanNode planNode) {
                if ((planNode instanceof SingleInputPlanNode) && (planNode.getProgramOperator() instanceof GroupReduceOperatorBase)) {
                    Channel input = ((SingleInputPlanNode) planNode).getInput();
                    Assert.assertTrue("Union should just forward the Partitioning", input.getShipStrategy() == ShipStrategyType.FORWARD);
                    Assert.assertTrue("Union Node should be under Group operator", input.getSource() instanceof NAryUnionPlanNode);
                }
                if (!(planNode instanceof NAryUnionPlanNode)) {
                    return true;
                }
                int i2 = 0;
                for (Channel channel : planNode.getInputs()) {
                    Assert.assertTrue("Input of Union should be FlatMapOperators", channel.getSource().getProgramOperator() instanceof FlatMapOperatorBase);
                    Assert.assertTrue("Shipment strategy under union should partition the data", channel.getShipStrategy() == ShipStrategyType.PARTITION_HASH);
                    i2++;
                }
                Assert.assertTrue("NAryUnion should have 4 inputs", i2 == 4);
                return false;
            }

            public void postVisit(PlanNode planNode) {
            }
        });
    }
}
