package eu.stratosphere.core.testing;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.GenericDataSource;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.client.LocalExecutor;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.core.fs.FileSystem;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.core.testing.GenericTestRecords;
import eu.stratosphere.core.testing.io.SequentialInputFormat;
import eu.stratosphere.core.testing.io.SequentialOutputFormat;
import eu.stratosphere.util.StringUtils;
import eu.stratosphere.util.Visitor;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.junit.Assert;

/* loaded from: input_file:eu/stratosphere/core/testing/GenericTestPlan.class */
public abstract class GenericTestPlan<T, Records extends GenericTestRecords<T>> implements Closeable {
    private final Map<GenericDataSink, Records> actualOutputs;
    private final Operator[] contracts;
    private int degreeOfParallelism;
    private final Map<GenericDataSink, Records> expectedOutputs;
    private final Map<GenericDataSource<?>, Records> inputs;
    private final List<GenericDataSink> sinks;
    private final List<GenericDataSource<?>> sources;
    private TypeConfig<T> defaultConfig;

    public GenericTestPlan(TypeConfig<T> typeConfig, Collection<? extends Operator> collection) {
        this(typeConfig, (Operator[]) collection.toArray(new Operator[collection.size()]));
    }

    public GenericTestPlan(TypeConfig<T> typeConfig, Operator... operatorArr) {
        this.actualOutputs = new IdentityHashMap();
        this.degreeOfParallelism = 1;
        this.expectedOutputs = new IdentityHashMap();
        this.inputs = new IdentityHashMap();
        this.sinks = new ArrayList();
        this.sources = new ArrayList();
        if (operatorArr.length == 0) {
            throw new IllegalArgumentException();
        }
        this.defaultConfig = typeConfig;
        Configuration configuration = new Configuration();
        configuration.setString("default,2,1,1024,10,10", "standard,1,1,200,1,1");
        GlobalConfiguration.includeConfiguration(configuration);
        this.contracts = new InputOutputAdder().process(operatorArr);
        findSinksAndSources();
        configureSinksAndSources();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        ClosableManager closableManager = new ClosableManager();
        Iterator<Records> it = this.inputs.values().iterator();
        while (it.hasNext()) {
            closableManager.add(it.next());
        }
        Iterator<Records> it2 = this.actualOutputs.values().iterator();
        while (it2.hasNext()) {
            closableManager.add(it2.next());
        }
        Iterator<Records> it3 = this.expectedOutputs.values().iterator();
        while (it3.hasNext()) {
            closableManager.add(it3.next());
        }
        closableManager.close();
    }

    public Records getActualOutput() {
        return getActualOutput(0);
    }

    public Records getActualOutput(GenericDataSink genericDataSink) {
        return getActualOutput(genericDataSink, (TypeConfig) null);
    }

    public Records getActualOutput(GenericDataSink genericDataSink, TypeConfig<T> typeConfig) {
        Records records = this.actualOutputs.get(genericDataSink);
        if (records == null) {
            Map<GenericDataSink, Records> map = this.actualOutputs;
            Records createTestRecords = createTestRecords(typeConfig);
            records = createTestRecords;
            map.put(genericDataSink, createTestRecords);
        } else if (typeConfig != null) {
            records.setTypeConfig(typeConfig);
        }
        return records;
    }

    public Records getActualOutput(int i) {
        return getActualOutput(getDataSinks().get(i));
    }

    public Records getActualOutput(int i, TypeConfig<T> typeConfig) {
        return getActualOutput(this.sinks.get(i), typeConfig);
    }

    public Records getActualOutput(TypeConfig<T> typeConfig) {
        return getActualOutput(0, typeConfig);
    }

    public int getDegreeOfParallelism() {
        return this.degreeOfParallelism;
    }

