package eu.stratosphere.nephele.taskmanager.bytebuffered;

import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.io.AbstractID;
import eu.stratosphere.nephele.io.GateID;
import eu.stratosphere.nephele.io.channels.Buffer;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.protocols.ChannelLookupProtocol;
import eu.stratosphere.nephele.taskmanager.Task;
import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider;
import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProviderBroker;
import eu.stratosphere.nephele.taskmanager.bufferprovider.GlobalBufferPool;
import eu.stratosphere.nephele.taskmanager.bufferprovider.LocalBufferPool;
import eu.stratosphere.nephele.taskmanager.bufferprovider.LocalBufferPoolOwner;
import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelopeDispatcher;
import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelopeReceiverList;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/taskmanager/bytebuffered/ByteBufferedChannelManager.class */
public final class ByteBufferedChannelManager implements TransferEnvelopeDispatcher, BufferProviderBroker {
    /** Logger shared by all instances of this class. */
    private static final Log LOG = LogFactory.getLog(ByteBufferedChannelManager.class);

    /** Default for sender-side spilling; same value as the fallback the constructor passes to the configuration lookup. */
    private static final boolean DEFAULT_ALLOW_SENDER_SIDE_SPILLING = false;

    /** Default for merging spilled buffers; same value as the fallback the constructor passes to the configuration lookup. */
    private static final boolean DEFAULT_MERGE_SPILLED_BUFFERS = true;

    /** Number of extra channels added to the registered-channel count whenever buffer shares are computed. */
    private static final int NUMBER_OF_CHANNELS_FOR_MULTICAST = 10;

    /** Receives envelopes that have to be queued for transfer to remote receivers. */
    private final NetworkConnectionManager networkConnectionManager;

    /** Service queried for the receivers of a source channel; calls to it are made while holding its monitor. */
    private final ChannelLookupProtocol channelLookupService;

    /** Connection info handed to this manager at construction; identifies this instance in lookups and sender hints. */
    private final InstanceConnectionInfo localConnectionInfo;

    /** Buffer pool returned by {@code getBufferProvider} when no single local input channel can provide the buffer. */
    private final LocalBufferPool transitBufferPool;

    /** Value of the {@code channel.network.allowSenderSideSpilling} setting read at construction. */
    private final boolean allowSenderSideSpilling;

    /** Value of the {@code channel.network.mergeSpilledBuffers} setting read at construction. */
    private final boolean mergeSpilledBuffers;

    /** Channel contexts of all currently registered channels, keyed by channel ID. */
    private final Map<ChannelID, ChannelContext> registeredChannels = new ConcurrentHashMap<ChannelID, ChannelContext>();

    /** Local buffer pool owners, keyed by the vertex ID (task contexts) or gate ID (input gate contexts) they were stored under. */
    private final Map<AbstractID, LocalBufferPoolOwner> localBufferPoolOwner = new ConcurrentHashMap<AbstractID, LocalBufferPoolOwner>();

    /** Always {@code true}; not read anywhere in this class. */
    private final boolean multicastEnabled = true;

    /** Cache of receiver lists, keyed by source channel ID. */
    private final Map<ChannelID, TransferEnvelopeReceiverList> receiverCache = new ConcurrentHashMap<ChannelID, TransferEnvelopeReceiverList>();

    public ByteBufferedChannelManager(ChannelLookupProtocol channelLookupProtocol, InstanceConnectionInfo instanceConnectionInfo) throws IOException {
        this.channelLookupService = channelLookupProtocol;
        this.localConnectionInfo = instanceConnectionInfo;
        GlobalBufferPool.getInstance();
        this.transitBufferPool = new LocalBufferPool(128, true);
        this.networkConnectionManager = new NetworkConnectionManager(this, instanceConnectionInfo.getAddress(), instanceConnectionInfo.getDataPort());
        this.allowSenderSideSpilling = GlobalConfiguration.getBoolean("channel.network.allowSenderSideSpilling", false);
        this.mergeSpilledBuffers = GlobalConfiguration.getBoolean("channel.network.mergeSpilledBuffers", true);
        LOG.info("Initialized byte buffered channel manager with sender-side spilling " + (this.allowSenderSideSpilling ? "enabled" : "disabled") + (this.mergeSpilledBuffers ? " and spilled buffer merging enabled" : ""));
    }

