package eu.stratosphere.pact.runtime.task;

import eu.stratosphere.api.common.io.OutputFormat;
import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.api.java.record.io.FileOutputFormat;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.FileStatus;
import eu.stratosphere.core.fs.FileSystem;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.io.MutableReader;
import eu.stratosphere.nephele.io.MutableRecordReader;
import eu.stratosphere.nephele.io.MutableUnionRecordReader;
import eu.stratosphere.nephele.profiling.ProfilingUtils;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordSerializer;
import eu.stratosphere.pact.runtime.sort.UnilateralSortMerger;
import eu.stratosphere.pact.runtime.task.util.CloseableInputProvider;
import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.pact.runtime.task.util.ReaderIterator;
import eu.stratosphere.pact.runtime.task.util.RecordReaderIterator;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.MutableObjectIterator;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/pact/runtime/task/DataSinkTask.class */
/**
 * Output task that drains its single logical input into a user-supplied {@link OutputFormat}.
 *
 * <p>Life cycle as implemented here: {@link #registerInputOutput()} instantiates and configures
 * the output format and creates the input readers; {@link #invoke()} optionally sorts the input
 * (depending on the configured {@link LocalStrategy}), opens the format, writes every record and
 * closes the format again; {@link #cancel()} flags the task as canceled and closes the format.
 *
 * <p>NOTE(review): {@code format} and {@code taskCanceled} are volatile, which suggests that
 * {@link #cancel()} is invoked from a different thread than {@link #invoke()} - confirm against
 * the runtime.
 *
 * @param <IT> type of the records consumed by this sink
 */
public class DataSinkTask<IT> extends AbstractOutputTask {

    /** Task-configuration key read by {@link #getMaximumNumberOfSubtasks()} to obtain the sink's degree of parallelism. */
    public static final String DEGREE_OF_PARALLELISM_KEY = "pact.sink.dop";

    private static final Log LOG = LogFactory.getLog(DataSinkTask.class);

    /** The user's output format; set to {@code null} by {@link #invoke()} once it was closed regularly. */
    private volatile OutputFormat<IT> format;

    /** Iterator over the raw (unsorted) input as delivered by the input gates. */
    private MutableObjectIterator<IT> reader;

    /** Iterator the write loop consumes: either {@link #reader} itself or the sorter's output. */
    private MutableObjectIterator<IT> input;

    /** Serializer of the input type; also used to create the reusable record instance. */
    private TypeSerializer<IT> inputTypeSerializer;

    /** The sorter when the local strategy is {@code SORT}, otherwise {@code null}. */
    private CloseableInputProvider<IT> localStrategy;

    /** Typed view on the task configuration. */
    private TaskConfig config;

    /** Class loader used to resolve user code; obtained from the library cache if not set explicitly. */
    private ClassLoader userCodeClassLoader;

    /** Set by {@link #cancel()}; polled by {@link #invoke()} to stop writing and to drop follow-up errors. */
    private volatile boolean taskCanceled;

    /**
     * Instantiates and configures the output format, then creates the input readers.
     *
     * @throws RuntimeException if the output format or the input readers cannot be set up
     */
    @Override // eu.stratosphere.nephele.template.AbstractInvokable
    public void registerInputOutput() {
        if (LOG.isDebugEnabled()) {
            LOG.debug(getLogString("Start registering input and output"));
        }
        initOutputFormat();
        try {
            initInputReaders();
            if (LOG.isDebugEnabled()) {
                LOG.debug(getLogString("Finished registering input and output"));
            }
        } catch (Exception e) {
            throw new RuntimeException("Initializing the input streams failed" + causeSuffix(e), e);
        }
    }

