package eu.stratosphere.nephele.jobmanager.scheduler.queue;

import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
import eu.stratosphere.nephele.executiongraph.ExecutionGraphIterator;
import eu.stratosphere.nephele.executiongraph.ExecutionStage;
import eu.stratosphere.nephele.executiongraph.ExecutionStageListener;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
import eu.stratosphere.nephele.executiongraph.JobStatusListener;
import eu.stratosphere.nephele.instance.InstanceException;
import eu.stratosphere.nephele.instance.InstanceManager;
import eu.stratosphere.nephele.instance.InstanceRequestMap;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeDescription;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.jobmanager.DeploymentManager;
import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
import eu.stratosphere.nephele.jobmanager.scheduler.SchedulingException;
import eu.stratosphere.util.StringUtils;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Map;

/* loaded from: input_file:eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueScheduler.class */
public class QueueScheduler extends AbstractScheduler implements JobStatusListener, ExecutionStageListener {
    private Deque<ExecutionGraph> jobQueue;

    public QueueScheduler(DeploymentManager deploymentManager, InstanceManager instanceManager) {
        super(deploymentManager, instanceManager);
        this.jobQueue = new ArrayDeque();
    }

    void removeJobFromSchedule(ExecutionGraph executionGraph) {
        boolean z = false;
        synchronized (this.jobQueue) {
            Iterator<ExecutionGraph> it = this.jobQueue.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                if (it.next().getJobID().equals(executionGraph.getJobID())) {
                    z = true;
                    it.remove();
                    break;
                }
            }
        }
        if (z) {
            return;
        }
        LOG.error("Cannot find job " + executionGraph.getJobName() + " (" + executionGraph.getJobID() + ") to remove");
    }

    @Override // eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler
    public void schedulJob(ExecutionGraph executionGraph) throws SchedulingException {
        Map<InstanceType, InstanceTypeDescription> mapOfAvailableInstanceTypes = getInstanceManager().getMapOfAvailableInstanceTypes();
        Iterator<ExecutionStage> it = executionGraph.iterator();
        while (it.hasNext()) {
            InstanceRequestMap instanceRequestMap = new InstanceRequestMap();
            it.next().collectRequiredInstanceTypes(instanceRequestMap, ExecutionState.CREATED);
            Iterator<Map.Entry<InstanceType, Integer>> minimumIterator = instanceRequestMap.getMinimumIterator();
            while (minimumIterator.hasNext()) {
                Map.Entry<InstanceType, Integer> next = minimumIterator.next();
                InstanceTypeDescription instanceTypeDescription = mapOfAvailableInstanceTypes.get(next.getKey());
                if (instanceTypeDescription == null) {
                    throw new SchedulingException("Unable to schedule job: No instance of type " + next.getKey() + " available");
                }
                if (instanceTypeDescription.getMaximumNumberOfAvailableInstances() != -1 && instanceTypeDescription.getMaximumNumberOfAvailableInstances() < next.getValue().intValue()) {
                    throw new SchedulingException("Unable to schedule job: " + next.getValue().intValue() + " instances of type " + next.getKey() + " required, but only " + instanceTypeDescription.getMaximumNumberOfAvailableInstances() + " are available");
                }
            }
        }
        executionGraph.registerJobStatusListener(this);
        ExecutionGraphIterator executionGraphIterator = new ExecutionGraphIterator(executionGraph, true);
        while (executionGraphIterator.hasNext()) {
            ExecutionVertex next2 = executionGraphIterator.next();
            next2.registerExecutionListener(new QueueExecutionListener(this, next2));
        }
        executionGraph.registerExecutionStageListener(this);
        synchronized (this.jobQueue) {
            this.jobQueue.add(executionGraph);
        }
        try {
            requestInstances(executionGraph.getCurrentExecutionStage());
        } catch (InstanceException e) {
            String stringifyException = StringUtils.stringifyException(e);
            LOG.error(stringifyException);
            this.jobQueue.remove(executionGraph);
            throw new SchedulingException(stringifyException);
        }
    }

    @Override // eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler
    public ExecutionGraph getExecutionGraphByID(JobID jobID) {
        synchronized (this.jobQueue) {
            for (ExecutionGraph executionGraph : this.jobQueue) {
                if (executionGraph.getJobID().equals(jobID)) {
                    return executionGraph;
                }
            }
            return null;
        }
    }

    @Override // eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler
    public void shutdown() {
        synchronized (this.jobQueue) {
            this.jobQueue.clear();
        }
    }

    @Override // eu.stratosphere.nephele.executiongraph.JobStatusListener
    public void jobStatusHasChanged(ExecutionGraph executionGraph, InternalJobStatus internalJobStatus, String str) {
        if (internalJobStatus == InternalJobStatus.FAILED || internalJobStatus == InternalJobStatus.FINISHED || internalJobStatus == InternalJobStatus.CANCELED) {
            removeJobFromSchedule(executionGraph);
        }
    }

    @Override // eu.stratosphere.nephele.executiongraph.ExecutionStageListener
    public void nextExecutionStageEntered(JobID jobID, ExecutionStage executionStage) {
        try {
            requestInstances(executionStage);
        } catch (InstanceException e) {
            LOG.error(StringUtils.stringifyException(e));
        }
        deployAssignedInputVertices(executionStage.getExecutionGraph());
    }
}