    /**
     * Registers all output and input channels of the given task with this manager and redistributes the
     * global buffers afterwards.
     *
     * @param task the task whose channels are registered
     * @param set not used by this method
     * @throws InsufficientResourcesException if the buffer availability check for the task fails
     */
    public void register(Task task, Set<ChannelID> set) throws InsufficientResourcesException {
        checkBufferAvailability(task);

        final Environment env = task.getEnvironment();
        // Any owner still stored under the task's vertex ID is taken out of the map and handed to the new context.
        final TaskContext taskContext = task.createTaskContext(this, this.localBufferPoolOwner.remove(task.getVertexID()));

        // Output side: one gate context per output gate, one channel context per output channel.
        for (GateID outputGateId : env.getOutputGateIDs()) {
            final OutputGateContext outputGate = taskContext.createOutputGateContext(outputGateId);
            for (ChannelID outputChannelId : env.getOutputChannelIDsOfGate(outputGateId)) {
                final OutputChannelContext previous = (OutputChannelContext) this.registeredChannels.get(outputChannelId);
                final OutputChannelContext outputChannel = outputGate.createOutputChannelContext(outputChannelId, previous, true, this.mergeSpilledBuffers);

                if (outputChannel.getType() == ChannelType.INMEMORY) {
                    addReceiverListHint(outputChannel.getChannelID(), outputChannel.getConnectedChannelID());
                }
                if (outputChannel.getType() == ChannelType.NETWORK) {
                    // For network channels the hint is keyed by the connected channel and points back to this one.
                    addReceiverListHint(outputChannel.getConnectedChannelID(), outputChannel.getChannelID());
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Registering byte buffered output channel " + outputChannel.getChannelID() + " (active)");
                }
                this.registeredChannels.put(outputChannel.getChannelID(), outputChannel);
            }
        }

        // Input side: additionally remember the buffer pool owner of every gate that provides one.
        for (GateID inputGateId : env.getInputGateIDs()) {
            final InputGateContext inputGate = taskContext.createInputGateContext(inputGateId);
            for (ChannelID inputChannelId : env.getInputChannelIDsOfGate(inputGateId)) {
                final InputChannelContext previous = (InputChannelContext) this.registeredChannels.get(inputChannelId);
                final InputChannelContext inputChannel = inputGate.createInputChannelContext(inputChannelId, previous);

                if (inputChannel.getType() == ChannelType.INMEMORY) {
                    addReceiverListHint(inputChannel.getChannelID(), inputChannel.getConnectedChannelID());
                }
                this.registeredChannels.put(inputChannel.getChannelID(), inputChannel);
            }

            final LocalBufferPoolOwner gateOwner = inputGate.getLocalBufferPoolOwner();
            if (gateOwner != null) {
                this.localBufferPoolOwner.put(inputGate.getGateID(), gateOwner);
            }
        }

        this.localBufferPoolOwner.put(task.getVertexID(), taskContext);
        redistributeGlobalBuffers();
    }

