package eu.stratosphere.nephele.jobmanager.scheduler;

import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.executiongraph.ExecutionGate;
import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
import eu.stratosphere.nephele.executiongraph.ExecutionGraphIterator;
import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertexIterator;
import eu.stratosphere.nephele.executiongraph.ExecutionPipeline;
import eu.stratosphere.nephele.executiongraph.ExecutionStage;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.instance.AllocationID;
import eu.stratosphere.nephele.instance.DummyInstance;
import eu.stratosphere.nephele.instance.InstanceException;
import eu.stratosphere.nephele.instance.InstanceListener;
import eu.stratosphere.nephele.instance.InstanceManager;
import eu.stratosphere.nephele.instance.InstanceRequestMap;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.jobmanager.DeploymentManager;
import eu.stratosphere.nephele.profiling.ProfilingUtils;
import eu.stratosphere.util.StringUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.class */
public abstract class AbstractScheduler implements InstanceListener {
    protected static final Log LOG = LogFactory.getLog(AbstractScheduler.class);
    private final InstanceManager instanceManager;
    private final DeploymentManager deploymentManager;
    private final Map<ExecutionVertexID, ExecutionVertex> verticesToBeRestarted = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler$3, reason: invalid class name */
    /* loaded from: input_file:eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$eu$stratosphere$nephele$io$channels$ChannelType;
        static final /* synthetic */ int[] $SwitchMap$eu$stratosphere$nephele$execution$ExecutionState = new int[ExecutionState.values().length];

