package eu.stratosphere.nephele.jobmanager.scheduler;

import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.executiongraph.ExecutionEdge;
import eu.stratosphere.nephele.executiongraph.ExecutionGate;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.instance.DummyInstance;
import eu.stratosphere.nephele.taskmanager.AbstractTaskResult;
import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
import eu.stratosphere.nephele.util.SerializableHashSet;
import eu.stratosphere.runtime.io.channels.ChannelID;
import eu.stratosphere.util.StringUtils;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.class */
public final class RecoveryLogic {
    private static final Log LOG = LogFactory.getLog(RecoveryLogic.class);

    private RecoveryLogic() {
    }

    public static boolean recover(ExecutionVertex executionVertex, Map<ExecutionVertexID, ExecutionVertex> map, Set<ExecutionVertex> set) {
        if (executionVertex.getExecutionState() != ExecutionState.FAILED) {
            LOG.error("Vertex " + executionVertex + " is requested to be recovered, but is not failed");
            return false;
        }
        synchronized (executionVertex.getExecutionGraph()) {
            LOG.info("Starting recovery for failed vertex " + executionVertex);
            HashSet<ExecutionVertex> hashSet = new HashSet();
            findVerticesToRestart(executionVertex, hashSet);
            for (ExecutionVertex executionVertex2 : hashSet) {
                if (executionVertex2.compareAndUpdateExecutionState(ExecutionState.FINISHED, getStateToUpdate(executionVertex2))) {
                    LOG.info("Vertex " + executionVertex2 + " has already finished and will not be canceled");
                    if (executionVertex2.getExecutionState() == ExecutionState.ASSIGNED) {
                        set.add(executionVertex2);
                    }
                } else {
                    LOG.info(executionVertex2 + " is canceled by recovery logic");
                    map.put(executionVertex2.getID(), executionVertex2);
                    TaskCancelResult cancelTask = executionVertex2.cancelTask();
                    if (cancelTask.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS && cancelTask.getReturnCode() != AbstractTaskResult.ReturnCode.TASK_NOT_FOUND) {
                        map.remove(executionVertex2.getID());
                        LOG.error("Unable to cancel vertex" + cancelTask.getDescription());
                        return false;
                    }
                }
            }
            LOG.info("Starting cache invalidation");
            if (!invalidateReceiverLookupCaches(executionVertex, hashSet)) {
                return false;
            }
            LOG.info("Cache invalidation complete");
            executionVertex.updateExecutionState(getStateToUpdate(executionVertex));
            if (executionVertex.getExecutionState() == ExecutionState.ASSIGNED) {
                set.add(executionVertex);
            }
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean hasInstanceAssigned(ExecutionVertex executionVertex) {
        return !(executionVertex.getAllocatedResource().getInstance() instanceof DummyInstance);
    }

    private static ExecutionState getStateToUpdate(ExecutionVertex executionVertex) {
        return hasInstanceAssigned(executionVertex) ? ExecutionState.ASSIGNED : ExecutionState.CREATED;
    }

    private static void findVerticesToRestart(ExecutionVertex executionVertex, Set<ExecutionVertex> set) {
        ArrayDeque arrayDeque = new ArrayDeque();
        HashSet hashSet = new HashSet();
        arrayDeque.add(executionVertex);
        while (!arrayDeque.isEmpty()) {
            ExecutionVertex executionVertex2 = (ExecutionVertex) arrayDeque.poll();
            for (int i = 0; i < executionVertex2.getNumberOfPredecessors(); i++) {
                ExecutionVertex predecessor = executionVertex2.getPredecessor(i);
                if (hasInstanceAssigned(predecessor)) {
                    set.add(predecessor);
                }
                if (!hashSet.contains(predecessor)) {
                    arrayDeque.add(predecessor);
                }
            }
            hashSet.add(executionVertex2);
        }
    }

    private static final boolean invalidateReceiverLookupCaches(ExecutionVertex executionVertex, Set<ExecutionVertex> set) {
        HashMap hashMap = new HashMap();
        collectCacheEntriesToInvalidate(executionVertex, hashMap);
        Iterator<ExecutionVertex> it = set.iterator();
        while (it.hasNext()) {
            collectCacheEntriesToInvalidate(it.next(), hashMap);
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            try {
                ((AbstractInstance) entry.getKey()).invalidateLookupCacheEntries((Set) entry.getValue());
            } catch (IOException e) {
                LOG.error(StringUtils.stringifyException(e));
                return false;
            }
        }
        return true;
    }

    private static void collectCacheEntriesToInvalidate(ExecutionVertex executionVertex, Map<AbstractInstance, Set<ChannelID>> map) {
        int numberOfOutputGates = executionVertex.getNumberOfOutputGates();
        for (int i = 0; i < numberOfOutputGates; i++) {
            ExecutionGate outputGate = executionVertex.getOutputGate(i);
            for (int i2 = 0; i2 < outputGate.getNumberOfEdges(); i2++) {
                ExecutionEdge edge = outputGate.getEdge(i2);
                ExecutionVertex vertex = edge.getInputGate().getVertex();
                if (vertex == null) {
                    LOG.error("Connected vertex is null");
                } else {
                    AbstractInstance allocatedResource = vertex.getAllocatedResource().getInstance();
                    if (!(allocatedResource instanceof DummyInstance)) {
                        Set<ChannelID> set = map.get(allocatedResource);
                        if (set == null) {
                            set = new SerializableHashSet();
                            map.put(allocatedResource, set);
                        }
                        set.add(edge.getInputChannelID());
                    }
                }
            }
        }
        for (int i3 = 0; i3 < executionVertex.getNumberOfInputGates(); i3++) {
            ExecutionGate inputGate = executionVertex.getInputGate(i3);
            for (int i4 = 0; i4 < inputGate.getNumberOfEdges(); i4++) {
                ExecutionEdge edge2 = inputGate.getEdge(i4);
                ExecutionVertex vertex2 = edge2.getOutputGate().getVertex();
                if (vertex2 == null) {
                    LOG.error("Connected vertex is null");
                } else {
                    AbstractInstance allocatedResource2 = vertex2.getAllocatedResource().getInstance();
                    if (!(allocatedResource2 instanceof DummyInstance)) {
                        Set<ChannelID> set2 = map.get(allocatedResource2);
                        if (set2 == null) {
                            set2 = new SerializableHashSet();
                            map.put(allocatedResource2, set2);
                        }
                        set2.add(edge2.getOutputChannelID());
                    }
                }
            }
        }
    }
}
