package eu.stratosphere.test.util;

import eu.stratosphere.api.common.JobExecutionResult;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.operators.translation.JavaPlan;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.PactCompiler;
import eu.stratosphere.compiler.plan.OptimizedPlan;
import eu.stratosphere.compiler.plandump.PlanJSONDumpGenerator;
import eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.client.JobClient;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:eu/stratosphere/test/util/JavaProgramTestBase.class */
public abstract class JavaProgramTestBase extends AbstractTestBase {
    private static final int DEFAULT_DEGREE_OF_PARALLELISM = 4;
    private JobExecutionResult latestExecutionResult;
    private int degreeOfParallelism;

    /* loaded from: input_file:eu/stratosphere/test/util/JavaProgramTestBase$TestEnvironment.class */
    private static final class TestEnvironment extends ExecutionEnvironment {
        private final NepheleMiniCluster executor;
        private JobExecutionResult latestResult;

        private TestEnvironment(NepheleMiniCluster nepheleMiniCluster, int i) {
            this.executor = nepheleMiniCluster;
            setDegreeOfParallelism(i);
        }

        public JobExecutionResult execute(String str) throws Exception {
            try {
                JobClient jobClient = this.executor.getJobClient(new NepheleJobGraphGenerator().compileJobGraph(compileProgram(str)));
                jobClient.setConsoleStreamForReporting(AbstractTestBase.getNullPrintStream());
                JobExecutionResult submitJobAndWait = jobClient.submitJobAndWait();
                this.latestResult = submitJobAndWait;
                return submitJobAndWait;
            } catch (Exception e) {
                System.err.println(e.getMessage());
                e.printStackTrace();
                Assert.fail("Job execution failed!");
                return null;
            }
        }

        public String getExecutionPlan() throws Exception {
            return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(compileProgram("unused"));
        }

        private OptimizedPlan compileProgram(String str) {
            JavaPlan createProgramPlan = createProgramPlan(str);
            createProgramPlan.setDefaultParallelism(getDegreeOfParallelism());
            return new PactCompiler(new DataStatistics()).compile(createProgramPlan);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setAsContext() {
            initializeContextEnvironment(this);
        }
    }

    public JavaProgramTestBase() {
        this(new Configuration());
    }

    public JavaProgramTestBase(Configuration configuration) {
        super(configuration);
        this.degreeOfParallelism = DEFAULT_DEGREE_OF_PARALLELISM;
    }

    public void setDegreeOfParallelism(int i) {
        this.degreeOfParallelism = i;
    }

    public JobExecutionResult getLatestExecutionResult() {
        return this.latestExecutionResult;
    }

    protected abstract void testProgram() throws Exception;

    protected void preSubmit() throws Exception {
    }

    protected void postSubmit() throws Exception {
    }

    @Test
    public void testJob() throws Exception {
        try {
            preSubmit();
        } catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
            Assert.fail("Pre-submit work caused an error: " + e.getMessage());
        }
        TestEnvironment testEnvironment = new TestEnvironment(this.executor, this.degreeOfParallelism);
        testEnvironment.setAsContext();
        try {
            testProgram();
            this.latestExecutionResult = testEnvironment.latestResult;
        } catch (Exception e2) {
            System.err.println(e2.getMessage());
            e2.printStackTrace();
            Assert.fail("Error while calling the test program: " + e2.getMessage());
        }
        Assert.assertNotNull("The test program never triggered an execution.", this.latestExecutionResult);
        try {
            postSubmit();
        } catch (Exception e3) {
            System.err.println(e3.getMessage());
            e3.printStackTrace();
            Assert.fail("Post-submit work caused an error: " + e3.getMessage());
        }
    }

    protected ExecutionEnvironment getExecutionEnvironment() {
        return new TestEnvironment(this.executor, this.degreeOfParallelism);
    }
}