    /**
     * Removes all channels and buffer pool owners of the given task from this manager and redistributes the
     * global buffers afterwards.
     *
     * @param executionVertexID key under which the task-level buffer pool owner is stored
     * @param task the task whose channels and input gates are unregistered
     */
    public void unregister(ExecutionVertexID executionVertexID, Task task) {
        final Environment env = task.getEnvironment();

        // Output channels first, then input channels: destroy the context (if any) and drop the cached receivers.
        for (ChannelID outputChannelId : env.getOutputChannelIDs()) {
            final ChannelContext outputContext = this.registeredChannels.remove(outputChannelId);
            if (outputContext != null) {
                outputContext.destroy();
            }
            this.receiverCache.remove(outputChannelId);
        }
        for (ChannelID inputChannelId : env.getInputChannelIDs()) {
            final ChannelContext inputContext = this.registeredChannels.remove(inputChannelId);
            if (inputContext != null) {
                inputContext.destroy();
            }
            this.receiverCache.remove(inputChannelId);
        }

        // Owners stored per input gate.
        for (GateID inputGateId : env.getInputGateIDs()) {
            final LocalBufferPoolOwner gateOwner = this.localBufferPoolOwner.remove(inputGateId);
            if (gateOwner != null) {
                gateOwner.clearLocalBufferPool();
            }
        }

        // Owner stored for the task itself.
        final LocalBufferPoolOwner taskOwner = this.localBufferPoolOwner.remove(executionVertexID);
        if (taskOwner != null) {
            taskOwner.clearLocalBufferPool();
        }

        redistributeGlobalBuffers();
    }

    /**
     * Shuts down the network connection manager owned by this channel manager.
     */
    public void shutdown() {
        final NetworkConnectionManager manager = this.networkConnectionManager;
        manager.shutDown();
    }

    /**
     * Returns the network connection manager created by this channel manager.
     *
     * @return the network connection manager
     */
    public NetworkConnectionManager getNetworkConnectionManager() {
        return networkConnectionManager;
    }

    /**
     * Recycles the buffer attached to the given envelope; does nothing if the envelope carries no buffer.
     *
     * @param transferEnvelope the envelope whose buffer is recycled
     */
    private void recycleBuffer(TransferEnvelope transferEnvelope) {
        final Buffer attached = transferEnvelope.getBuffer();
        if (attached == null) {
            return;
        }
        attached.recycleBuffer();
    }

    /**
     * Dispatches a {@code ReceiverNotFoundEvent} envelope for the given channel, carrying the job ID and
     * sequence number of the envelope that could not be delivered.
     *
     * @param transferEnvelope the envelope that could not be delivered
     * @param channelID the channel the event is created for and whose receiver list is used for dispatching
     * @throws IOException if looking up the receivers or dispatching the event fails
     * @throws InterruptedException if the thread is interrupted while doing so
     */
    private void sendReceiverNotFoundEvent(TransferEnvelope transferEnvelope, ChannelID channelID) throws IOException, InterruptedException {
        // Never answer a not-found notification with another one.
        if (ReceiverNotFoundEvent.isReceiverNotFoundEvent(transferEnvelope)) {
            LOG.info("Dropping request to send ReceiverNotFoundEvent as response to ReceiverNotFoundEvent");
            return;
        }

        final JobID jobId = transferEnvelope.getJobID();
        final TransferEnvelope notification = ReceiverNotFoundEvent.createEnvelopeWithEvent(jobId, channelID, transferEnvelope.getSequenceNumber());
        final TransferEnvelopeReceiverList receivers = getReceiverList(jobId, channelID);
        if (receivers != null) {
            processEnvelopeEnvelopeWithoutBuffer(notification, receivers);
        }
    }

    /**
     * Looks up the receivers of the envelope's source channel and forwards the envelope to them.
     * If no receiver list is available the envelope's buffer is recycled and the envelope is dropped.
     *
     * @param transferEnvelope the envelope to dispatch
     * @param z passed through unchanged to {@code processEnvelopeWithBuffer} for envelopes carrying a buffer
     * @throws IOException rethrown after the envelope's buffer has been recycled
     * @throws InterruptedException rethrown after the envelope's buffer has been recycled
     */
    private void processEnvelope(TransferEnvelope transferEnvelope, boolean z) throws IOException, InterruptedException {
        try {
            final TransferEnvelopeReceiverList receivers = getReceiverList(transferEnvelope.getJobID(), transferEnvelope.getSource());
            if (receivers == null) {
                recycleBuffer(transferEnvelope);
                return;
            }
            if (transferEnvelope.getBuffer() != null) {
                processEnvelopeWithBuffer(transferEnvelope, receivers, z);
            } else {
                processEnvelopeEnvelopeWithoutBuffer(transferEnvelope, receivers);
            }
        } catch (IOException ioFailure) {
            recycleBuffer(transferEnvelope);
            throw ioFailure;
        } catch (InterruptedException interruption) {
            recycleBuffer(transferEnvelope);
            throw interruption;
        }
    }

