package org.apache.flink.optimizer.dataexchange;

import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.FilterOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.dag.OptimizerNode;
import org.apache.flink.optimizer.dag.SingleInputNode;
import org.apache.flink.optimizer.dag.SinkJoiner;
import org.apache.flink.optimizer.dag.TwoInputNode;
import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
import org.apache.flink.optimizer.testfunctions.IdentityFlatMapper;
import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
import org.apache.flink.optimizer.traversals.BranchesVisitor;
import org.apache.flink.optimizer.traversals.GraphCreatingVisitor;
import org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.class */
public class PipelineBreakingTest {
    @Test
    public void testSimpleForwardPlan() {
        try {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.readTextFile("/never/accessed").map(new MapFunction<String, Integer>() { // from class: org.apache.flink.optimizer.dataexchange.PipelineBreakingTest.2
                public Integer map(String str) {
                    return 0;
                }
            }).filter(new FilterFunction<Integer>() { // from class: org.apache.flink.optimizer.dataexchange.PipelineBreakingTest.1
                public boolean filter(Integer num) {
                    return false;
                }
            }).groupBy(new IdentityKeyExtractor()).reduceGroup(new Top1GroupReducer()).output(new DiscardingOutputFormat());
            DataSinkNode dataSinkNode = convertPlan(executionEnvironment.createProgramPlan()).get(0);
            SingleInputNode predecessorNode = dataSinkNode.getPredecessorNode();
            SingleInputNode predecessorNode2 = predecessorNode.getPredecessorNode();
            SingleInputNode predecessorNode3 = predecessorNode2.getPredecessorNode();
            SingleInputNode predecessorNode4 = predecessorNode3.getPredecessorNode();
            Assert.assertFalse(dataSinkNode.getInputConnection().isBreakingPipeline());
            Assert.assertFalse(predecessorNode.getIncomingConnection().isBreakingPipeline());
            Assert.assertFalse(predecessorNode2.getIncomingConnection().isBreakingPipeline());
            Assert.assertFalse(predecessorNode3.getIncomingConnection().isBreakingPipeline());
            Assert.assertFalse(predecessorNode4.getIncomingConnection().isBreakingPipeline());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testBranchingPlanNotReJoined() {
        try {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            MapOperator map = executionEnvironment.readTextFile("/never/accessed").map(new MapFunction<String, Integer>() { // from class: org.apache.flink.optimizer.dataexchange.PipelineBreakingTest.3
                public Integer map(String str) {
                    return 0;
                }
            });
            map.filter(new FilterFunction<Integer>() { // from class: org.apache.flink.optimizer.dataexchange.PipelineBreakingTest.4
                public boolean filter(Integer num) {
                    return false;
                }
            }).output(new DiscardingOutputFormat());
            map.join(executionEnvironment.fromElements(new Integer[]{1, 2, 3, 4})).where(new IdentityKeyExtractor()).equalTo(new IdentityKeyExtractor()).output(new DiscardingOutputFormat());
            map.output(new DiscardingOutputFormat());
            List<DataSinkNode> convertPlan = convertPlan(executionEnvironment.createProgramPlan());
            DataSinkNode dataSinkNode = convertPlan.get(0);
            DataSinkNode dataSinkNode2 = convertPlan.get(1);
            DataSinkNode dataSinkNode3 = convertPlan.get(2);
            SingleInputNode predecessorNode = dataSinkNode.getPredecessorNode();
            SingleInputNode predecessorNode2 = predecessorNode.getPredecessorNode();
            TwoInputNode predecessorNode3 = dataSinkNode2.getPredecessorNode();
            SingleInputNode secondPredecessorNode = predecessorNode3.getSecondPredecessorNode();
            Assert.assertFalse(dataSinkNode.getInputConnection().isBreakingPipeline());
            Assert.assertFalse(dataSinkNode2.getInputConnection().isBreakingPipeline());
            Assert.assertFalse(dataSinkNode3.getInputConnection().isBreakingPipeline());
            Assert.assertFalse(predecessorNode.getIncomingConnection().isBreakingPipeline());
            Assert.assertFalse(predecessorNode2.getIncomingConnection().isBreakingPipeline());
            Assert.assertFalse(predecessorNode3.getFirstIncomingConnection().isBreakingPipeline());
            Assert.assertFalse(predecessorNode3.getSecondIncomingConnection().isBreakingPipeline());
            Assert.assertFalse(secondPredecessorNode.getIncomingConnection().isBreakingPipeline());
            Assert.assertEquals(predecessorNode2, predecessorNode3.getFirstPredecessorNode().getPredecessorNode());
            Assert.assertEquals(predecessorNode2, dataSinkNode3.getPredecessorNode());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testReJoinedBranches() {
        try {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            MapOperator map = executionEnvironment.fromElements(new Long[]{33L, 44L}).map(new MapFunction<Long, Tuple2<Long, Long>>() { // from class: org.apache.flink.optimizer.dataexchange.PipelineBreakingTest.5
                public Tuple2<Long, Long> map(Long l) {
                    return new Tuple2<>(l, l);
                }
            });
            ReduceOperator reduce = map.groupBy(new int[]{0}).reduce(new SelectOneReducer());
            reduce.output(new DiscardingOutputFormat());
            FilterOperator filter = map.filter(new FilterFunction<Tuple2<Long, Long>>() { // from class: org.apache.flink.optimizer.dataexchange.PipelineBreakingTest.6
                public boolean filter(Tuple2<Long, Long> tuple2) throws Exception {
                    return false;
                }
            });
            JoinOperator.EquiJoin with = reduce.join(filter).where(new int[]{1}).equalTo(new int[]{1}).with(new DummyFlatJoinFunction());
            with.flatMap(new IdentityFlatMapper()).output(new DiscardingOutputFormat());
            with.coGroup(filter.groupBy(new int[]{1}).reduceGroup(new Top1GroupReducer())).where(new int[]{0}).equalTo(new int[]{0}).with(new DummyCoGroupFunction()).output(new DiscardingOutputFormat());
            List<DataSinkNode> convertPlan = convertPlan(executionEnvironment.createProgramPlan());
            DataSinkNode dataSinkNode = convertPlan.get(0);
            DataSinkNode dataSinkNode2 = convertPlan.get(1);
            DataSinkNode dataSinkNode3 = convertPlan.get(2);
            SingleInputNode predecessorNode = dataSinkNode.getPredecessorNode();
            SingleInputNode predecessorNode2 = predecessorNode.getPredecessorNode();
            SingleInputNode predecessorNode3 = dataSinkNode2.getPredecessorNode();
            TwoInputNode predecessorNode4 = predecessorNode3.getPredecessorNode();
            SingleInputNode secondPredecessorNode = predecessorNode4.getSecondPredecessorNode();
            TwoInputNode predecessorNode5 = dataSinkNode3.getPredecessorNode();
            SingleInputNode secondPredecessorNode2 = predecessorNode5.getSecondPredecessorNode();
            Assert.assertEquals(predecessorNode, predecessorNode4.getFirstPredecessorNode());
            Assert.assertEquals(predecessorNode2, secondPredecessorNode.getPredecessorNode());
            Assert.assertEquals(predecessorNode4, predecessorNode5.getFirstPredecessorNode());
            Assert.assertEquals(secondPredecessorNode, secondPredecessorNode2.getPredecessorNode());
            Assert.assertFalse(dataSinkNode.getInputConnection().isBreakingPipeline());
            Assert.assertFalse(dataSinkNode2.getInputConnection().isBreakingPipeline());
            Assert.assertFalse(dataSinkNode3.getInputConnection().isBreakingPipeline());
            Assert.assertFalse(predecessorNode2.getIncomingConnection().isBreakingPipeline());
            Assert.assertFalse(predecessorNode3.getIncomingConnection().isBreakingPipeline());
            Assert.assertFalse(predecessorNode4.getFirstIncomingConnection().isBreakingPipeline());
            Assert.assertFalse(predecessorNode5.getFirstIncomingConnection().isBreakingPipeline());
            Assert.assertFalse(predecessorNode5.getSecondIncomingConnection().isBreakingPipeline());
            Assert.assertTrue(predecessorNode.getIncomingConnection().isBreakingPipeline());
            Assert.assertTrue(secondPredecessorNode.getIncomingConnection().isBreakingPipeline());
            Assert.assertTrue(secondPredecessorNode2.getIncomingConnection().isBreakingPipeline());
            Assert.assertTrue(predecessorNode4.getSecondIncomingConnection().isBreakingPipeline());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    private static List<DataSinkNode> convertPlan(Plan plan) {
        OptimizerNode optimizerNode;
        GraphCreatingVisitor graphCreatingVisitor = new GraphCreatingVisitor(17, plan.getExecutionConfig().getExecutionMode());
        plan.accept(graphCreatingVisitor);
        List<DataSinkNode> sinks = graphCreatingVisitor.getSinks();
        if (sinks.size() != 1) {
            Iterator<DataSinkNode> it = sinks.iterator();
            OptimizerNode next = it.next();
            while (true) {
                optimizerNode = next;
                if (!it.hasNext()) {
                    break;
                }
                next = new SinkJoiner(optimizerNode, it.next());
            }
        } else {
            optimizerNode = (OptimizerNode) sinks.get(0);
        }
        optimizerNode.accept(new IdAndEstimatesVisitor((DataStatistics) null));
        optimizerNode.accept(new BranchesVisitor());
        return sinks;
    }
}