    public Records getExpectedOutput(GenericDataSink genericDataSink, TypeConfig<T> typeConfig) {
        Records records = this.expectedOutputs.get(genericDataSink);
        if (records == null) {
            Map<GenericDataSink, Records> map = this.expectedOutputs;
            Records createTestRecords = createTestRecords(typeConfig);
            records = createTestRecords;
            map.put(genericDataSink, createTestRecords);
            getActualOutput(genericDataSink).setTypeConfig(typeConfig);
        } else if (typeConfig != null) {
            records.setTypeConfig(typeConfig);
        }
        return records;
    }

    public Records getExpectedOutput(int i, TypeConfig<T> typeConfig) {
        return getExpectedOutput(getDataSinks().get(i), typeConfig);
    }

    public Records getExpectedOutput(TypeConfig<T> typeConfig) {
        return getExpectedOutput(0, typeConfig);
    }

    public Records getInput() {
        return getInput(0);
    }

    public Records getInput(GenericDataSource<?> genericDataSource) {
        return getInput(genericDataSource, (TypeConfig) null);
    }

    public Records getInput(GenericDataSource<?> genericDataSource, TypeConfig<T> typeConfig) {
        Records records = this.inputs.get(genericDataSource);
        if (records == null) {
            Map<GenericDataSource<?>, Records> map = this.inputs;
            Records createTestRecords = createTestRecords(typeConfig);
            records = createTestRecords;
            map.put(genericDataSource, createTestRecords);
        } else if (typeConfig != null) {
            records.setTypeConfig(typeConfig);
        }
        return records;
    }

    public Records getInput(int i) {
        return getInput(i, (TypeConfig) null);
    }

    public Records getInput(int i, TypeConfig<T> typeConfig) {
        return getInput(getDataSources().get(i), typeConfig);
    }

    public Operator getOutputOfOperator(Operator operator) {
        return getOutputsOfOperator(operator)[0];
    }

    public Operator[] getOutputsOfOperator(final Operator operator) {
        final ArrayList arrayList = new ArrayList();
        Iterator<GenericDataSink> it = this.sinks.iterator();
        while (it.hasNext()) {
            it.next().accept(new Visitor<Operator>() { // from class: eu.stratosphere.core.testing.GenericTestPlan.1
                LinkedList<Operator> outputStack = new LinkedList<>();

                public void postVisit(Operator operator2) {
                }

                public boolean preVisit(Operator operator2) {
                    if (operator2 == operator) {
                        arrayList.add(this.outputStack.peek());
                    }
                    this.outputStack.push(operator2);
                    return true;
                }
            });
        }
        return (Operator[]) arrayList.toArray(new Operator[arrayList.size()]);
    }

    public List<GenericDataSink> getSinks() {
        return this.sinks;
    }

    public List<GenericDataSource<?>> getSources() {
        return this.sources;
    }

    public void run() {
        try {
            Plan buildPlanWithReadableSinks = buildPlanWithReadableSinks();
            syncDegreeOfParallelism(buildPlanWithReadableSinks);
            initAdhocInputs();
            LocalExecutor.execute(buildPlanWithReadableSinks);
        } catch (Exception e) {
            Assert.fail("plan scheduling: " + e.getMessage() + "\n" + StringUtils.stringifyException(e));
        }
        try {
            validateResults();
        } finally {
            try {
                close();
            } catch (IOException e2) {
            }
        }
    }

    public void setDegreeOfParallelism(int i) {
        this.degreeOfParallelism = i;
    }

    protected Plan createPlan(Collection<GenericDataSink> collection) {
        return new Plan(collection);
    }

    protected abstract Records createTestRecords(TypeConfig<T> typeConfig);

