package eu.stratosphere.nephele.jobmanager.splitassigner;

import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.core.fs.FileInputSplit;
import eu.stratosphere.core.io.GenericInputSplit;
import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertexIterator;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.jobmanager.splitassigner.file.FileInputSplitAssigner;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.class */
/**
 * Hands out input splits to execution vertices on request.
 * <p>
 * For every group vertex of a registered job that carries input splits, the manager selects an
 * {@link InputSplitAssigner} based on the input split type reported by the vertex's
 * {@link AbstractInputTask} and remembers that choice in an internal cache. Every split handed out
 * is also recorded with an {@link InputSplitTracker}; a request for which the tracker already holds
 * a recorded split is answered from that record instead of asking the assigner again.
 * <p>
 * The per-group-vertex cache is a {@link ConcurrentHashMap}; the map of assigners loaded per split
 * type is a plain {@link HashMap} and is only accessed while holding its own monitor.
 */
public final class InputSplitManager {

    private static final Log LOG = LogFactory.getLog(InputSplitManager.class);

    /**
     * Prefix of the configuration key under which a custom assigner class can be configured. The
     * fully qualified name of the input split class is appended to form the complete key.
     */
    private static final String INPUT_SPLIT_CONFIG_KEY_PREFIX = "inputsplit.assigner.";

    /** Assigner responsible for each registered group vertex. */
    private final Map<ExecutionGroupVertex, InputSplitAssigner> assignerCache =
            new ConcurrentHashMap<ExecutionGroupVertex, InputSplitAssigner>();

    /** Assigners already loaded, keyed by input split type. Guarded by its own monitor. */
    private final Map<Class<? extends InputSplit>, InputSplitAssigner> loadedAssigners =
            new HashMap<Class<? extends InputSplit>, InputSplitAssigner>();

    /** Records which split was handed to which vertex so that requests can be replayed. */
    private final InputSplitTracker inputSplitTracker = new InputSplitTracker();

    /** Fallback used when no specific assigner is available for a split type. */
    private final InputSplitAssigner defaultAssigner = new DefaultInputSplitAssigner();

    /**
     * Registers all group vertices of the given job that have input splits with a matching
     * assigner, then registers the job with the input split tracker.
     * <p>
     * Group vertices without input splits are skipped. A group vertex that has input splits but
     * whose invokable is not an {@link AbstractInputTask} is logged as an error and ignored.
     *
     * @param executionGraph
     *        the execution graph of the job to register
     */
    public void registerJob(ExecutionGraph executionGraph) {
        // NOTE(review): arguments (true, -1) presumably mean "forward traversal, all stages" -
        // confirm against ExecutionGroupVertexIterator.
        final ExecutionGroupVertexIterator vertexIterator =
                new ExecutionGroupVertexIterator(executionGraph, true, -1);

        while (vertexIterator.hasNext()) {
            final ExecutionGroupVertex groupVertex = vertexIterator.next();
            final InputSplit[] inputSplits = groupVertex.getInputSplits();
            if (inputSplits == null || inputSplits.length == 0) {
                // Nothing to assign for this group vertex.
                continue;
            }

            final AbstractInvokable invokable = groupVertex.getEnvironment().getInvokable();
            if (!(invokable instanceof AbstractInputTask)) {
                LOG.error(groupVertex.getName() + " has " + inputSplits.length
                        + " input splits, but is not of type AbstractInputTask, ignoring...");
                continue;
            }

            final Class<? extends InputSplit> splitType =
                    ((AbstractInputTask) invokable).getInputSplitType();
            final InputSplitAssigner assigner = getAssignerByType(splitType, true);
            // Cache first, then let the assigner know about the vertex (same order as before).
            this.assignerCache.put(groupVertex, assigner);
            assigner.registerGroupVertex(groupVertex);
        }

        this.inputSplitTracker.registerJob(executionGraph);
    }

    /**
     * Removes all group vertices of the given job that have input splits from the assigner cache
     * and from their assigner, then unregisters the job from the input split tracker.
     * <p>
     * A group vertex with input splits that is not present in the cache is logged as an error.
     *
     * @param executionGraph
     *        the execution graph of the job to unregister
     */
    public void unregisterJob(ExecutionGraph executionGraph) {
        final ExecutionGroupVertexIterator vertexIterator =
                new ExecutionGroupVertexIterator(executionGraph, true, -1);

        while (vertexIterator.hasNext()) {
            final ExecutionGroupVertex groupVertex = vertexIterator.next();
            final InputSplit[] inputSplits = groupVertex.getInputSplits();
            if (inputSplits == null || inputSplits.length == 0) {
                continue;
            }

            final InputSplitAssigner assigner = this.assignerCache.remove(groupVertex);
            if (assigner == null) {
                LOG.error("Group vertex " + groupVertex.getName()
                        + " is unregistered, but cannot be found in assigner cache");
            } else {
                assigner.unregisterGroupVertex(groupVertex);
            }
        }

        this.inputSplitTracker.unregisterJob(executionGraph);
    }