    /**
     * Forwards an envelope that carries a buffer to the receivers in the given list.
     * <p>
     * When {@code z} is {@code false} the envelope is handed as-is to the first local receiver, so its buffer
     * changes hands without being copied. When {@code z} is {@code true} every local input channel receives
     * its own copy of the buffer, every remote receiver gets a duplicate of the envelope queued for transfer,
     * and the source buffer is recycled at the end.
     *
     * @param transferEnvelope the envelope to forward; must carry a buffer
     * @param transferEnvelopeReceiverList the receivers of the envelope
     * @param z {@code true} to copy the buffer for each receiver and recycle the source buffer afterwards,
     *        {@code false} to pass the envelope on directly to the first local receiver
     * @throws IOException if notifying about a missing receiver or copying the buffer fails
     * @throws InterruptedException if the thread is interrupted while waiting, e.g. for an empty buffer
     */
    private void processEnvelopeWithBuffer(TransferEnvelope transferEnvelope, TransferEnvelopeReceiverList transferEnvelopeReceiverList, boolean z) throws IOException, InterruptedException {
        if (!z) {
            final List<ChannelID> localReceivers = transferEnvelopeReceiverList.getLocalReceivers();
            if (localReceivers.size() != 1) {
                // Only logged: processing continues with the first receiver, as before.
                LOG.error("Expected receiver list to have exactly one element");
            }

            final ChannelID localReceiver = localReceivers.get(0);
            final ChannelContext receiverContext = this.registeredChannels.get(localReceiver);
            if (receiverContext == null) {
                // Nobody takes over the buffer, so it is recycled whether or not the notification succeeds.
                try {
                    sendReceiverNotFoundEvent(transferEnvelope, localReceiver);
                } finally {
                    recycleBuffer(transferEnvelope);
                }
                return;
            }

            if (!receiverContext.isInputChannel()) {
                LOG.error("Local receiver " + localReceiver + " is not an input channel, but is supposed to accept a buffer");
            }
            receiverContext.queueTransferEnvelope(transferEnvelope);
            return;
        }

        final Buffer sourceBuffer = transferEnvelope.getBuffer();
        try {
            if (transferEnvelopeReceiverList.hasLocalReceivers()) {
                for (ChannelID localReceiver : transferEnvelopeReceiverList.getLocalReceivers()) {
                    final ChannelContext receiverContext = this.registeredChannels.get(localReceiver);
                    if (receiverContext == null) {
                        sendReceiverNotFoundEvent(transferEnvelope, localReceiver);
                    } else if (receiverContext.isInputChannel()) {
                        final InputChannelContext inputContext = (InputChannelContext) receiverContext;
                        Buffer copy = null;
                        try {
                            copy = inputContext.requestEmptyBufferBlocking(sourceBuffer.size());
                            sourceBuffer.copyToBuffer(copy);
                            final TransferEnvelope duplicate = transferEnvelope.duplicateWithoutBuffer();
                            duplicate.setBuffer(copy);
                            inputContext.queueTransferEnvelope(duplicate);
                        } catch (IOException e) {
                            // Give back the copy if one was already obtained when the failure occurred.
                            if (copy != null) {
                                copy.recycleBuffer();
                            }
                            throw e;
                        }
                    } else {
                        LOG.error("Local receiver " + localReceiver + " is not an input channel, but is supposed to accept a buffer");
                    }
                }
            }

            if (transferEnvelopeReceiverList.hasRemoteReceivers()) {
                final List<RemoteReceiver> remoteReceivers = transferEnvelopeReceiverList.getRemoteReceivers();
                // A sender hint accompanies the envelope with sequence number 0.
                if (transferEnvelope.getSequenceNumber() == 0) {
                    generateSenderHint(transferEnvelope, remoteReceivers);
                }
                for (RemoteReceiver remoteReceiver : remoteReceivers) {
                    this.networkConnectionManager.queueEnvelopeForTransfer(remoteReceiver, transferEnvelope.duplicate());
                }
            }
        } finally {
            // All receivers work on copies or duplicates, so the source buffer is always released here.
            sourceBuffer.recycleBuffer();
        }
    }

