package eu.stratosphere.nephele.multicast;

import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.executiongraph.ExecutionEdge;
import eu.stratosphere.nephele.executiongraph.ExecutionGate;
import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.jobmanager.JobManager;
import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
import eu.stratosphere.nephele.protocols.ChannelLookupProtocol;
import eu.stratosphere.nephele.taskmanager.bytebuffered.ConnectionInfoLookupResponse;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/multicast/MulticastManager.class */
/**
 * Answers channel lookup requests for broadcast (multicast) channels by building a forwarding tree
 * over all instances that host a receiver of the broadcast, and caching the resulting forwarding
 * table per source channel.
 *
 * <p>The tree is either built with a greedy "closest node first" strategy (see
 * {@link #createDefaultTree(LinkedList, int)}) or read from a topology file (see
 * {@link #createHardCodedTree(LinkedList)}), depending on the {@code multicast.usehardcodedtree}
 * configuration flag.
 *
 * <p>All access to the tree cache goes through the {@code synchronized}
 * {@link #lookupConnectionInfo(InstanceConnectionInfo, JobID, ChannelID)} method.
 */
public final class MulticastManager implements ChannelLookupProtocol {
    // NOTE(review): the logger is registered under JobManager's category rather than this class's;
    // this looks like a copy-paste slip but is kept unchanged so existing log configuration still applies.
    private static final Log LOG = LogFactory.getLog(JobManager.class);

    /** Scheduler used to resolve execution graphs and to trigger deployment of assigned vertices. */
    private final AbstractScheduler scheduler;

    /** Forwarding tables that were already computed, keyed by the source channel ID. Never holds {@code null} values. */
    private final Map<ChannelID, MulticastForwardingTable> cachedTrees = new HashMap<>();

    /** {@code multicast.randomize} (default {@code false}): shuffle the receiver nodes instead of sorting them. */
    private final boolean randomized = GlobalConfiguration.getBoolean("multicast.randomize", false);

    /** {@code multicast.branching} (default {@code 1}): maximum number of children per node in the default tree. */
    private final int treeBranching = GlobalConfiguration.getInteger("multicast.branching", 1);

    /** {@code multicast.usehardcodedtree} (default {@code false}): read the tree topology from a file. */
    private final boolean useHardCodedTree = GlobalConfiguration.getBoolean("multicast.usehardcodedtree", false);

    /** {@code multicast.hardcodedtreefile} (default {@code null}): path of the topology file. */
    private final String hardCodedTreeFilePath = GlobalConfiguration.getString("multicast.hardcodedtreefile", (String) null);

    /**
     * Creates a multicast manager that resolves jobs through the given scheduler.
     *
     * @param abstractScheduler the scheduler used to look up execution graphs and deploy vertices
     */
    public MulticastManager(AbstractScheduler abstractScheduler) {
        this.scheduler = abstractScheduler;
    }