    /**
     * Returns the next input split for the given vertex.
     * <p>
     * If the input split tracker already holds a split for this vertex and {@code i}, that split is
     * returned. Otherwise the assigner cached for the vertex's group vertex is asked for a split,
     * and a non-{@code null} answer is recorded with the tracker before it is returned.
     *
     * @param executionVertex
     *        the vertex requesting a split
     * @param i
     *        key under which the tracker looks up and records the split for this vertex
     *        (presumably the sequence number of the request - confirm against
     *        {@link InputSplitTracker})
     * @return the next input split, or {@code null} if no assigner is cached for the vertex's group
     *         vertex or the assigner itself returned {@code null}
     */
    public InputSplit getNextInputSplit(ExecutionVertex executionVertex, int i) {
        final InputSplit replayedSplit = this.inputSplitTracker.getInputSplitFromLog(executionVertex, i);
        if (replayedSplit != null) {
            LOG.info("Input split " + replayedSplit.getSplitNumber() + " for vertex " + executionVertex
                    + " replayed from log");
            return replayedSplit;
        }

        final ExecutionGroupVertex groupVertex = executionVertex.getGroupVertex();
        final InputSplitAssigner assigner = this.assignerCache.get(groupVertex);
        if (assigner == null) {
            LOG.error("Cannot find input assigner for group vertex " + groupVertex.getName() + " (job "
                    + groupVertex.getExecutionStage().getExecutionGraph().getJobID() + ")");
            return null;
        }

        final InputSplit assignedSplit = assigner.getNextInputSplit(executionVertex);
        if (assignedSplit != null) {
            // Record the assignment so the same request can later be answered from the log.
            this.inputSplitTracker.addInputSplitToLog(executionVertex, i, assignedSplit);
            LOG.info(executionVertex + " receives input split " + assignedSplit.getSplitNumber());
        }
        return assignedSplit;
    }

    /**
     * Looks up the assigner for the given input split type, optionally loading it on a miss.
     * <p>
     * Successfully loaded assigners are remembered, so each split type is loaded at most once. If no
     * assigner is available, a warning is logged and the default assigner is returned.
     *
     * @param cls
     *        the input split type to find an assigner for
     * @param allowLoading
     *        whether an assigner that has not been loaded yet may be loaded now
     * @return the assigner for the type, or the default assigner; never {@code null}
     */
    private InputSplitAssigner getAssignerByType(Class<? extends InputSplit> cls, boolean allowLoading) {
        synchronized (this.loadedAssigners) {
            InputSplitAssigner assigner = this.loadedAssigners.get(cls);
            if (assigner == null && allowLoading) {
                assigner = loadInputSplitAssigner(cls);
                if (assigner != null) {
                    this.loadedAssigners.put(cls, assigner);
                }
            }
            if (assigner != null) {
                return assigner;
            }
            LOG.warn("Unable to find specific input split provider for type " + cls.getName()
                    + ", using default assigner");
            return this.defaultAssigner;
        }
    }

    /**
     * Creates an assigner for the given input split type.
     * <p>
     * A class name configured under {@code inputsplit.assigner.<split class name>} takes
     * precedence and is instantiated reflectively. If that instantiation fails, the error is logged
     * and {@code null} is returned; the built-in assigners are not tried in that case. Without a
     * configured class, built-in assigners are used for {@link FileInputSplit} and
     * {@link GenericInputSplit}.
     *
     * @param cls
     *        the input split type to create an assigner for
     * @return a new assigner, or {@code null} if none could be created for the type
     */
    private InputSplitAssigner loadInputSplitAssigner(Class<? extends InputSplit> cls) {
        final String typeName = cls.getName();
        final String configKey = INPUT_SPLIT_CONFIG_KEY_PREFIX + typeName;
        LOG.info("Trying to load input split assigner for type " + typeName);

        final String assignerClassName = GlobalConfiguration.getString(configKey, (String) null);
        if (assignerClassName != null) {
            try {
                return Class.forName(assignerClassName).asSubclass(InputSplitAssigner.class).newInstance();
            } catch (Exception e) {
                // Say which configured class failed for which split type, not just the stack trace.
                LOG.error("Cannot instantiate input split assigner " + assignerClassName + " for type "
                        + typeName + ": " + StringUtils.stringifyException(e));
                return null;
            }
        }

        if (FileInputSplit.class == cls) {
            return new FileInputSplitAssigner();
        }
        if (GenericInputSplit.class == cls) {
            return new DefaultInputSplitAssigner();
        }
        return null;
    }
}
