package eu.stratosphere.pact.runtime.task;

import eu.stratosphere.api.common.io.InputFormat;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.pact.runtime.shipping.OutputCollector;
import eu.stratosphere.pact.runtime.shipping.RecordOutputCollector;
import eu.stratosphere.pact.runtime.task.chaining.ChainedDriver;
import eu.stratosphere.pact.runtime.task.chaining.ChainedMapDriver;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.Collector;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/pact/runtime/task/DataSourceTask.class */
/**
 * Input task that reads records from a user-defined {@link InputFormat} and forwards them to the
 * task's output {@link Collector} (which may be the head of a chain of {@link ChainedDriver}s).
 *
 * <p>Life cycle as visible in this class: {@link #registerInputOutput()} instantiates and
 * configures the input format and sets up the outputs; {@link #invoke()} iterates over the
 * assigned input splits and emits their records; {@link #cancel()} sets a flag that the read
 * loops poll.
 *
 * @param <OT> the type of the records produced by the input format
 */
public class DataSourceTask<OT> extends AbstractInputTask<InputSplit> {
    private static final Log LOG = LogFactory.getLog(DataSourceTask.class);

    /** Collector that receives every record read from the input format. */
    private Collector<OT> output;

    /** The user-defined input format, instantiated in {@link #initInputFormat(ClassLoader)}. */
    private InputFormat<OT, InputSplit> format;

    /** Serializer used to create the record instance that is handed to the input format. */
    private TypeSerializer<OT> serializer;

    /** Task configuration wrapper around the raw task {@link Configuration}. */
    private TaskConfig config;

    /** Drivers chained behind this source; filled by {@link #initOutputs(ClassLoader)}. */
    private ArrayList<ChainedDriver<?, ?>> chainedTasks;

    /** Class loader for user code; may be injected via {@link #setUserCodeClassLoader}. */
    private ClassLoader userCodeClassLoader;

    /** Set by {@link #cancel()}; volatile because it is polled by the loops in {@link #invoke()}. */
    private volatile boolean taskCanceled = false;

    /**
     * Obtains the user-code class loader (unless one was injected beforehand), builds the
     * {@link TaskConfig}, and initializes the input format and the outputs.
     *
     * @throws RuntimeException if the class loader cannot be obtained or if the initialization
     *         of the input format or of the outputs fails
     */
    @Override // eu.stratosphere.nephele.template.AbstractInvokable
    public void registerInputOutput() {
        if (LOG.isDebugEnabled()) {
            LOG.debug(getLogString("Start registering input and output"));
        }
        if (this.userCodeClassLoader == null) {
            try {
                this.userCodeClassLoader = LibraryCacheManager.getClassLoader(getEnvironment().getJobID());
            } catch (IOException e) {
                throw new RuntimeException("Usercode ClassLoader could not be obtained for job: " + getEnvironment().getJobID(), e);
            }
        }
        Configuration taskConfiguration = getTaskConfiguration();
        taskConfiguration.setClassLoader(this.userCodeClassLoader);
        this.config = new TaskConfig(taskConfiguration);
        initInputFormat(this.userCodeClassLoader);
        try {
            initOutputs(this.userCodeClassLoader);
            if (LOG.isDebugEnabled()) {
                LOG.debug(getLogString("Finished registering input and output"));
            }
        } catch (Exception e) {
            throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " + e.getMessage(), e);
        }
    }