    /**
     * Writes the complete input to the output format.
     *
     * <p>Any exception raised while the task is flagged as canceled is dropped, because
     * {@link #cancel()} closes the format underneath the write loop and follow-up failures are
     * then expected. Otherwise the exception is logged and rethrown. The output format (unless
     * already closed regularly) and the local strategy are closed in all cases.
     *
     * @throws Exception any failure during strategy setup, opening, writing or closing that did
     *                   not happen after cancellation
     */
    @Override // eu.stratosphere.nephele.template.AbstractInvokable
    public void invoke() throws Exception {
        if (LOG.isInfoEnabled()) {
            LOG.info(getLogString("Start PACT code"));
        }
        try {
            initLocalStrategy();

            final MutableObjectIterator<IT> recordSource = this.input;
            final OutputFormat<IT> sink = this.format;
            final IT reuse = this.inputTypeSerializer.createInstance();

            if (this.taskCanceled) {
                // Canceled before any output was produced; the finally block releases the resources.
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug(getLogString("Starting to produce output"));
            }
            sink.open(getEnvironment().getIndexInSubtaskGroup() + 1);

            if (reuse.getClass() == Record.class && sink instanceof FileOutputFormat) {
                // Special-cased Record / record-FileOutputFormat combination with concretely typed calls.
                @SuppressWarnings("unchecked")
                final MutableObjectIterator<Record> typedSource = (MutableObjectIterator<Record>) recordSource;
                final Record typedReuse = (Record) reuse;
                final FileOutputFormat typedSink = (FileOutputFormat) sink;
                while (!this.taskCanceled && typedSource.next(typedReuse)) {
                    typedSink.writeRecord(typedReuse);
                }
            } else {
                while (!this.taskCanceled && recordSource.next(reuse)) {
                    sink.writeRecord(reuse);
                }
            }

            // Regular close happens inside the try so that a failing close marks the task as failed.
            // Clearing the field prevents the finally block from closing a second time.
            if (!this.taskCanceled) {
                this.format.close();
                this.format = null;
            }
        } catch (Exception e) {
            if (!this.taskCanceled) {
                if (LOG.isErrorEnabled()) {
                    LOG.error(getLogString("Error in Pact user code: " + e.getMessage()), e);
                }
                throw e;
            }
            // Canceled: the failure is a consequence of the cancellation and is dropped.
        } finally {
            if (this.format != null) {
                // Only reached with a non-null format after an error or a cancellation.
                try {
                    this.format.close();
                } catch (Throwable t) {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn(getLogString("Error closing the ouput format."), t);
                    }
                }
            }
            if (this.localStrategy != null) {
                try {
                    this.localStrategy.close();
                } catch (Throwable t) {
                    LOG.error("Error closing local strategy", t);
                }
            }
        }

