package eu.stratosphere.nephele.instance.cluster;

import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.instance.HardwareDescription;
import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.instance.InstanceException;
import eu.stratosphere.nephele.instance.InstanceListener;
import eu.stratosphere.nephele.instance.InstanceManager;
import eu.stratosphere.nephele.instance.InstanceRequestMap;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeDescription;
import eu.stratosphere.nephele.instance.InstanceTypeDescriptionFactory;
import eu.stratosphere.nephele.instance.InstanceTypeFactory;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.topology.NetworkNode;
import eu.stratosphere.nephele.topology.NetworkTopology;
import eu.stratosphere.nephele.util.SerializableHashMap;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/instance/cluster/ClusterManager.class */
public class ClusterManager implements InstanceManager {
    private static final Log LOG = LogFactory.getLog(ClusterManager.class);
    private static final int DEFAULT_CLEANUP_INTERVAL = 120;
    private static final String CLEANUP_INTERVAL_KEY = "instancemanager.cluster.cleanupinterval";
    private final long cleanUpInterval;
    private final NetworkTopology networkTopology;
    private InstanceListener instanceListener;
    private boolean shutdown;
    private final Object lock = new Object();
    private final Map<InetAddress, InstanceType> ipToInstanceTypeMapping = new HashMap();
    private final Map<JobID, PendingRequestsMap> pendingRequestsOfJob = new LinkedHashMap();
    private final TimerTask cleanupStaleMachines = new TimerTask() { // from class: eu.stratosphere.nephele.instance.cluster.ClusterManager.1
        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            synchronized (ClusterManager.this.lock) {
                ArrayList arrayList = new ArrayList();
                HashMap hashMap = new HashMap();
                for (Map.Entry entry : ClusterManager.this.registeredHosts.entrySet()) {
                    ClusterInstance clusterInstance = (ClusterInstance) entry.getValue();
                    if (!clusterInstance.isStillAlive(ClusterManager.this.cleanUpInterval)) {
                        for (AllocatedSlice allocatedSlice : clusterInstance.removeAllAllocatedSlices()) {
                            JobID jobID = allocatedSlice.getJobID();
                            List list = (List) ClusterManager.this.slicesOfJobs.get(jobID);
                            if (list == null) {
                                ClusterManager.LOG.error("Cannot find allocated slices for job with ID + " + jobID);
                            } else {
                                list.remove(allocatedSlice);
                                if (list.isEmpty()) {
                                    ClusterManager.this.slicesOfJobs.remove(jobID);
                                }
                                List list2 = (List) hashMap.get(allocatedSlice.getJobID());
                                if (list2 == null) {
                                    list2 = new ArrayList();
                                    hashMap.put(allocatedSlice.getJobID(), list2);
                                }
                                list2.add(new AllocatedResource(allocatedSlice.getHostingInstance(), allocatedSlice.getType(), allocatedSlice.getAllocationID()));
                            }
                        }
                        arrayList.add(entry);
                    }
                }
                ClusterManager.this.registeredHosts.entrySet().removeAll(arrayList);
                ClusterManager.this.updateInstaceTypeDescriptionMap();
                for (Map.Entry entry2 : hashMap.entrySet()) {
                    if (ClusterManager.this.instanceListener != null) {
                        ClusterManager.this.instanceListener.allocatedResourcesDied((JobID) entry2.getKey(), (List) entry2.getValue());
                    }
                }
            }
        }
    };
    private final Map<InstanceConnectionInfo, ClusterInstance> registeredHosts = new HashMap();
    private final Map<JobID, List<AllocatedSlice>> slicesOfJobs = new HashMap();
    private final InstanceType defaultInstanceType = InstanceTypeFactory.constructFromDescription("default,1,1,1,1,0");
    private final InstanceType[] availableInstanceTypes = {this.defaultInstanceType};
    private final int[][] instanceAccommodationMatrix = calculateInstanceAccommodationMatrix();
    private final Map<InstanceType, InstanceTypeDescription> instanceTypeDescriptionMap = new SerializableHashMap();

    public ClusterManager() {
        long integer = GlobalConfiguration.getInteger(CLEANUP_INTERVAL_KEY, DEFAULT_CLEANUP_INTERVAL) * 1000;
        if (integer < 10) {
            LOG.warn("Invalid clean up interval. Reverting to default cleanup interval of 120 secs.");
            integer = 120;
        }
        this.cleanUpInterval = integer;
        sortAvailableInstancesByNumberOfCPUCores();
        this.networkTopology = NetworkTopology.createEmptyTopology();
        new Timer(true).schedule(this.cleanupStaleMachines, 1000L, 1000L);
        updateInstaceTypeDescriptionMap();
    }

    private void sortAvailableInstancesByNumberOfCPUCores() {
        if (this.availableInstanceTypes.length < 2) {
            return;
        }
        for (int i = 1; i < this.availableInstanceTypes.length; i++) {
            InstanceType instanceType = this.availableInstanceTypes[i];
            int i2 = i;
            while (i2 > 0 && this.availableInstanceTypes[i2 - 1].getNumberOfCores() < instanceType.getNumberOfCores()) {
                this.availableInstanceTypes[i2] = this.availableInstanceTypes[i2 - 1];
                i2--;
            }
            this.availableInstanceTypes[i2] = instanceType;
        }
    }

    @Override // eu.stratosphere.nephele.instance.InstanceManager
    public void shutdown() {
        synchronized (this.lock) {
            if (this.shutdown) {
                return;
            }
            this.cleanupStaleMachines.cancel();
            Iterator<ClusterInstance> it = this.registeredHosts.values().iterator();
            while (it.hasNext()) {
                it.next().destroyProxies();
            }
            this.registeredHosts.clear();
            this.shutdown = true;
        }
    }

    @Override // eu.stratosphere.nephele.instance.InstanceManager
    public InstanceType getDefaultInstanceType() {
        return this.defaultInstanceType;
    }

    @Override // eu.stratosphere.nephele.instance.InstanceManager
    public InstanceType getInstanceTypeByName(String str) {
        synchronized (this.lock) {
            for (InstanceType instanceType : this.availableInstanceTypes) {
                if (instanceType.getIdentifier().equals(str)) {
                    return instanceType;
                }
            }
            return null;
        }
    }

    @Override // eu.stratosphere.nephele.instance.InstanceManager
    public InstanceType getSuitableInstanceType(int i, int i2, int i3, int i4, int i5) {
        synchronized (this.lock) {
            for (InstanceType instanceType : this.availableInstanceTypes) {
                if (instanceType.getNumberOfComputeUnits() >= i && instanceType.getNumberOfCores() >= i2 && instanceType.getMemorySize() >= i3 && instanceType.getDiskCapacity() >= i4 && instanceType.getPricePerHour() <= i5) {
                    return instanceType;
                }
            }
            return null;
        }
    }

    @Override // eu.stratosphere.nephele.instance.InstanceManager
    public void releaseAllocatedResource(JobID jobID, Configuration configuration, AllocatedResource allocatedResource) throws InstanceException {
        synchronized (this.lock) {
            AllocatedSlice removeAllocatedSlice = ((ClusterInstance) allocatedResource.getInstance()).removeAllocatedSlice(allocatedResource.getAllocationID());
            List<AllocatedSlice> list = this.slicesOfJobs.get(jobID);
            if (list == null) {
                LOG.error("Cannot find allocated slice to release allocated slice for job " + jobID);
                return;
            }
            list.remove(removeAllocatedSlice);
            if (list.isEmpty()) {
                this.slicesOfJobs.remove(jobID);
            }
            checkPendingRequests();
        }
    }

    private ClusterInstance createNewHost(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription) {
        NetworkNode nodeByName;
        int lastIndexOf;
        InstanceType instanceType = this.ipToInstanceTypeMapping.get(instanceConnectionInfo.address());
        if (instanceType != null) {
            LOG.info("Found user-defined instance type for cluster instance with IP " + instanceConnectionInfo.address() + ": " + instanceType);
        } else {
            instanceType = matchHardwareDescriptionWithInstanceType(hardwareDescription);
            if (instanceType == null) {
                LOG.error("No matching instance type, cannot create cluster instance");
                return null;
            }
            LOG.info("Hardware profile of cluster instance with IP " + instanceConnectionInfo.address() + " matches with instance type " + instanceType);
        }
        String hostname = instanceConnectionInfo.hostname();
        NetworkNode rootNode = this.networkTopology.getRootNode();
        while (true) {
            nodeByName = this.networkTopology.getNodeByName(hostname);
            if (nodeByName == null && (lastIndexOf = hostname.lastIndexOf(46)) != -1) {
                hostname = hostname.substring(0, lastIndexOf);
            }
        }
        if (nodeByName == null) {
            nodeByName = this.networkTopology.getNodeByName(instanceConnectionInfo.address().toString().replaceAll("/", ""));
        }
        if (nodeByName != null) {
            if (nodeByName.getParentNode() != null) {
                rootNode = nodeByName.getParentNode();
            }
            nodeByName.remove();
        }
        LOG.info("Creating instance of type " + instanceType + " for " + instanceConnectionInfo + ", parent is " + rootNode.getName());
        return new ClusterInstance(instanceConnectionInfo, instanceType, rootNode, this.networkTopology, hardwareDescription);
    }

    private InstanceType matchHardwareDescriptionWithInstanceType(HardwareDescription hardwareDescription) {
        for (int i = 0; i < this.availableInstanceTypes.length; i++) {
            InstanceType instanceType = this.availableInstanceTypes[i];
            if (instanceType.getNumberOfCores() <= hardwareDescription.getNumberOfCPUCores()) {
                if (instanceType.getMemorySize() <= ((int) (hardwareDescription.getSizeOfPhysicalMemory() / 1048576))) {
                    return instanceType;
                }
            }
        }
        LOG.error("Cannot find matching instance type for hardware description (" + hardwareDescription.getNumberOfCPUCores() + " cores, " + hardwareDescription.getSizeOfPhysicalMemory() + " bytes of memory)");
        return null;
    }

    @Override // eu.stratosphere.nephele.instance.InstanceManager
    public void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription) {
        synchronized (this.lock) {
            ClusterInstance clusterInstance = this.registeredHosts.get(instanceConnectionInfo);
            if (clusterInstance == null) {
                clusterInstance = createNewHost(instanceConnectionInfo, hardwareDescription);
                if (clusterInstance == null) {
                    LOG.error("Could not create a new host object for incoming heart-beat. Probably the configuration file is lacking some entries.");
                    return;
                }
                this.registeredHosts.put(instanceConnectionInfo, clusterInstance);
                LOG.info("New number of registered hosts is " + this.registeredHosts.size());
                updateInstaceTypeDescriptionMap();
                checkPendingRequests();
            }
            clusterInstance.reportHeartBeat();
        }
    }

    private void checkPendingRequests() {
        for (Map.Entry<JobID, PendingRequestsMap> entry : this.pendingRequestsOfJob.entrySet()) {
            ArrayList arrayList = new ArrayList();
            JobID key = entry.getKey();
            PendingRequestsMap value = entry.getValue();
            Iterator<Map.Entry<InstanceType, Integer>> it = value.iterator();
            while (it.hasNext()) {
                Map.Entry<InstanceType, Integer> next = it.next();
                InstanceType key2 = next.getKey();
                int intValue = next.getValue().intValue();
                if (intValue <= 0) {
                    LOG.error("Inconsistency: Job " + key + " has " + intValue + " requests for instance type " + key2.getIdentifier());
                } else {
                    while (intValue > 0) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Trying to allocate instance of type " + key2.getIdentifier());
                        }
                        AllocatedSlice sliceOfType = getSliceOfType(key, key2);
                        if (sliceOfType == null) {
                            break;
                        }
                        LOG.info("Allocated instance of type " + key2.getIdentifier() + " as a result of pending request for job " + key);
                        intValue--;
                        value.decreaseNumberOfPendingInstances(key2);
                        List<AllocatedSlice> list = this.slicesOfJobs.get(key);
                        if (list == null) {
                            list = new ArrayList();
                            this.slicesOfJobs.put(key, list);
                        }
                        list.add(sliceOfType);
                        arrayList.add(new AllocatedResource(sliceOfType.getHostingInstance(), sliceOfType.getType(), sliceOfType.getAllocationID()));
                    }
                }
            }
            if (!arrayList.isEmpty() && this.instanceListener != null) {
                new ClusterInstanceNotifier(this.instanceListener, key, arrayList).start();
            }
        }
    }

    private AllocatedSlice getSliceOfType(JobID jobID, InstanceType instanceType) {
        AllocatedSlice allocatedSlice = null;
        for (ClusterInstance clusterInstance : this.registeredHosts.values()) {
            if (clusterInstance.getType().equals(instanceType)) {
                allocatedSlice = clusterInstance.createSlice(instanceType, jobID);
                if (allocatedSlice != null) {
                    break;
                }
            }
        }
        if (allocatedSlice == null) {
            Iterator<ClusterInstance> it = this.registeredHosts.values().iterator();
            while (it.hasNext()) {
                allocatedSlice = it.next().createSlice(instanceType, jobID);
                if (allocatedSlice != null) {
                    break;
                }
            }
        }
        return allocatedSlice;
    }

    @Override // eu.stratosphere.nephele.instance.InstanceManager
    public void requestInstance(JobID jobID, Configuration configuration, InstanceRequestMap instanceRequestMap, List<String> list) throws InstanceException {
        ArrayList<AllocatedSlice> arrayList = new ArrayList();
        HashMap hashMap = new HashMap();
        synchronized (this.lock) {
            Iterator<Map.Entry<InstanceType, Integer>> maximumIterator = instanceRequestMap.getMaximumIterator();
            while (maximumIterator.hasNext()) {
                Map.Entry<InstanceType, Integer> next = maximumIterator.next();
                int intValue = next.getValue().intValue();
                int i = 0;
                while (true) {
                    if (i < intValue) {
                        LOG.info("Trying to allocate instance of type " + next.getKey().getIdentifier());
                        AllocatedSlice sliceOfType = getSliceOfType(jobID, next.getKey());
                        if (sliceOfType != null) {
                            arrayList.add(sliceOfType);
                            i++;
                        } else {
                            if (i < instanceRequestMap.getMinimumNumberOfInstances(next.getKey())) {
                                for (AllocatedSlice allocatedSlice : arrayList) {
                                    allocatedSlice.getHostingInstance().removeAllocatedSlice(allocatedSlice.getAllocationID());
                                }
                                throw new InstanceException("Could not find a suitable instance");
                            }
                            int i2 = intValue - i;
                            if (i2 > 0) {
                                Integer num = (Integer) hashMap.get(next.getKey());
                                if (num == null) {
                                    num = 0;
                                }
                                hashMap.put(next.getKey(), Integer.valueOf(num.intValue() + i2));
                            }
                        }
                    }
                }
            }
            List<AllocatedSlice> list2 = this.slicesOfJobs.get(jobID);
            if (list2 == null) {
                list2 = new ArrayList();
                this.slicesOfJobs.put(jobID, list2);
            }
            list2.addAll(arrayList);
            PendingRequestsMap pendingRequestsMap = this.pendingRequestsOfJob.get(jobID);
            if (pendingRequestsMap == null) {
                pendingRequestsMap = new PendingRequestsMap();
                this.pendingRequestsOfJob.put(jobID, pendingRequestsMap);
            }
            for (Map.Entry entry : hashMap.entrySet()) {
                pendingRequestsMap.addRequest((InstanceType) entry.getKey(), ((Integer) entry.getValue()).intValue());
            }
            ArrayList arrayList2 = new ArrayList();
            for (AllocatedSlice allocatedSlice2 : arrayList) {
                arrayList2.add(new AllocatedResource(allocatedSlice2.getHostingInstance(), allocatedSlice2.getType(), allocatedSlice2.getAllocationID()));
            }
            if (this.instanceListener != null) {
                new ClusterInstanceNotifier(this.instanceListener, jobID, arrayList2).start();
            }
        }
    }

    @Override // eu.stratosphere.nephele.instance.InstanceManager
    public NetworkTopology getNetworkTopology(JobID jobID) {
        return this.networkTopology;
    }

    @Override // eu.stratosphere.nephele.instance.InstanceManager
    public void setInstanceListener(InstanceListener instanceListener) {
        synchronized (this.lock) {
            this.instanceListener = instanceListener;
        }
    }

    @Override // eu.stratosphere.nephele.instance.InstanceManager
    public Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes() {
        SerializableHashMap serializableHashMap = new SerializableHashMap();
        synchronized (this.lock) {
            serializableHashMap.putAll(this.instanceTypeDescriptionMap);
        }
        return serializableHashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateInstaceTypeDescriptionMap() {
        this.instanceTypeDescriptionMap.clear();
        ArrayList<InstanceTypeDescription> arrayList = new ArrayList();
        int[] iArr = new int[this.availableInstanceTypes.length];
        for (int i = 0; i < iArr.length; i++) {
            iArr[i] = 0;
        }
        for (int i2 = 0; i2 < this.availableInstanceTypes.length; i2++) {
            InstanceType instanceType = this.availableInstanceTypes[i2];
            int i3 = 0;
            int i4 = Integer.MAX_VALUE;
            long j = Long.MAX_VALUE;
            long j2 = Long.MAX_VALUE;
            for (ClusterInstance clusterInstance : this.registeredHosts.values()) {
                if (clusterInstance.getType().equals(instanceType)) {
                    i3++;
                    HardwareDescription hardwareDescription = clusterInstance.getHardwareDescription();
                    i4 = Math.min(i4, hardwareDescription.getNumberOfCPUCores());
                    j = Math.min(j, hardwareDescription.getSizeOfPhysicalMemory());
                    j2 = Math.min(j2, hardwareDescription.getSizeOfFreeMemory());
                }
            }
            int i5 = -1;
            int i6 = -1;
            for (int i7 = 0; i7 < this.availableInstanceTypes.length; i7++) {
                int canBeAccommodated = canBeAccommodated(i7, i2);
                if (canBeAccommodated > 0) {
                    int i8 = i7;
                    iArr[i8] = iArr[i8] + (i3 * canBeAccommodated);
                    if (canBeAccommodated > i5) {
                        i5 = canBeAccommodated;
                        i6 = i7;
                    }
                }
            }
            HardwareDescription hardwareDescription2 = null;
            if (i4 < Integer.MAX_VALUE && j < Long.MAX_VALUE && j2 < Long.MAX_VALUE) {
                hardwareDescription2 = HardwareDescriptionFactory.construct(i4, j, j2);
            } else if (i6 < i2) {
                InstanceTypeDescription instanceTypeDescription = (InstanceTypeDescription) arrayList.get(i6);
                if (instanceTypeDescription.getHardwareDescription() != null) {
                    HardwareDescription hardwareDescription3 = instanceTypeDescription.getHardwareDescription();
                    hardwareDescription2 = HardwareDescriptionFactory.construct(hardwareDescription3.getNumberOfCPUCores() / i5, hardwareDescription3.getSizeOfPhysicalMemory() / i5, hardwareDescription3.getSizeOfFreeMemory() / i5);
                }
            }
            arrayList.add(InstanceTypeDescriptionFactory.construct(instanceType, hardwareDescription2, iArr[i2]));
        }
        for (InstanceTypeDescription instanceTypeDescription2 : arrayList) {
            this.instanceTypeDescriptionMap.put(instanceTypeDescription2.getInstanceType(), instanceTypeDescription2);
        }
    }

    private int[][] calculateInstanceAccommodationMatrix() {
        if (this.availableInstanceTypes == null) {
            LOG.error("Cannot compute instance accommodation matrix: availableInstanceTypes is null");
            return (int[][]) null;
        }
        int length = this.availableInstanceTypes.length;
        int[][] iArr = new int[length][length];
        for (int i = 0; i < length; i++) {
            for (int i2 = 0; i2 < length; i2++) {
                if (i == i2) {
                    iArr[i][i2] = 1;
                } else {
                    InstanceType instanceType = this.availableInstanceTypes[i];
                    InstanceType instanceType2 = this.availableInstanceTypes[i2];
                    iArr[i][i2] = Math.min(instanceType2.getNumberOfCores() / instanceType.getNumberOfCores(), Math.min(instanceType2.getNumberOfComputeUnits() / instanceType.getNumberOfComputeUnits(), Math.min(instanceType2.getMemorySize() / instanceType.getMemorySize(), instanceType2.getDiskCapacity() / instanceType.getDiskCapacity())));
                }
            }
        }
        return iArr;
    }

    private int canBeAccommodated(int i, int i2) {
        if (i < this.availableInstanceTypes.length && i2 < this.availableInstanceTypes.length) {
            return this.instanceAccommodationMatrix[i2][i];
        }
        LOG.error("Cannot determine number of instance accomodations: invalid index");
        return 0;
    }

    @Override // eu.stratosphere.nephele.instance.InstanceManager
    public AbstractInstance getInstanceByName(String str) {
        if (str == null) {
            throw new IllegalArgumentException("Argument name must not be null");
        }
        synchronized (this.lock) {
            for (ClusterInstance clusterInstance : this.registeredHosts.values()) {
                if (str.equals(clusterInstance.getName())) {
                    return clusterInstance;
                }
            }
            return null;
        }
    }

    @Override // eu.stratosphere.nephele.instance.InstanceManager
    public void cancelPendingRequests(JobID jobID) {
        synchronized (this.lock) {
            this.pendingRequestsOfJob.remove(jobID);
        }
    }

    @Override // eu.stratosphere.nephele.instance.InstanceManager
    public int getNumberOfTaskTrackers() {
        return this.registeredHosts.size();
    }
}