    /* JADX INFO: Access modifiers changed from: protected */
    public TypeConfig<T> getDefaultConfig() {
        return this.defaultConfig;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setDefaultConfig(TypeConfig<T> typeConfig) {
        if (typeConfig == null) {
            throw new NullPointerException("defaultConfig must not be null");
        }
        this.defaultConfig = typeConfig;
    }

    private Plan buildPlanWithReadableSinks() {
        List<GenericDataSink> dataSinks = getDataSinks();
        ArrayList arrayList = new ArrayList();
        Iterator<GenericDataSink> it = dataSinks.iterator();
        while (it.hasNext()) {
            FileDataSink fileDataSink = (GenericDataSink) it.next();
            Configuration configuration = new Configuration();
            TypeConfig<T> typeConfig = getActualOutput((GenericDataSink) fileDataSink, (TypeConfig) this.defaultConfig).getTypeConfig();
            SequentialInputFormat.configureSequentialFormat(configuration).typeSerializer(typeConfig.getTypeSerializerFactory());
            if (fileDataSink.getFormatWrapper().getUserCodeClass().equals(SequentialOutputFormat.class)) {
                arrayList.add(fileDataSink);
                getActualOutput((GenericDataSink) fileDataSink).load(SequentialInputFormat.class, fileDataSink.getFilePath(), configuration);
                Configuration configuration2 = new Configuration();
                SequentialOutputFormat.configureSequentialFormat(configuration2).typeSerializer(typeConfig.getTypeSerializerFactory());
                fileDataSink.getParameters().addAll(configuration2);
            } else {
                Records records = this.expectedOutputs.get(fileDataSink);
                GenericDataSink createDefaultSink = createDefaultSink(fileDataSink.getName(), typeConfig);
                createDefaultSink.setInputs(fileDataSink.getInputs());
                arrayList.add(fileDataSink);
                arrayList.add(createDefaultSink);
                if (records != null) {
                    this.expectedOutputs.put(createDefaultSink, records);
                }
                this.actualOutputs.put(createDefaultSink, getActualOutput((GenericDataSink) fileDataSink));
                getActualOutput((GenericDataSink) fileDataSink).load(SequentialInputFormat.class, createDefaultSink.getFilePath(), configuration);
            }
        }
        return createPlan(arrayList);
    }

    private void configureSinksAndSources() {
        Iterator<GenericDataSink> it = this.sinks.iterator();
        while (it.hasNext()) {
            it.next().getParameters().setLong("pact.output.file.timeout", 0L);
        }
        Iterator<GenericDataSource<?>> it2 = this.sources.iterator();
        while (it2.hasNext()) {
            FileInputFormat.configureFileFormat(it2.next()).openingTimeout(0);
        }
    }

    private void findSinksAndSources() {
        for (Operator operator : this.contracts) {
            operator.accept(new Visitor<Operator>() { // from class: eu.stratosphere.core.testing.GenericTestPlan.2
                public void postVisit(Operator operator2) {
                }

                public boolean preVisit(Operator operator2) {
                    if ((operator2 instanceof GenericDataSink) && !GenericTestPlan.this.sinks.contains(operator2)) {
                        GenericTestPlan.this.sinks.add((GenericDataSink) operator2);
                    }
                    if (!(operator2 instanceof GenericDataSource) || GenericTestPlan.this.sources.contains(operator2)) {
                        return true;
                    }
                    GenericTestPlan.this.sources.add((GenericDataSource) operator2);
                    return true;
                }
            });
        }
        Iterator<GenericDataSource<?>> it = this.sources.iterator();
        while (it.hasNext()) {
            FileDataSource fileDataSource = (GenericDataSource) it.next();
            if (fileDataSource instanceof FileDataSource) {
                getInput((GenericDataSource<?>) fileDataSource).load(fileDataSource.getFormatWrapper().getUserCodeClass(), fileDataSource.getFilePath(), fileDataSource.getParameters());
            } else {
                getInput((GenericDataSource<?>) fileDataSource).load(fileDataSource.getFormatWrapper().getUserCodeClass(), fileDataSource.getParameters());
            }
        }
    }

    private List<GenericDataSink> getDataSinks() {
        return this.sinks;
    }

    private List<? extends GenericDataSource<?>> getDataSources() {
        return this.sources;
    }

    private void initAdhocInputs() throws IOException {
        Iterator<GenericDataSource<?>> it = this.sources.iterator();
        while (it.hasNext()) {
            FileDataSource fileDataSource = (GenericDataSource) it.next();
            Records input = getInput((GenericDataSource<?>) fileDataSource, this.defaultConfig);
            if (fileDataSource.getFormatWrapper().getUserCodeClass().equals(SequentialInputFormat.class)) {
                SequentialInputFormat.configureSequentialFormat((GenericDataSource<?>) fileDataSource).typeSerializer(input.getTypeConfig().getTypeSerializerFactory());
            }
            if (input.isAdhoc() && (fileDataSource instanceof FileDataSource)) {
                input.saveToFile(fileDataSource.getFilePath());
            }
        }
    }

    private void syncDegreeOfParallelism(Plan plan) {
        plan.accept(new Visitor<Operator>() { // from class: eu.stratosphere.core.testing.GenericTestPlan.3
            public void postVisit(Operator operator) {
            }

            public boolean preVisit(Operator operator) {
                int degreeOfParallelism = GenericTestPlan.this.getDegreeOfParallelism();
                if (operator instanceof GenericDataSource) {
                    degreeOfParallelism = 1;
                } else if (degreeOfParallelism > 1 && (operator instanceof FileDataSink)) {
                    try {
                        Path path = new Path(((FileDataSink) operator).getFilePath());
                        FileSystem fileSystem = path.getFileSystem();
                        if (!fileSystem.getFileStatus(path).isDir()) {
                            fileSystem.delete(path, false);
                            fileSystem.mkdirs(path);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (operator.getDegreeOfParallelism() != -1) {
                    return true;
                }
                operator.setDegreeOfParallelism(degreeOfParallelism);
                return true;
            }
        });
    }

    private void validateResults() {
        for (GenericDataSink genericDataSink : getDataSinks()) {
            Records records = this.expectedOutputs.get(genericDataSink);
            if (genericDataSink.getFormatWrapper().getUserCodeClass() == SequentialOutputFormat.class && records != null && records.isInitialized()) {
                Records actualOutput = getActualOutput(genericDataSink);
                try {
                    try {
                        actualOutput.assertEquals(records);
                        actualOutput.close();
                    } catch (AssertionError e) {
                        AssertionError assertionError = new AssertionError(genericDataSink.getName() + ": " + e.getMessage());
                        assertionError.initCause(e.getCause());
                        throw assertionError;
                    }
                } catch (Throwable th) {
                    actualOutput.close();
                    throw th;
                }
            }
        }
    }

    public static FileDataSink createDefaultSink(String str, TypeConfig<?> typeConfig) {
        FileDataSink fileDataSink = new FileDataSink(SequentialOutputFormat.class, getTestPlanFile("output"), str);
        if (typeConfig != null) {
            SequentialOutputFormat.configureSequentialFormat((GenericDataSink) fileDataSink).typeSerializer(typeConfig.getTypeSerializerFactory());
        }
        return fileDataSink;
    }

    public static FileDataSource createDefaultSource(String str, TypeConfig<?> typeConfig) {
        FileDataSource fileDataSource = new FileDataSource(SequentialInputFormat.class, getTestPlanFile("input"), str);
        if (typeConfig != null) {
            SequentialInputFormat.configureSequentialFormat((GenericDataSource<?>) fileDataSource).typeSerializer(typeConfig.getTypeSerializerFactory());
        }
        return fileDataSource;
    }

    static String getTestPlanFile(String str) {
        return createTemporaryFile("testPlan", str);
    }

    private static String createTemporaryFile(String str, String str2) {
        try {
            File createTempFile = File.createTempFile(str, str2);
            createTempFile.deleteOnExit();
            return createTempFile.toURI().toString();
        } catch (IOException e) {
            throw new IllegalStateException("Cannot create temporary file for prefix " + str2, e);
        }
    }
}