    /**
     * Forwards an envelope without a buffer to all local and remote receivers in the given list.
     *
     * @param transferEnvelope the envelope to forward
     * @param transferEnvelopeReceiverList the receivers of the envelope
     * @throws IOException if notifying about a missing local receiver fails
     * @throws InterruptedException if the thread is interrupted while doing so
     */
    private void processEnvelopeEnvelopeWithoutBuffer(TransferEnvelope transferEnvelope, TransferEnvelopeReceiverList transferEnvelopeReceiverList) throws IOException, InterruptedException {
        for (ChannelID localReceiver : transferEnvelopeReceiverList.getLocalReceivers()) {
            final ChannelContext receiverContext = this.registeredChannels.get(localReceiver);
            if (receiverContext != null) {
                receiverContext.queueTransferEnvelope(transferEnvelope);
            } else {
                sendReceiverNotFoundEvent(transferEnvelope, localReceiver);
            }
        }

        if (!transferEnvelopeReceiverList.hasRemoteReceivers()) {
            return;
        }

        final List<RemoteReceiver> remoteReceivers = transferEnvelopeReceiverList.getRemoteReceivers();
        // A sender hint accompanies the envelope with sequence number 0.
        if (transferEnvelope.getSequenceNumber() == 0) {
            generateSenderHint(transferEnvelope, remoteReceivers);
        }
        // The same envelope instance is queued for every remote receiver.
        for (RemoteReceiver remoteReceiver : remoteReceivers) {
            this.networkConnectionManager.queueEnvelopeForTransfer(remoteReceiver, transferEnvelope);
        }
    }

    /**
     * Caches a receiver list built from a single channel ID for the given source channel, replacing any
     * existing entry. A warning is logged if an entry was replaced.
     *
     * @param channelID the source channel the cache entry is stored under
     * @param channelID2 the channel ID the new receiver list is created from
     */
    private void addReceiverListHint(ChannelID channelID, ChannelID channelID2) {
        final TransferEnvelopeReceiverList replaced = this.receiverCache.put(channelID, new TransferEnvelopeReceiverList(channelID2));
        if (replaced == null) {
            return;
        }
        LOG.warn("Receiver cache already contained entry for " + channelID);
    }

    /**
     * Caches a receiver list built from a single remote receiver for the given source channel, replacing any
     * existing entry. A warning is logged if an entry was replaced.
     *
     * @param channelID the source channel the cache entry is stored under
     * @param remoteReceiver the remote receiver the new receiver list is created from
     */
    private void addReceiverListHint(ChannelID channelID, RemoteReceiver remoteReceiver) {
        final TransferEnvelopeReceiverList replaced = this.receiverCache.put(channelID, new TransferEnvelopeReceiverList(remoteReceiver));
        if (replaced == null) {
            return;
        }
        LOG.warn("Receiver cache already contained entry for " + channelID);
    }