        static {
            try {
                $SwitchMap$eu$stratosphere$nephele$execution$ExecutionState[ExecutionState.ASSIGNED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$eu$stratosphere$nephele$execution$ExecutionState[ExecutionState.READY.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$eu$stratosphere$nephele$execution$ExecutionState[ExecutionState.STARTING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$eu$stratosphere$nephele$execution$ExecutionState[ExecutionState.RUNNING.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$eu$stratosphere$nephele$execution$ExecutionState[ExecutionState.FINISHING.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            $SwitchMap$eu$stratosphere$nephele$io$channels$ChannelType = new int[ChannelType.values().length];
            try {
                $SwitchMap$eu$stratosphere$nephele$io$channels$ChannelType[ChannelType.NETWORK.ordinal()] = 1;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$eu$stratosphere$nephele$io$channels$ChannelType[ChannelType.INMEMORY.ordinal()] = 2;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractScheduler(DeploymentManager deploymentManager, InstanceManager instanceManager) {
        this.deploymentManager = deploymentManager;
        this.instanceManager = instanceManager;
        this.instanceManager.setInstanceListener(this);
    }

    public abstract void schedulJob(ExecutionGraph executionGraph) throws SchedulingException;

    public abstract ExecutionGraph getExecutionGraphByID(JobID jobID);

    public InstanceManager getInstanceManager() {
        return this.instanceManager;
    }

    public abstract void shutdown();

    /* JADX INFO: Access modifiers changed from: protected */
    public void requestInstances(ExecutionStage executionStage) throws InstanceException {
        ExecutionGraph executionGraph = executionStage.getExecutionGraph();
        InstanceRequestMap instanceRequestMap = new InstanceRequestMap();
        synchronized (executionStage) {
            executionStage.collectRequiredInstanceTypes(instanceRequestMap, ExecutionState.CREATED);
            Iterator<Map.Entry<InstanceType, Integer>> minimumIterator = instanceRequestMap.getMinimumIterator();
            LOG.info("Requesting the following instances for job " + executionGraph.getJobID());
            while (minimumIterator.hasNext()) {
                Map.Entry<InstanceType, Integer> next = minimumIterator.next();
                LOG.info(" " + next.getKey() + " [" + next.getValue().intValue() + ", " + instanceRequestMap.getMaximumNumberOfInstances(next.getKey()) + "]");
            }
            if (instanceRequestMap.isEmpty()) {
                return;
            }
            this.instanceManager.requestInstance(executionGraph.getJobID(), executionGraph.getJobConfiguration(), instanceRequestMap, null);
            ExecutionGraphIterator executionGraphIterator = new ExecutionGraphIterator(executionGraph, executionGraph.getIndexOfCurrentExecutionStage(), true, true);
            while (executionGraphIterator.hasNext()) {
                executionGraphIterator.next().compareAndUpdateExecutionState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
            }
        }
    }

    void findVerticesToBeDeployed(ExecutionVertex executionVertex, Map<AbstractInstance, List<ExecutionVertex>> map, Set<ExecutionVertex> set) {
        boolean z;
        if (set.add(executionVertex)) {
            if (executionVertex.compareAndUpdateExecutionState(ExecutionState.ASSIGNED, ExecutionState.READY)) {
                AbstractInstance allocatedResource = executionVertex.getAllocatedResource().getInstance();
                if (allocatedResource instanceof DummyInstance) {
                    LOG.error("Inconsistency: Vertex " + executionVertex + " is about to be deployed on a DummyInstance");
                }
                List<ExecutionVertex> list = map.get(allocatedResource);
                if (list == null) {
                    list = new ArrayList();
                    map.put(allocatedResource, list);
                }
                list.add(executionVertex);
            }
            int numberOfOutputGates = executionVertex.getNumberOfOutputGates();
            for (int i = 0; i < numberOfOutputGates; i++) {
                ExecutionGate outputGate = executionVertex.getOutputGate(i);
                switch (AnonymousClass3.$SwitchMap$eu$stratosphere$nephele$io$channels$ChannelType[outputGate.getChannelType().ordinal()]) {
                    case 1:
                        z = false;
                        break;
                    case ProfilingUtils.DEFAULT_TASKMANAGER_REPORTINTERVAL /* 2 */:
                        z = true;
                        break;
                    default:
                        throw new IllegalStateException("Unknown channel type");
                }
                if (z) {
                    int numberOfEdges = outputGate.getNumberOfEdges();
                    for (int i2 = 0; i2 < numberOfEdges; i2++) {
                        findVerticesToBeDeployed(outputGate.getEdge(i2).getInputGate().getVertex(), map, set);
                    }
                }
            }
        }
    }

    public void deployAssignedVertices(ExecutionVertex executionVertex) {
        JobID jobID = executionVertex.getExecutionGraph().getJobID();
        HashMap hashMap = new HashMap();
        findVerticesToBeDeployed(executionVertex, hashMap, new HashSet());
        if (hashMap.isEmpty()) {
            return;
        }
        for (Map.Entry<AbstractInstance, List<ExecutionVertex>> entry : hashMap.entrySet()) {
            this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
        }
    }

    public void deployAssignedPipeline(ExecutionPipeline executionPipeline) {
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        Iterator<ExecutionVertex> it = executionPipeline.iterator();
        while (it.hasNext()) {
            findVerticesToBeDeployed(it.next(), hashMap, hashSet);
        }
        if (hashMap.isEmpty()) {
            return;
        }
        for (Map.Entry<AbstractInstance, List<ExecutionVertex>> entry : hashMap.entrySet()) {
            this.deploymentManager.deploy(null, entry.getKey(), entry.getValue());
        }
    }

    public void deployAssignedVertices(Collection<ExecutionVertex> collection) {
        JobID jobID = null;
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        for (ExecutionVertex executionVertex : collection) {
            if (jobID == null) {
                jobID = executionVertex.getExecutionGraph().getJobID();
            }
            findVerticesToBeDeployed(executionVertex, hashMap, hashSet);
        }
        if (hashMap.isEmpty()) {
            return;
        }
        for (Map.Entry<AbstractInstance, List<ExecutionVertex>> entry : hashMap.entrySet()) {
            this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
        }
    }

    public void deployAssignedInputVertices(ExecutionGraph executionGraph) {
        HashMap hashMap = new HashMap();
        ExecutionStage currentExecutionStage = executionGraph.getCurrentExecutionStage();
        HashSet hashSet = new HashSet();
        for (int i = 0; i < currentExecutionStage.getNumberOfStageMembers(); i++) {
            ExecutionGroupVertex stageMember = currentExecutionStage.getStageMember(i);
            if (stageMember.isInputVertex()) {
                for (int i2 = 0; i2 < stageMember.getCurrentNumberOfGroupMembers(); i2++) {
                    findVerticesToBeDeployed(stageMember.getGroupMember(i2), hashMap, hashSet);
                }
            }
        }
        if (hashMap.isEmpty()) {
            return;
        }
        for (Map.Entry<AbstractInstance, List<ExecutionVertex>> entry : hashMap.entrySet()) {
            this.deploymentManager.deploy(executionGraph.getJobID(), entry.getKey(), entry.getValue());
        }
    }

    @Override // eu.stratosphere.nephele.instance.InstanceListener
    public void resourcesAllocated(final JobID jobID, final List<AllocatedResource> list) {
        if (list == null) {
            LOG.error("Resource to lock is null!");
            return;
        }
        Iterator<AllocatedResource> it = list.iterator();
        while (it.hasNext()) {
            if (it.next().getInstance() instanceof DummyInstance) {
                LOG.debug("Available instance is of type DummyInstance!");
                return;
            }
        }
        final ExecutionGraph executionGraphByID = getExecutionGraphByID(jobID);
        if (executionGraphByID != null) {
            executionGraphByID.executeCommand(new Runnable() { // from class: eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler.1
                @Override // java.lang.Runnable
                public void run() {
                    ExecutionStage currentExecutionStage = executionGraphByID.getCurrentExecutionStage();
                    synchronized (currentExecutionStage) {
                        for (AllocatedResource allocatedResource : list) {
                            AllocatedResource allocatedResource2 = null;
                            ExecutionGroupVertexIterator executionGroupVertexIterator = new ExecutionGroupVertexIterator(executionGraphByID, true, currentExecutionStage.getStageNumber());
                            while (executionGroupVertexIterator.hasNext()) {
                                ExecutionGroupVertex next = executionGroupVertexIterator.next();
                                int i = 0;
                                while (true) {
                                    if (i >= next.getCurrentNumberOfGroupMembers()) {
                                        break;
                                    }
                                    ExecutionVertex groupMember = next.getGroupMember(i);
                                    if (groupMember.getExecutionState() == ExecutionState.SCHEDULED && groupMember.getAllocatedResource() != null && groupMember.getAllocatedResource().getInstanceType().equals(allocatedResource.getInstanceType())) {
                                        allocatedResource2 = groupMember.getAllocatedResource();
                                        break;
                                    }
                                    i++;
                                }
                                if (allocatedResource2 != null) {
                                    break;
                                }
                            }
                            if (allocatedResource2 == null) {
                                AbstractScheduler.LOG.error("Instance " + allocatedResource.getInstance() + " is not required for job" + executionGraphByID.getJobID());
                                try {
                                    AbstractScheduler.this.getInstanceManager().releaseAllocatedResource(jobID, executionGraphByID.getJobConfiguration(), allocatedResource);
                                } catch (InstanceException e) {
                                    AbstractScheduler.LOG.error(e);
                                }
                                return;
                            }
                            Iterator<ExecutionVertex> assignedVertices = allocatedResource2.assignedVertices();
                            while (assignedVertices.hasNext()) {
                                ExecutionVertex next2 = assignedVertices.next();
                                next2.setAllocatedResource(allocatedResource);
                                next2.updateExecutionState(ExecutionState.ASSIGNED);
                            }
                        }
                        AbstractScheduler.this.deployAssignedInputVertices(executionGraphByID);
                    }
                }
            });
            return;
        }
        try {
            Iterator<AllocatedResource> it2 = list.iterator();
            while (it2.hasNext()) {
                getInstanceManager().releaseAllocatedResource(jobID, null, it2.next());
            }
        } catch (InstanceException e) {
            LOG.error(e);
        }
    }

    public void checkAndReleaseAllocatedResource(ExecutionGraph executionGraph, AllocatedResource allocatedResource) {
        if (allocatedResource == null) {
            LOG.error("Resource to lock is null!");
            return;
        }
        if (allocatedResource.getInstance() instanceof DummyInstance) {
            LOG.debug("Available instance is of type DummyInstance!");
            return;
        }
        boolean z = true;
        Iterator<ExecutionVertex> assignedVertices = allocatedResource.assignedVertices();
        while (true) {
            if (!assignedVertices.hasNext()) {
                break;
            }
            ExecutionState executionState = assignedVertices.next().getExecutionState();
            if (executionState != ExecutionState.CREATED && executionState != ExecutionState.FINISHED && executionState != ExecutionState.FAILED && executionState != ExecutionState.CANCELED) {
                z = false;
                break;
            }
        }
        if (z) {
            LOG.info("Releasing instance " + allocatedResource.getInstance());
            try {
                getInstanceManager().releaseAllocatedResource(executionGraph.getJobID(), executionGraph.getJobConfiguration(), allocatedResource);
            } catch (InstanceException e) {
                LOG.error(StringUtils.stringifyException(e));
            }
        }
    }

    DeploymentManager getDeploymentManager() {
        return this.deploymentManager;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void replayCheckpointsFromPreviousStage(ExecutionGraph executionGraph) {
        ExecutionStage stage = executionGraph.getStage(executionGraph.getIndexOfCurrentExecutionStage() - 1);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < stage.getNumberOfOutputExecutionVertices(); i++) {
            ExecutionVertex outputExecutionVertex = stage.getOutputExecutionVertex(i);
            outputExecutionVertex.updateExecutionState(ExecutionState.ASSIGNED);
            arrayList.add(outputExecutionVertex);
        }
        deployAssignedVertices(arrayList);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<ExecutionVertexID, ExecutionVertex> getVerticesToBeRestarted() {
        return this.verticesToBeRestarted;
    }

    @Override // eu.stratosphere.nephele.instance.InstanceListener
    public void allocatedResourcesDied(final JobID jobID, final List<AllocatedResource> list) {
        final ExecutionGraph executionGraphByID = getExecutionGraphByID(jobID);
        if (executionGraphByID == null) {
            LOG.error("Cannot find execution graph for job with ID " + jobID);
        } else {
            executionGraphByID.executeCommand(new Runnable() { // from class: eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler.2
                @Override // java.lang.Runnable
                public void run() {
                    synchronized (executionGraphByID) {
                        for (AllocatedResource allocatedResource : list) {
                            AbstractScheduler.LOG.info("Resource " + allocatedResource.getInstance().getName() + " for Job " + jobID + " died.");
                            if (AbstractScheduler.this.getExecutionGraphByID(jobID) == null) {
                                AbstractScheduler.LOG.error("Cannot find execution graph for job " + jobID);
                                return;
                            }
                            Iterator<ExecutionVertex> assignedVertices = allocatedResource.assignedVertices();
                            AllocatedResource allocatedResource2 = new AllocatedResource(DummyInstance.createDummyInstance(allocatedResource.getInstance().getType()), allocatedResource.getInstanceType(), new AllocationID());
                            while (assignedVertices.hasNext()) {
                                assignedVertices.next().setAllocatedResource(allocatedResource2);
                            }
                            String str = allocatedResource.getInstance().getName() + " died";
                            Iterator<ExecutionVertex> assignedVertices2 = allocatedResource.assignedVertices();
                            while (assignedVertices2.hasNext()) {
                                ExecutionVertex next = assignedVertices2.next();
                                switch (AnonymousClass3.$SwitchMap$eu$stratosphere$nephele$execution$ExecutionState[next.getExecutionState().ordinal()]) {
                                    case 1:
                                    case ProfilingUtils.DEFAULT_TASKMANAGER_REPORTINTERVAL /* 2 */:
                                    case 3:
                                    case 4:
                                    case 5:
                                        next.updateExecutionState(ExecutionState.FAILED, str);
                                        break;
                                }
                            }
                        }
                        InternalJobStatus jobStatus = executionGraphByID.getJobStatus();
                        if (jobStatus == InternalJobStatus.FAILING || jobStatus == InternalJobStatus.FAILED) {
                            return;
                        }
                        try {
                            AbstractScheduler.this.requestInstances(executionGraphByID.getCurrentExecutionStage());
                        } catch (InstanceException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
    }
}
