package org.apache.flink.optimizer.plantranslate;

import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Map;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.BlockingShuffleOutputFormat;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FilterOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.util.AbstractID;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.class */
public class JobGraphGeneratorTest {

    @Rule
    public final TemporaryFolder tmp = new TemporaryFolder();

    @Test
    public void testResourcesForChainedOperators() throws Exception {
        ResourceSpec build = ResourceSpec.newBuilder(0.1d, 100).build();
        ResourceSpec build2 = ResourceSpec.newBuilder(0.2d, 200).build();
        ResourceSpec build3 = ResourceSpec.newBuilder(0.3d, 300).build();
        ResourceSpec build4 = ResourceSpec.newBuilder(0.4d, 400).build();
        ResourceSpec build5 = ResourceSpec.newBuilder(0.5d, 500).build();
        ResourceSpec build6 = ResourceSpec.newBuilder(0.6d, 600).build();
        ResourceSpec build7 = ResourceSpec.newBuilder(0.7d, 700).build();
        Method declaredMethod = Operator.class.getDeclaredMethod("setResources", ResourceSpec.class);
        declaredMethod.setAccessible(true);
        Method declaredMethod2 = DataSink.class.getDeclaredMethod("setResources", ResourceSpec.class);
        declaredMethod2.setAccessible(true);
        MapFunction<Long, Long> mapFunction = new MapFunction<Long, Long>() { // from class: org.apache.flink.optimizer.plantranslate.JobGraphGeneratorTest.1
            public Long map(Long l) throws Exception {
                return l;
            }
        };
        FilterFunction<Long> filterFunction = new FilterFunction<Long>() { // from class: org.apache.flink.optimizer.plantranslate.JobGraphGeneratorTest.2
            public boolean filter(Long l) throws Exception {
                return false;
            }
        };
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource fromElements = executionEnvironment.fromElements(new Long[]{1L, 2L, 3L});
        declaredMethod.invoke(fromElements, build);
        MapOperator map = fromElements.map(mapFunction);
        declaredMethod.invoke(map, build2);
        FilterOperator filter = map.filter(filterFunction);
        declaredMethod.invoke(filter, build3);
        IterativeDataSet iterate = filter.iterate(10);
        declaredMethod.invoke(iterate, build4);
        MapOperator map2 = iterate.map(mapFunction);
        declaredMethod.invoke(map2, build5);
        FilterOperator filter2 = map2.filter(filterFunction);
        declaredMethod.invoke(filter2, build6);
        declaredMethod2.invoke(iterate.closeWith(filter2).output(new DiscardingOutputFormat()), build7);
        JobGraph compileJob = compileJob(executionEnvironment);
        JobVertex jobVertex = (JobVertex) compileJob.getVerticesSortedTopologicallyFromSources().get(0);
        JobVertex jobVertex2 = (JobVertex) compileJob.getVerticesSortedTopologicallyFromSources().get(1);
        JobVertex jobVertex3 = (JobVertex) compileJob.getVerticesSortedTopologicallyFromSources().get(2);
        JobVertex jobVertex4 = (JobVertex) compileJob.getVerticesSortedTopologicallyFromSources().get(3);
        JobVertex jobVertex5 = (JobVertex) compileJob.getVerticesSortedTopologicallyFromSources().get(4);
        Assert.assertTrue(jobVertex.getMinResources().equals(build.merge(build2).merge(build3)));
        Assert.assertTrue(jobVertex2.getPreferredResources().equals(build4));
        Assert.assertTrue(jobVertex3.getMinResources().equals(build5.merge(build6)));
        Assert.assertTrue(jobVertex4.getPreferredResources().equals(build7));
        Assert.assertTrue(jobVertex5.getMinResources().equals(build4));
    }