    /**
     * Creates a {@code SenderHintEvent} envelope for the source channel of the given envelope and queues it
     * for transfer to every receiver in the list. Nothing is sent if the source channel is not registered
     * (an error is logged) or if it is an input channel.
     *
     * @param transferEnvelope the envelope whose source channel the hint is generated for
     * @param list the remote receivers of the envelope; must not be empty because the connection index of
     *        its first element is used for the hint
     */
    private void generateSenderHint(TransferEnvelope transferEnvelope, List<RemoteReceiver> list) {
        final ChannelContext sourceContext = this.registeredChannels.get(transferEnvelope.getSource());
        if (sourceContext == null) {
            LOG.error("Cannot find channel context for channel ID " + transferEnvelope.getSource());
            return;
        }
        if (sourceContext.isInputChannel()) {
            return;
        }

        // The hint advertises this instance's address and data port, reusing the connection index of the first receiver.
        final InetSocketAddress localAddress = new InetSocketAddress(this.localConnectionInfo.getAddress(), this.localConnectionInfo.getDataPort());
        final RemoteReceiver localEndpoint = new RemoteReceiver(localAddress, list.get(0).getConnectionIndex());
        final TransferEnvelope hint = SenderHintEvent.createEnvelopeWithEvent(transferEnvelope, sourceContext.getConnectedChannelID(), localEndpoint);

        // The same hint envelope instance is queued for every remote receiver.
        for (RemoteReceiver remoteReceiver : list) {
            this.networkConnectionManager.queueEnvelopeForTransfer(remoteReceiver, hint);
        }
    }

    /**
     * Returns the receiver list for the given source channel, taking it from the cache when possible and
     * querying the channel lookup service otherwise. A freshly looked-up list is stored in the cache.
     *
     * @param jobID the job the channel belongs to
     * @param channelID the source channel whose receivers are requested
     * @return the receiver list, or {@code null} if the job is aborting, the receiver was not found, or the
     *         thread's interrupt flag was set before a result was obtained
     * @throws IOException if the lookup call fails
     * @throws InterruptedException if the thread is interrupted while waiting for the receiver to become ready
     */
    private TransferEnvelopeReceiverList getReceiverList(JobID jobID, ChannelID channelID) throws IOException, InterruptedException {
        final TransferEnvelopeReceiverList cached = this.receiverCache.get(channelID);
        if (cached != null) {
            return cached;
        }

        TransferEnvelopeReceiverList resolved = null;
        while (!Thread.currentThread().isInterrupted()) {
            ConnectionInfoLookupResponse response;
            // Lookup calls are made while holding the lookup service's monitor.
            synchronized (this.channelLookupService) {
                response = this.channelLookupService.lookupConnectionInfo(this.localConnectionInfo, jobID, channelID);
            }

            if (response.isJobAborting()) {
                break;
            }
            if (response.receiverNotFound()) {
                LOG.error("Cannot find task(s) waiting for data from source channel with ID " + channelID);
                break;
            }
            if (response.receiverNotReady()) {
                // Wait before asking again.
                Thread.sleep(500L);
                continue;
            }
            if (response.receiverReady()) {
                resolved = new TransferEnvelopeReceiverList(response);
                break;
            }
            // None of the known states matched: ask again immediately.
        }

        if (resolved == null) {
            return null;
        }

        this.receiverCache.put(channelID, resolved);
        if (LOG.isDebugEnabled()) {
            final StringBuilder description = new StringBuilder();
            description.append("Receiver list for source channel ID " + channelID + " at task manager " + this.localConnectionInfo + "\n");
            if (resolved.hasLocalReceivers()) {
                description.append("\tLocal receivers:\n");
                for (ChannelID localReceiver : resolved.getLocalReceivers()) {
                    description.append("\t\t" + localReceiver + "\n");
                }
            }
            if (resolved.hasRemoteReceivers()) {
                description.append("Remote receivers:\n");
                for (RemoteReceiver remoteReceiver : resolved.getRemoteReceivers()) {
                    description.append("\t\t" + remoteReceiver + "\n");
                }
            }
            LOG.debug(description.toString());
        }
        return resolved;
    }

    /**
     * Dispatches an envelope coming from an output channel; delegates to {@code processEnvelope} with the
     * flag set to {@code true}.
     *
     * @param transferEnvelope the envelope to dispatch
     * @throws IOException if dispatching fails
     * @throws InterruptedException if the thread is interrupted while dispatching
     */
    @Override
    public void processEnvelopeFromOutputChannel(TransferEnvelope transferEnvelope) throws IOException, InterruptedException {
        final boolean flag = true;
        processEnvelope(transferEnvelope, flag);
    }