        if (this.taskCanceled) {
            if (LOG.isWarnEnabled()) {
                LOG.warn(getLogString("PACT code cancelled"));
            }
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug(getLogString("Finished producing output"));
            }
            if (LOG.isInfoEnabled()) {
                LOG.info(getLogString("Finished PACT code"));
            }
        }
    }

    /**
     * Flags the task as canceled and closes the output format on a best-effort basis.
     *
     * @throws Exception declared by the overridden method; nothing in this implementation
     *                   declares a checked exception of its own
     */
    @Override // eu.stratosphere.nephele.template.AbstractInvokable
    public void cancel() throws Exception {
        this.taskCanceled = true;
        // Read the volatile field once: invoke() may set it to null between a check and a use.
        final OutputFormat<IT> current = this.format;
        if (current != null) {
            try {
                current.close();
            } catch (Throwable t) {
                // Best effort only - cancellation must not fail because the format refuses to close.
                if (LOG.isDebugEnabled()) {
                    LOG.debug(getLogString("Error closing the output format during cancellation."), t);
                }
            }
        }
        if (LOG.isWarnEnabled()) {
            LOG.warn(getLogString("Cancelling PACT code"));
        }
    }

    /**
     * Sets the class loader used to resolve user code. When none is set before
     * {@link #registerInputOutput()} runs, one is requested from the {@link LibraryCacheManager}.
     *
     * @param classLoader the user-code class loader
     */
    public void setUserCodeClassLoader(ClassLoader classLoader) {
        this.userCodeClassLoader = classLoader;
    }

    /**
     * Prepares {@link #input} and {@link #localStrategy} according to the local strategy
     * configured for input 0: {@code NONE} reads the input directly, {@code SORT} routes it
     * through a {@link UnilateralSortMerger}.
     *
     * @throws RuntimeException if the sorter cannot be set up or the strategy is not supported
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // the configuration hands out untyped factories
    private void initLocalStrategy() {
        switch (this.config.getInputLocalStrategy(0)) {
            case NONE:
                this.localStrategy = null;
                this.input = this.reader;
                break;
            case SORT:
                try {
                    TypeComparatorFactory comparatorFactory = this.config.getInputComparator(0, this.userCodeClassLoader);
                    if (comparatorFactory == null) {
                        throw new Exception("Missing comparator factory for local strategy on input 0");
                    }
                    UnilateralSortMerger sorter = new UnilateralSortMerger(
                            getEnvironment().getMemoryManager(),
                            getEnvironment().getIOManager(),
                            this.reader,
                            this,
                            this.inputTypeSerializer,
                            comparatorFactory.createComparator(),
                            this.config.getMemoryInput(0),
                            this.config.getFilehandlesInput(0),
                            this.config.getSpillingThresholdInput(0));
                    this.localStrategy = sorter;
                    this.input = sorter.getIterator();
                } catch (Exception e) {
                    throw new RuntimeException("Initializing the input processing failed" + causeSuffix(e), e);
                }
                break;
            default:
                throw new RuntimeException("Invalid local strategy for DataSinkTask");
        }
    }

    /**
     * Resolves the user-code class loader (if necessary), wraps the task configuration, and
     * instantiates and configures the user's output format.
     *
     * @throws RuntimeException if the class loader cannot be obtained, the stub is not an
     *                          {@link OutputFormat}, or its {@code configure()} method fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw cast of the user-code object, as in the original
    private void initOutputFormat() {
        if (this.userCodeClassLoader == null) {
            try {
                this.userCodeClassLoader = LibraryCacheManager.getClassLoader(getEnvironment().getJobID());
            } catch (IOException e) {
                throw new RuntimeException("Library cache manager could not be instantiated.", e);
            }
        }
        Configuration taskConfiguration = getTaskConfiguration();
        taskConfiguration.setClassLoader(this.userCodeClassLoader);
        this.config = new TaskConfig(taskConfiguration);

        try {
            this.format = (OutputFormat) this.config.getStubWrapper(this.userCodeClassLoader)
                    .getUserCodeObject(OutputFormat.class, this.userCodeClassLoader);
            if (!OutputFormat.class.isAssignableFrom(this.format.getClass())) {
                throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" + OutputFormat.class.getName() + "' as is required.");
            }
        } catch (ClassCastException e) {
            throw new RuntimeException("The stub class is not a proper subclass of " + OutputFormat.class.getName(), e);
        }

        // Configured in its own try so that failures are reported as originating from user code.
        try {
            this.format.configure(this.config.getStubParameters());
        } catch (Throwable t) {
            throw new RuntimeException("The user defined 'configure()' method in the Output Format caused an error: " + t.getMessage(), t);
        }
    }

    /**
     * Creates the reader(s) for input 0 - a single record reader, or a union reader when the
     * input group consists of several gates - and wraps them in an iterator matching the
     * configured serializer.
     *
     * @throws Exception if the configured group size is smaller than one or does not match the
     *                   configured number of inputs
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // reader element types are not visible from this class
    private void initInputReaders() throws Exception {
        final int groupSize = this.config.getGroupSize(0);

        final MutableReader inputReader;
        if (groupSize == 1) {
            inputReader = new MutableRecordReader(this);
        } else if (groupSize > 1) {
            MutableRecordReader[] gateReaders = new MutableRecordReader[groupSize];
            for (int gate = 0; gate < groupSize; gate++) {
                gateReaders[gate] = new MutableRecordReader(this);
            }
            inputReader = new MutableUnionRecordReader(gateReaders);
        } else {
            throw new Exception("Illegal input group size in task configuration: " + groupSize);
        }

        this.inputTypeSerializer = (TypeSerializer<IT>) this.config.getInputSerializer(0, this.userCodeClassLoader).getSerializer();

        final MutableObjectIterator<?> iterator;
        if (this.inputTypeSerializer.getClass() == RecordSerializer.class) {
            iterator = new RecordReaderIterator(inputReader);
        } else {
            iterator = new ReaderIterator(inputReader, this.inputTypeSerializer);
        }
        this.reader = (MutableObjectIterator<IT>) iterator;

        // Sanity check: this task has exactly one logical input, so its group must account for all gates.
        if (groupSize != this.config.getNumInputs()) {
            throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
        }
    }

    /**
     * Derives the subtask limit from the configured output path. As a side effect, an existing
     * plain file at the path is deleted and, unless the degree of parallelism is configured as
     * 1, a directory is created at the path.
     *
     * <p>NOTE(review): -1 presumably means "no limit" and 0 "unknown" to the caller - confirm
     * against {@code AbstractInvokable}.
     *
     * @return -1 if the format is not a file output format, the path is a directory, or a
     *         directory was created; 0 if no path is configured or the path string cannot be
     *         turned into a {@link Path}; 1 if the file status is {@code null} or the degree of
     *         parallelism is 1
     * @throws RuntimeException if the file system cannot be accessed
     */
    @Override // eu.stratosphere.nephele.template.AbstractInvokable
    public int getMaximumNumberOfSubtasks() {
        if (!(this.format instanceof eu.stratosphere.api.common.io.FileOutputFormat)) {
            return -1;
        }
        final String pathName = this.config.getStubParameter("pact.output.file", null);
        if (pathName == null) {
            return 0;
        }

        // Only the path construction is guarded by the catch-all; I/O problems below must surface.
        final Path path;
        try {
            path = new Path(pathName);
        } catch (Throwable t) {
            return 0;
        }

        try {
            final FileSystem fileSystem = path.getFileSystem();
            try {
                final FileStatus status = fileSystem.getFileStatus(path);
                if (status == null) {
                    return 1;
                }
                if (status.isDir()) {
                    return -1;
                }
                // An existing plain file is removed before deciding between file and directory output.
                fileSystem.delete(path, false);
                return limitForAbsentPath(fileSystem, path);
            } catch (FileNotFoundException e) {
                // Nothing exists at the path yet.
                return limitForAbsentPath(fileSystem, path);
            }
        } catch (IOException e) {
            LOG.error("Could not access the file system to detemine the status of the output.", e);
            throw new RuntimeException("I/O Error while accessing file", e);
        }
    }

    /**
     * Decides the subtask limit for a path at which nothing exists (any more): 1 when the degree
     * of parallelism is configured as 1, otherwise a directory is created and -1 is returned.
     */
    private int limitForAbsentPath(FileSystem fileSystem, Path path) throws IOException {
        if (getTaskConfiguration().getInteger(DEGREE_OF_PARALLELISM_KEY, -1) == 1) {
            return 1;
        }
        fileSystem.mkdirs(path);
        return -1;
    }

    /** Returns {@code "."} when the throwable has no message, otherwise {@code ": "} followed by the message. */
    private static String causeSuffix(Throwable cause) {
        final String message = cause.getMessage();
        return message == null ? "." : ": " + message;
    }

    /** Decorates a log message with this task's name via {@link RegularPactTask#constructLogString}. */
    private String getLogString(String str) {
        return RegularPactTask.constructLogString(str, getEnvironment().getTaskName(), this);
    }
}