    /**
     * Returns the forwarding information of the multicast tree for the given source channel as seen
     * from the requesting instance.
     *
     * <p>A cached forwarding table is reused when present. Otherwise the method replies with
     * "receiver not ready" until every broadcast target is running (or finishing), then builds and
     * caches the tree. If the hard-coded topology cannot be loaded, the error is logged and the
     * default tree is built instead, so that no {@code null} table is ever cached or dereferenced.
     *
     * @param instanceConnectionInfo connection info of the instance issuing the request
     * @param jobID ID of the job the channel belongs to
     * @param channelID ID of the source (output) channel of the broadcast
     * @return the lookup response for the requesting instance
     */
    @Override
    public synchronized ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo instanceConnectionInfo, JobID jobID, ChannelID channelID) {
        if (LOG.isInfoEnabled()) {
            LOG.info("Receiving multicast receiver request from " + instanceConnectionInfo + " channel ID: " + channelID);
        }

        final MulticastForwardingTable cachedTable = this.cachedTrees.get(channelID);
        if (cachedTable != null) {
            LOG.info("Replying with cached entry...");
            return cachedTable.getConnectionInfo(instanceConnectionInfo);
        }

        if (!checkIfAllTargetVerticesReady(instanceConnectionInfo, jobID, channelID)) {
            LOG.info("Received multicast request but not all receivers ready.");
            return ConnectionInfoLookupResponse.createReceiverNotReady();
        }

        final LinkedList<TreeNode> treeNodes = extractTreeNodes(instanceConnectionInfo, jobID, channelID, this.randomized);

        MulticastForwardingTable table = null;
        if (this.useHardCodedTree) {
            LOG.info("Creating a hard-coded tree topology from file: " + this.hardCodedTreeFilePath);
            table = createHardCodedTree(treeNodes);
            if (table == null) {
                // The topology file is read completely before any node is wired, so the nodes are
                // still untouched here and can safely be handed to the default builder.
                LOG.error("Could not create hard-coded multicast tree for channel " + channelID
                    + ", falling back to the default tree");
            }
        }
        if (table == null) {
            table = createDefaultTree(treeNodes, this.treeBranching);
        }

        this.cachedTrees.put(channelID, table);
        return table.getConnectionInfo(instanceConnectionInfo);
    }

    /**
     * Removes and returns the node from {@code candidates} that is closest to {@code reference}.
     *
     * @param reference the node distances are measured against; may be {@code null}
     * @param candidates the nodes to choose from; the chosen node is removed from this list
     * @return the chosen node
     */
    private TreeNode pollClosestNode(TreeNode reference, LinkedList<TreeNode> candidates) {
        final TreeNode closest = getClosestNode(reference, candidates);
        candidates.remove(closest);
        return closest;
    }

    /**
     * Finds the node in {@code candidates} with the smallest distance to {@code reference}. On ties
     * the earliest candidate in list order wins.
     *
     * @param reference the node distances are measured against; if {@code null}, the first candidate is returned
     * @param candidates the nodes to choose from
     * @return the closest node, or {@code null} if {@code reference} is non-null and {@code candidates} is empty
     */
    private TreeNode getClosestNode(TreeNode reference, LinkedList<TreeNode> candidates) {
        if (reference == null) {
            return candidates.getFirst();
        }
        TreeNode closest = null;
        for (TreeNode candidate : candidates) {
            if (closest == null || candidate.getDistance(reference) < closest.getDistance(reference)) {
                closest = candidate;
            }
        }
        return closest;
    }

    /**
     * Builds the default multicast tree breadth-first: the first node becomes the root, and each
     * parent in turn receives up to {@code maxTreeFanOut} of the closest remaining nodes as children.
     *
     * @param nodes all tree nodes with the sender first; the list is emptied by this method
     * @param maxTreeFanOut maximum number of children per node; values below 1 are treated as 1,
     *        because a fan-out of zero would never consume any node and the loop would not terminate
     * @return the forwarding table created from the root node
     */
    private MulticastForwardingTable createDefaultTree(LinkedList<TreeNode> nodes, int maxTreeFanOut) {
        if (maxTreeFanOut < 1) {
            LOG.warn("Invalid multicast tree branching " + maxTreeFanOut + ", using 1 instead");
        }
        final int fanOut = Math.max(1, maxTreeFanOut);

        final TreeNode root = nodes.pollFirst();
        // Nodes that already have a parent and are waiting to receive children of their own.
        final LinkedList<TreeNode> pendingParents = new LinkedList<>();

        TreeNode parent = root;
        while (!nodes.isEmpty()) {
            for (int i = 0; i < fanOut && !nodes.isEmpty(); i++) {
                final TreeNode child = pollClosestNode(parent, nodes);
                parent.addChild(child);
                pendingParents.add(child);
            }
            // At least one child was queued in this pass, so the next parent is never null.
            parent = pendingParents.pollFirst();
        }

        LOG.info("created multicast tree with following topology:\n" + root.printTree());
        return root.createForwardingTable();
    }

    /**
     * Builds the multicast tree from the topology file configured under
     * {@code multicast.hardcodedtreefile}.
     *
     * <p>Each line has the form {@code parent child1 child2 ...} (separated by single spaces). Names
     * are matched against {@code TreeNode.toString()}; names that match no node are ignored. The file
     * is read completely before any node is modified, so a read failure leaves {@code nodes} untouched.
     *
     * @param nodes all tree nodes with the sender first
     * @return the forwarding table created from the first node, or {@code null} if no file is
     *         configured or the file could not be read
     */
    private MulticastForwardingTable createHardCodedTree(LinkedList<TreeNode> nodes) {
        if (this.hardCodedTreeFilePath == null) {
            LOG.error("Error reading hard-coded topology file for multicast tree: no file configured");
            return null;
        }

        final ArrayList<String[]> topology = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(this.hardCodedTreeFilePath)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                topology.add(line.split(" "));
            }
        } catch (IOException e) {
            LOG.error("Error reading hard-coded topology file for multicast tree: " + this.hardCodedTreeFilePath, e);
            return null;
        }

        for (String[] entry : topology) {
            // A line made up only of separators splits into an empty array; there is nothing to wire.
            if (entry.length == 0) {
                continue;
            }
            final String parentName = entry[0];
            for (TreeNode parent : nodes) {
                if (!parent.toString().equals(parentName)) {
                    continue;
                }
                for (int i = 1; i < entry.length; i++) {
                    for (TreeNode child : nodes) {
                        if (child.toString().equals(entry[i])) {
                            parent.addChild(child);
                        }
                    }
                }
            }
        }
        return nodes.getFirst().createForwardingTable();
    }

    /**
     * Checks whether every receiver of the broadcast is ready to accept data.
     *
     * <p>Receivers in state {@code ASSIGNED} are collected and handed to the scheduler for
     * deployment; in that case, or when any receiver is in a state other than {@code RUNNING} or
     * {@code FINISHING}, the method reports "not ready".
     *
     * @param instanceConnectionInfo connection info of the requesting instance (not used here)
     * @param jobID ID of the job the channel belongs to
     * @param channelID ID of the source channel of the broadcast
     * @return {@code true} only if all broadcast receivers are running or finishing
     */
    private boolean checkIfAllTargetVerticesReady(InstanceConnectionInfo instanceConnectionInfo, JobID jobID, ChannelID channelID) {
        final ExecutionGate outputGate = this.scheduler.getExecutionGraphByID(jobID).getEdgeByID(channelID).getOutputGate();

        // Allocated lazily: in the common case no receiver is waiting for deployment.
        ArrayList<ExecutionVertex> verticesToDeploy = null;

        final int numberOfEdges = outputGate.getNumberOfEdges();
        for (int i = 0; i < numberOfEdges; i++) {
            final ExecutionEdge edge = outputGate.getEdge(i);
            if (!edge.isBroadcast()) {
                continue;
            }
            final ExecutionVertex receiver = edge.getInputGate().getVertex();
            // Read the state once so that all comparisons below see the same snapshot.
            final ExecutionState state = receiver.getExecutionState();
            if (state == ExecutionState.ASSIGNED) {
                if (verticesToDeploy == null) {
                    verticesToDeploy = new ArrayList<>();
                }
                verticesToDeploy.add(receiver);
            } else if (state != ExecutionState.RUNNING && state != ExecutionState.FINISHING) {
                return false;
            }
        }

        if (verticesToDeploy == null) {
            return true;
        }
        this.scheduler.deployAssignedVertices(verticesToDeploy);
        return false;
    }

    /**
     * Collects one {@link TreeNode} per instance taking part in the broadcast.
     *
     * <p>The first node of the result represents the sender and carries the input channels of all
     * receivers located on the sender's own instance. Every other node groups the input channels of
     * the receivers sharing one instance; these nodes are shuffled when {@code randomize} is set and
     * sorted by their natural order otherwise.
     *
     * @param source connection info of the sending instance
     * @param jobID ID of the job the channel belongs to
     * @param sourceChannelID ID of the source channel of the broadcast
     * @param randomize whether to shuffle (instead of sort) the receiver nodes
     * @return the tree nodes, sender first
     */
    private LinkedList<TreeNode> extractTreeNodes(InstanceConnectionInfo source, JobID jobID, ChannelID sourceChannelID, boolean randomize) {
        final ExecutionGraph graph = this.scheduler.getExecutionGraphByID(jobID);
        final ExecutionGate outputGate = graph.getEdgeByID(sourceChannelID).getOutputGate();

        final LinkedList<ExecutionEdge> broadcastEdges = new LinkedList<>();
        final int numberOfEdges = outputGate.getNumberOfEdges();
        for (int i = 0; i < numberOfEdges; i++) {
            final ExecutionEdge edge = outputGate.getEdge(i);
            if (edge.isBroadcast()) {
                broadcastEdges.add(edge);
            }
        }

        // Receivers on the sender's own instance are attached to the sender node directly.
        final LinkedList<ChannelID> localTargets = new LinkedList<>();
        // Ends up as the connection ID of the last broadcast edge (0 if there is none).
        int senderConnectionID = 0;
        for (Iterator<ExecutionEdge> it = broadcastEdges.iterator(); it.hasNext();) {
            final ExecutionEdge edge = it.next();
            senderConnectionID = edge.getConnectionID();
            if (receiverConnectionInfo(edge).equals(source)) {
                localTargets.add(edge.getInputChannelID());
                it.remove();
            }
        }

        final LinkedList<TreeNode> treeNodes = new LinkedList<>();
        treeNodes.add(new TreeNode(graph.getVertexByChannelID(sourceChannelID).getAllocatedResource().getInstance(),
            source, senderConnectionID, localTargets));

        // Group the remaining edges by receiving instance; each group becomes one tree node.
        final LinkedList<TreeNode> receiverNodes = new LinkedList<>();
        while (!broadcastEdges.isEmpty()) {
            final ExecutionEdge firstEdge = broadcastEdges.pollFirst();
            final int connectionID = firstEdge.getConnectionID();
            final ExecutionVertex receiver = firstEdge.getInputGate().getVertex();
            final InstanceConnectionInfo receiverInfo = receiver.getAllocatedResource().getInstance().getInstanceConnectionInfo();

            final LinkedList<ChannelID> targets = new LinkedList<>();
            targets.add(firstEdge.getInputChannelID());
            for (Iterator<ExecutionEdge> it = broadcastEdges.iterator(); it.hasNext();) {
                final ExecutionEdge other = it.next();
                if (receiverConnectionInfo(other).equals(receiverInfo)) {
                    targets.add(other.getInputChannelID());
                    it.remove();
                }
            }
            receiverNodes.add(new TreeNode(receiver.getAllocatedResource().getInstance(), receiverInfo, connectionID, targets));
        }

        if (randomize) {
            Collections.shuffle(receiverNodes);
        } else {
            Collections.sort(receiverNodes);
        }
        treeNodes.addAll(receiverNodes);
        return treeNodes;
    }

    /** Returns the connection info of the instance hosting the receiving vertex of the given edge. */
    private static InstanceConnectionInfo receiverConnectionInfo(ExecutionEdge edge) {
        return edge.getInputGate().getVertex().getAllocatedResource().getInstance().getInstanceConnectionInfo();
    }
}