    /**
     * Reads all assigned input splits and emits their records to the output.
     *
     * <p>For every split the input format is opened, drained via
     * {@link #emitRecords(InputFormat, Object)} and closed exactly once in a {@code finally}
     * block, so that a failing regular close is reported as a task failure without the format
     * being closed a second time by the per-split handling. After the last split the output and
     * the chained tasks are closed and the accumulators are reported.
     *
     * <p>If an exception occurs, the format is closed on a best-effort basis and the chained
     * tasks are canceled. The exception is only propagated when the task has not been canceled.
     *
     * @throws Exception whatever {@code RegularPactTask.logAndThrowException} raises for a
     *         failure that happened while the task was not canceled
     */
    // "rawtypes"/"unchecked": the accumulator map is created as a raw HashMap because its value
    // type is not imported in this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override // eu.stratosphere.nephele.template.AbstractInvokable
    public void invoke() throws Exception {
        if (LOG.isInfoEnabled()) {
            LOG.info(getLogString("Start PACT code"));
        }
        try {
            RegularPactTask.openChainedTasks(this.chainedTasks, this);
            final Iterator<InputSplit> splits = getInputSplits();
            // A single instance is created up front and handed to the format for every record.
            final OT reuse = this.serializer.createInstance();

            while (!this.taskCanceled && splits.hasNext()) {
                final InputSplit split = splits.next();
                if (LOG.isDebugEnabled()) {
                    LOG.debug(getLogString("Opening input split " + split.toString()));
                }
                final InputFormat<OT, InputSplit> inputFormat = this.format;
                inputFormat.open(split);
                if (LOG.isDebugEnabled()) {
                    LOG.debug(getLogString("Starting to read input from split " + split.toString()));
                }
                try {
                    emitRecords(inputFormat, reuse);
                    if (LOG.isDebugEnabled() && !this.taskCanceled) {
                        LOG.debug(getLogString("Closing input split " + split.toString()));
                    }
                } finally {
                    // Close here (and only here) for both the regular and the failure path. An
                    // exception thrown by close() propagates to the outer handler below.
                    inputFormat.close();
                }
            }
            this.output.close();
            RegularPactTask.closeChainedTasks(this.chainedTasks, this);
            RegularPactTask.reportAndClearAccumulators(getEnvironment(), new HashMap(), this.chainedTasks);
        } catch (Exception e) {
            try {
                this.format.close();
            } catch (Throwable ignored) {
                // Best-effort cleanup only: 'e' is the failure that gets reported below, a
                // secondary failure while closing must not replace it.
            }
            RegularPactTask.cancelChainedTasks(this.chainedTasks);
            // Exceptions raised while the task is being canceled are deliberately dropped.
            if (!this.taskCanceled) {
                RegularPactTask.logAndThrowException(e, this);
            }
        }
        if (this.taskCanceled) {
            if (LOG.isWarnEnabled()) {
                LOG.warn(getLogString("PACT code cancelled"));
            }
        } else if (LOG.isInfoEnabled()) {
            LOG.info(getLogString("Finished PACT code"));
        }
    }

    /**
     * Drains the currently opened split of {@code inputFormat} into {@link #output}, stopping
     * early when the task is canceled.
     *
     * <p>The read loop exists once per combination of record kind ({@link Record} vs. any other
     * type) and concrete collector type ({@link RecordOutputCollector} / {@link OutputCollector},
     * {@link ChainedMapDriver}, or a plain {@link Collector}). The loops are kept separate on
     * purpose to mirror the original structure.
     * NOTE(review): presumably this keeps each {@code collect} call site bound to one concrete
     * type for the JIT - confirm before merging the loops.
     *
     * @param inputFormat the input format, already opened on the split to read
     * @param reuse the record instance passed to {@code nextRecord}; it is cleared before every
     *        read only when it is exactly a {@link Record}
     * @throws Exception if reading from the format or collecting a record fails
     */
    @SuppressWarnings("unchecked")
    private void emitRecords(InputFormat<OT, InputSplit> inputFormat, OT reuse) throws Exception {
        if (reuse.getClass() == Record.class) {
            // OT is exactly Record here, so the casts below are safe although unchecked.
            final Record record = (Record) reuse;
            final InputFormat<Record, InputSplit> recordFormat = (InputFormat<Record, InputSplit>) inputFormat;

            if (this.output instanceof RecordOutputCollector) {
                final RecordOutputCollector recordOutput = (RecordOutputCollector) this.output;
                while (!this.taskCanceled && !recordFormat.reachedEnd()) {
                    record.clear();
                    if (recordFormat.nextRecord(record)) {
                        recordOutput.collect(record);
                    }
                }
            } else if (this.output instanceof ChainedMapDriver) {
                final ChainedMapDriver<Record, ?> mapDriver = (ChainedMapDriver<Record, ?>) this.output;
                while (!this.taskCanceled && !recordFormat.reachedEnd()) {
                    record.clear();
                    if (recordFormat.nextRecord(record)) {
                        mapDriver.collect(record);
                    }
                }
            } else {
                final Collector<Record> collector = (Collector<Record>) this.output;
                while (!this.taskCanceled && !recordFormat.reachedEnd()) {
                    record.clear();
                    if (recordFormat.nextRecord(record)) {
                        collector.collect(record);
                    }
                }
            }
        } else if (this.output instanceof OutputCollector) {
            final OutputCollector<OT> outputCollector = (OutputCollector<OT>) this.output;
            while (!this.taskCanceled && !inputFormat.reachedEnd()) {
                if (inputFormat.nextRecord(reuse)) {
                    outputCollector.collect(reuse);
                }
            }
        } else if (this.output instanceof ChainedMapDriver) {
            final ChainedMapDriver<OT, ?> mapDriver = (ChainedMapDriver<OT, ?>) this.output;
            while (!this.taskCanceled && !inputFormat.reachedEnd()) {
                if (inputFormat.nextRecord(reuse)) {
                    mapDriver.collect(reuse);
                }
            }
        } else {
            final Collector<OT> collector = this.output;
            while (!this.taskCanceled && !inputFormat.reachedEnd()) {
                if (inputFormat.nextRecord(reuse)) {
                    collector.collect(reuse);
                }
            }
        }
    }