    /**
     * Dispatches an envelope coming from an input channel; delegates to {@code processEnvelope} with the
     * flag set to {@code false}.
     *
     * @param transferEnvelope the envelope to dispatch
     * @throws IOException if dispatching fails
     * @throws InterruptedException if the thread is interrupted while dispatching
     */
    @Override
    public void processEnvelopeFromInputChannel(TransferEnvelope transferEnvelope) throws IOException, InterruptedException {
        final boolean flag = false;
        processEnvelope(transferEnvelope, flag);
    }

    /**
     * Dispatches an envelope received from the network.
     * <p>
     * A sender hint whose source channel is registered here is consumed: the remote receiver it carries is
     * cached as receiver list for that channel and the envelope is not processed any further. Every other
     * envelope, including a sender hint for an unregistered channel, is passed to {@code processEnvelope}.
     *
     * @param transferEnvelope the envelope received from the network
     * @param z passed through unchanged to {@code processEnvelope}
     * @throws IOException if dispatching fails
     * @throws InterruptedException if the thread is interrupted while dispatching
     */
    @Override
    public void processEnvelopeFromNetwork(TransferEnvelope transferEnvelope, boolean z) throws IOException, InterruptedException {
        if (SenderHintEvent.isSenderHintEvent(transferEnvelope)) {
            // The hint is read from the first entry of the envelope's event list.
            final SenderHintEvent senderHint = (SenderHintEvent) transferEnvelope.getEventList().get(0);
            if (this.registeredChannels.get(senderHint.getSource()) != null) {
                addReceiverListHint(senderHint.getSource(), senderHint.getRemoteReceiver());
                return;
            }
        }
        processEnvelope(transferEnvelope, z);
    }

    /**
     * Prints the current buffer utilization to standard output: the global pool, every local buffer pool
     * owner, the network connection manager and the queued envelopes of every registered input channel.
     */
    public void logBufferUtilization() {
        System.out.println("Buffer utilization at " + System.currentTimeMillis());
        System.out.println("\tUnused global buffers: " + GlobalBufferPool.getInstance().getCurrentNumberOfBuffers());

        System.out.println("\tLocal buffer pool status:");
        for (LocalBufferPoolOwner owner : this.localBufferPoolOwner.values()) {
            owner.logBufferUtilization();
        }

        this.networkConnectionManager.logBufferUtilization();

        System.out.println("\tIncoming connections:");
        for (ChannelContext context : this.registeredChannels.values()) {
            if (!context.isInputChannel()) {
                continue;
            }
            ((InputChannelContext) context).logQueuedEnvelopes();
        }
    }

    /**
     * Selects the buffer provider for data sent from the given source channel.
     * <p>
     * The input channel context of the receiver is returned only when the receiver list consists of exactly
     * one local receiver, no remote receivers, and that receiver is registered here. In every other case the
     * transit buffer pool is returned.
     *
     * @param jobID the job the channel belongs to
     * @param channelID the source channel whose receivers determine the provider
     * @return the receiving input channel context or the transit buffer pool
     * @throws IOException if the lookup fails or the single local receiver is registered but is not an input channel
     * @throws InterruptedException if the thread is interrupted during the receiver lookup
     */
    @Override
    public BufferProvider getBufferProvider(JobID jobID, ChannelID channelID) throws IOException, InterruptedException {
        final TransferEnvelopeReceiverList receiverList = getReceiverList(jobID, channelID);
        if (receiverList == null) {
            return this.transitBufferPool;
        }

        if (receiverList.hasLocalReceivers() && !receiverList.hasRemoteReceivers()) {
            final List<ChannelID> localReceivers = receiverList.getLocalReceivers();
            if (localReceivers.size() == 1) {
                final ChannelID localReceiver = localReceivers.get(0);
                final ChannelContext receiverContext = this.registeredChannels.get(localReceiver);
                if (receiverContext == null) {
                    // Receiver not registered here: fall back to the transit pool.
                    return this.transitBufferPool;
                }
                if (receiverContext.isInputChannel()) {
                    return (InputChannelContext) receiverContext;
                }
                throw new IOException("Channel context for local receiver " + localReceiver + " is not an input channel context");
            }
        }

        return this.transitBufferPool;
    }