    @Test
    public void testResourcesForDeltaIteration() throws Exception {
        ResourceSpec build = ResourceSpec.newBuilder(0.1d, 100).build();
        ResourceSpec build2 = ResourceSpec.newBuilder(0.2d, 200).build();
        ResourceSpec build3 = ResourceSpec.newBuilder(0.3d, 300).build();
        ResourceSpec build4 = ResourceSpec.newBuilder(0.4d, 400).build();
        ResourceSpec build5 = ResourceSpec.newBuilder(0.5d, 500).build();
        ResourceSpec build6 = ResourceSpec.newBuilder(0.6d, 600).build();
        Method declaredMethod = Operator.class.getDeclaredMethod("setResources", ResourceSpec.class);
        declaredMethod.setAccessible(true);
        Method declaredMethod2 = DeltaIteration.class.getDeclaredMethod("setResources", ResourceSpec.class);
        declaredMethod2.setAccessible(true);
        Method declaredMethod3 = DataSink.class.getDeclaredMethod("setResources", ResourceSpec.class);
        declaredMethod3.setAccessible(true);
        MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> mapFunction = new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() { // from class: org.apache.flink.optimizer.plantranslate.JobGraphGeneratorTest.3
            public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple2) throws Exception {
                return tuple2;
            }
        };
        FilterFunction<Tuple2<Long, Long>> filterFunction = new FilterFunction<Tuple2<Long, Long>>() { // from class: org.apache.flink.optimizer.plantranslate.JobGraphGeneratorTest.4
            public boolean filter(Tuple2<Long, Long> tuple2) throws Exception {
                return false;
            }
        };
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource fromElements = executionEnvironment.fromElements(new Tuple2[]{new Tuple2(1L, 2L)});
        declaredMethod.invoke(fromElements, build);
        MapOperator map = fromElements.map(mapFunction);
        declaredMethod.invoke(map, build2);
        DeltaIteration registerAggregator = map.iterateDelta(map, 100, new int[]{0}).registerAggregator("test", new LongSumAggregator());
        declaredMethod2.invoke(registerAggregator, build3);
        MapOperator map2 = registerAggregator.getWorkset().map(mapFunction);
        declaredMethod.invoke(map2, build4);
        FilterOperator filter = map2.filter(filterFunction);
        declaredMethod.invoke(filter, build5);
        declaredMethod3.invoke(registerAggregator.closeWith(map2, filter).output(new DiscardingOutputFormat()), build6);
        JobGraph compileJob = compileJob(executionEnvironment);
        JobVertex jobVertex = (JobVertex) compileJob.getVerticesSortedTopologicallyFromSources().get(0);
        JobVertex jobVertex2 = (JobVertex) compileJob.getVerticesSortedTopologicallyFromSources().get(1);
        JobVertex jobVertex3 = (JobVertex) compileJob.getVerticesSortedTopologicallyFromSources().get(2);
        JobVertex jobVertex4 = (JobVertex) compileJob.getVerticesSortedTopologicallyFromSources().get(3);
        JobVertex jobVertex5 = (JobVertex) compileJob.getVerticesSortedTopologicallyFromSources().get(4);
        JobVertex jobVertex6 = (JobVertex) compileJob.getVerticesSortedTopologicallyFromSources().get(5);
        JobVertex jobVertex7 = (JobVertex) compileJob.getVerticesSortedTopologicallyFromSources().get(6);
        Assert.assertTrue(jobVertex.getMinResources().equals(build.merge(build2)));
        Assert.assertTrue(jobVertex2.getPreferredResources().equals(build3));
        Assert.assertTrue(jobVertex3.getMinResources().equals(build4));
        Assert.assertTrue(jobVertex4.getPreferredResources().equals(ResourceSpec.DEFAULT));
        Assert.assertTrue(jobVertex5.getMinResources().equals(build5));
        Assert.assertTrue(jobVertex6.getPreferredResources().equals(build6));
        Assert.assertTrue(jobVertex7.getMinResources().equals(build3));
    }

    @Test
    public void testArtifactCompression() throws IOException {
        Path path = this.tmp.newFile("plainFile1").toPath();
        Path path2 = this.tmp.newFile("plainFile2").toPath();
        Path path3 = this.tmp.newFolder("directory1").toPath();
        Files.createDirectory(path3.resolve("containedFile1"), new FileAttribute[0]);
        Path path4 = this.tmp.newFolder("directory2").toPath();
        Files.createDirectory(path4.resolve("containedFile2"), new FileAttribute[0]);
        JobGraph jobGraph = new JobGraph(new JobVertex[0]);
        JobGraphGenerator.addUserArtifactEntries(Arrays.asList(Tuple2.of("executableFile", new DistributedCache.DistributedCacheEntry(path.toString(), true)), Tuple2.of("nonExecutableFile", new DistributedCache.DistributedCacheEntry(path2.toString(), false)), Tuple2.of("executableDir", new DistributedCache.DistributedCacheEntry(path3.toString(), true)), Tuple2.of("nonExecutableDIr", new DistributedCache.DistributedCacheEntry(path4.toString(), false))), jobGraph);
        Map userArtifacts = jobGraph.getUserArtifacts();
        assertState((DistributedCache.DistributedCacheEntry) userArtifacts.get("executableFile"), true, false);
        assertState((DistributedCache.DistributedCacheEntry) userArtifacts.get("nonExecutableFile"), false, false);
        assertState((DistributedCache.DistributedCacheEntry) userArtifacts.get("executableDir"), true, true);
        assertState((DistributedCache.DistributedCacheEntry) userArtifacts.get("nonExecutableDIr"), false, true);
    }

    @Test
    public void testGeneratingJobGraphWithUnconsumedResultPartition() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        Operator parallelism = executionEnvironment.fromElements(new Tuple2[]{new Tuple2(1L, 2L)}).setParallelism(1).map(new IdentityMapper()).setParallelism(3);
        AbstractID abstractID = new AbstractID();
        parallelism.output(BlockingShuffleOutputFormat.createOutputFormat(abstractID)).setParallelism(1);
        parallelism.output(new DiscardingOutputFormat()).setParallelism(1);
        JobGraph compileJob = compileJob(executionEnvironment);
        Assert.assertEquals(3L, compileJob.getVerticesSortedTopologicallyFromSources().size());
        JobVertex jobVertex = (JobVertex) compileJob.getVerticesSortedTopologicallyFromSources().get(1);
        Assert.assertThat(jobVertex, Matchers.instanceOf(JobVertex.class));
        Assert.assertEquals(2L, jobVertex.getProducedDataSets().size());
        Assert.assertTrue(jobVertex.getProducedDataSets().stream().anyMatch(intermediateDataSet -> {
            return intermediateDataSet.getId().equals(new IntermediateDataSetID(abstractID)) && intermediateDataSet.getResultType() == ResultPartitionType.BLOCKING_PERSISTENT;
        }));
    }

    private static void assertState(DistributedCache.DistributedCacheEntry distributedCacheEntry, boolean z, boolean z2) throws IOException {
        Assert.assertNotNull(distributedCacheEntry);
        Assert.assertEquals(Boolean.valueOf(z), distributedCacheEntry.isExecutable);
        Assert.assertEquals(Boolean.valueOf(z2), Boolean.valueOf(distributedCacheEntry.isZipped));
        org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(distributedCacheEntry.filePath);
        Assert.assertTrue(path.getFileSystem().exists(path));
        Assert.assertFalse(path.getFileSystem().getFileStatus(path).isDir());
    }

    private static JobGraph compileJob(ExecutionEnvironment executionEnvironment) {
        return new JobGraphGenerator().compileJobGraph(new Optimizer(new Configuration()).compile(executionEnvironment.createProgramPlan()));
    }
}