    /**
     * Marks the task as canceled. The loops in {@link #invoke()} poll the flag and stop at the
     * next check; no thread is interrupted by this method.
     */
    @Override // eu.stratosphere.nephele.template.AbstractInvokable
    public void cancel() throws Exception {
        this.taskCanceled = true;
        if (LOG.isWarnEnabled()) {
            LOG.warn(getLogString("Cancelling PACT code"));
        }
    }

    /**
     * Injects the class loader to use for user code. When set before
     * {@link #registerInputOutput()}, no class loader is requested from the
     * {@link LibraryCacheManager}.
     *
     * @param classLoader the user-code class loader
     */
    public void setUserCodeClassLoader(ClassLoader classLoader) {
        this.userCodeClassLoader = classLoader;
    }

    /**
     * Instantiates the user-defined input format, configures it with the stub parameters and
     * looks up the output serializer.
     *
     * <p>Note that the serializer lookup happens inside the same {@code try} block as
     * {@code configure()}, so a failure there is reported with the same "configure()" message.
     *
     * @param classLoader the class loader used to load the user code and the serializer
     * @throws RuntimeException if configuring the format or obtaining the serializer fails
     */
    @SuppressWarnings("unchecked")
    private void initInputFormat(ClassLoader classLoader) {
        this.format = (InputFormat<OT, InputSplit>) RegularPactTask.instantiateUserCode(this.config, classLoader, InputFormat.class);
        try {
            this.format.configure(this.config.getStubParameters());
            this.serializer = this.config.getOutputSerializer(classLoader).getSerializer();
        } catch (Throwable t) {
            throw new RuntimeException("The user defined 'configure()' method caused an error: " + t.getMessage(), t);
        }
    }

    /**
     * Creates the (initially empty) list of chained tasks and lets {@link RegularPactTask}
     * set up the output collector, which also fills that list.
     *
     * @param classLoader the class loader used for user code in the outputs
     * @throws Exception if the outputs cannot be initialized
     */
    private void initOutputs(ClassLoader classLoader) throws Exception {
        this.chainedTasks = new ArrayList<>();
        this.output = RegularPactTask.initOutputs(this, classLoader, this.config, this.chainedTasks, null);
    }

    /**
     * Delegates the computation of the input splits to the input format.
     *
     * @param i the value passed through to {@code InputFormat.createInputSplits}
     *        (presumably the requested minimum number of splits - confirm against the interface)
     * @return the splits created by the input format
     * @throws IllegalStateException if the input format has not been instantiated yet
     * @throws Exception if the input format fails to create the splits
     */
    @Override // eu.stratosphere.nephele.template.AbstractInputTask
    public InputSplit[] computeInputSplits(int i) throws Exception {
        if (this.format == null) {
            throw new IllegalStateException("BUG: Input format hast not been instantiated, yet.");
        }
        return this.format.createInputSplits(i);
    }

    /**
     * Returns the split type reported by the input format.
     *
     * @return the class of the input splits used by the input format
     * @throws IllegalStateException if the input format has not been instantiated yet
     */
    @Override // eu.stratosphere.nephele.template.AbstractInputTask
    public Class<InputSplit> getInputSplitType() {
        if (this.format == null) {
            throw new IllegalStateException("BUG: Input format hast not been instantiated, yet.");
        }
        return this.format.getInputSplitType();
    }

    /**
     * @return always {@code 1}
     */
    @Override // eu.stratosphere.nephele.template.AbstractInvokable
    public int getMinimumNumberOfSubtasks() {
        return 1;
    }

    /**
     * @return always {@code -1} (presumably "no upper bound" - confirm against
     *         {@code AbstractInvokable})
     */
    @Override // eu.stratosphere.nephele.template.AbstractInvokable
    public int getMaximumNumberOfSubtasks() {
        return -1;
    }

    /**
     * Builds a log line for this task, using the task name from the environment.
     *
     * @param str the message to log
     * @return the formatted log string
     */
    private String getLogString(String str) {
        return getLogString(str, getEnvironment().getTaskName());
    }

    /**
     * Builds a log line for this task via {@code RegularPactTask.constructLogString}.
     *
     * @param str the message to log
     * @param str2 the task name to include in the log line
     * @return the formatted log string
     */
    private String getLogString(String str, String str2) {
        return RegularPactTask.constructLogString(str, str2, this);
    }
}