    /**
     * Verifies that the global buffer pool holds at least one buffer per channel once the given task's
     * channels are added to the channels already registered and the multicast allowance.
     *
     * @param task the task about to be registered
     * @throws InsufficientResourcesException if fewer than one buffer per channel would be available
     */
    private void checkBufferAvailability(Task task) throws InsufficientResourcesException {
        final int totalNumberOfBuffers = GlobalBufferPool.getInstance().getTotalNumberOfBuffers();
        final int currentNumberOfChannels = this.registeredChannels.size() + NUMBER_OF_CHANNELS_FOR_MULTICAST;
        final Environment environment = task.getEnvironment();
        final int requiredNumberOfChannels = currentNumberOfChannels + environment.getNumberOfOutputChannels() + environment.getNumberOfInputChannels();

        // Divide in floating point so the ratio is not truncated before it is compared with 1.0.
        final double buffersPerChannel = (double) totalNumberOfBuffers / (double) requiredNumberOfChannels;
        if (buffersPerChannel < 1.0d) {
            throw new InsufficientResourcesException(this.localConnectionInfo.getHostName() + " has not enough buffers available to safely execute " + environment.getTaskName() + " (" + (requiredNumberOfChannels - totalNumberOfBuffers) + " buffers are currently missing)");
        }
    }

    /**
     * Recomputes the designated number of buffers of every local buffer pool owner and of the transit
     * buffer pool. Each one is assigned the per-channel share of the global pool multiplied by its number
     * of channels, rounded up; the transit pool counts as {@code NUMBER_OF_CHANNELS_FOR_MULTICAST} channels.
     * Nothing is reassigned while no local buffer pool owner is registered.
     */
    private void redistributeGlobalBuffers() {
        final int totalNumberOfBuffers = GlobalBufferPool.getInstance().getTotalNumberOfBuffers();
        final int totalNumberOfChannels = this.registeredChannels.size() + NUMBER_OF_CHANNELS_FOR_MULTICAST;

        // Divide in floating point: an integer division would drop the fractional share before it is
        // scaled by the channel count and rounded up below.
        final double buffersPerChannel = (double) totalNumberOfBuffers / (double) totalNumberOfChannels;
        if (buffersPerChannel < 1.0d) {
            LOG.warn("System is low on memory buffers. This may result in reduced performance.");
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Total number of buffers is " + totalNumberOfBuffers);
            LOG.debug("Total number of channels is " + totalNumberOfChannels);
        }

        if (this.localBufferPoolOwner.isEmpty()) {
            return;
        }

        for (LocalBufferPoolOwner owner : this.localBufferPoolOwner.values()) {
            owner.setDesignatedNumberOfBuffers((int) Math.ceil(buffersPerChannel * owner.getNumberOfChannels()));
        }

        this.transitBufferPool.setDesignatedNumberOfBuffers((int) Math.ceil(buffersPerChannel * NUMBER_OF_CHANNELS_FOR_MULTICAST));
    }

    /**
     * Removes the cached receiver lists of the given source channels.
     *
     * @param set IDs of the channels whose cache entries are dropped
     */
    public void invalidateLookupCacheEntries(Set<ChannelID> set) {
        for (ChannelID staleChannelId : set) {
            this.receiverCache.remove(staleChannelId);
        }
    }

    /**
     * Forwards an asynchronous-event notification to the local buffer pool owner stored under the given
     * vertex ID. If no owner is stored, a message is printed to standard output instead.
     *
     * @param executionVertexID key of the local buffer pool owner to notify
     */
    public void reportAsynchronousEvent(ExecutionVertexID executionVertexID) {
        final LocalBufferPoolOwner owner = this.localBufferPoolOwner.get(executionVertexID);
        if (owner != null) {
            owner.reportAsynchronousEvent();
            return;
        }
        System.out.println("Cannot find local buffer pool owner for " + executionVertexID);
    }
}
