package eu.stratosphere.nephele.taskmanager.bytebuffered;

import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

/* loaded from: input_file:eu/stratosphere/nephele/taskmanager/bytebuffered/NetworkConnectionManager.class */
/**
 * Owns the network connection threads used by the byte-buffered channel manager: a single
 * {@link IncomingConnectionThread}, a configurable number of {@link OutgoingConnectionThread}s,
 * and one {@link OutgoingConnection} per {@link RemoteReceiver}.
 *
 * <p>Outgoing connections are created lazily on first use and kept in a concurrent map, so
 * {@link #queueEnvelopeForTransfer(RemoteReceiver, TransferEnvelope)} may be called from
 * several threads.
 */
public final class NetworkConnectionManager {
    /** Configuration key for the number of outgoing connection threads. */
    private static final String OUTGOING_CONNECTION_THREADS_KEY = "channel.network.numberOfOutgoingConnectionThreads";
    /** Configuration key for the number of connection retries handed to each outgoing connection. */
    private static final String CONNECTION_RETRIES_KEY = "channel.network.numberOfConnectionRetries";
    private static final int DEFAULT_NUMBER_OF_OUTGOING_CONNECTION_THREADS = 1;
    private static final int DEFAULT_NUMBER_OF_CONNECTION_RETRIES = 10;
    private final IncomingConnectionThread incomingConnectionThread;
    private final int numberOfConnectionRetries;
    private final ByteBufferedChannelManager byteBufferedChannelManager;
    private final List<OutgoingConnectionThread> outgoingConnectionThreads = new CopyOnWriteArrayList<OutgoingConnectionThread>();
    private final ConcurrentMap<RemoteReceiver, OutgoingConnection> outgoingConnections = new ConcurrentHashMap<RemoteReceiver, OutgoingConnection>();

    /**
     * Creates the manager and starts all of its connection threads.
     *
     * <p>The number of outgoing connection threads and the number of connection retries are read
     * from the global configuration. A configured thread count below one falls back to the
     * default, because at least one thread is needed to serve outgoing connections.
     *
     * @param byteBufferedChannelManager the channel manager handed to the incoming connection thread
     * @param inetAddress the address the incoming connection thread is created for
     * @param i the port the incoming connection thread is created for
     * @throws IOException if creating one of the connection threads fails; outgoing threads that
     *         were already started are interrupted before the exception is propagated
     */
    public NetworkConnectionManager(ByteBufferedChannelManager byteBufferedChannelManager, InetAddress inetAddress, int i) throws IOException {
        Configuration configuration = GlobalConfiguration.getConfiguration();
        this.byteBufferedChannelManager = byteBufferedChannelManager;
        this.numberOfConnectionRetries = configuration.getInteger(CONNECTION_RETRIES_KEY, DEFAULT_NUMBER_OF_CONNECTION_RETRIES);

        int threadCount = configuration.getInteger(OUTGOING_CONNECTION_THREADS_KEY, DEFAULT_NUMBER_OF_OUTGOING_CONNECTION_THREADS);
        if (threadCount < 1) {
            // An empty thread list would make every later connection attempt fail with an
            // IndexOutOfBoundsException, so an unusable setting is replaced by the default.
            threadCount = DEFAULT_NUMBER_OF_OUTGOING_CONNECTION_THREADS;
        }

        IncomingConnectionThread incomingThread;
        try {
            for (int started = 0; started < threadCount; started++) {
                OutgoingConnectionThread outgoingConnectionThread = new OutgoingConnectionThread();
                outgoingConnectionThread.start();
                this.outgoingConnectionThreads.add(outgoingConnectionThread);
            }
            incomingThread = new IncomingConnectionThread(this.byteBufferedChannelManager, true, new InetSocketAddress(inetAddress, i));
        } catch (IOException e) {
            // Do not leave already running outgoing threads behind when construction fails.
            interruptOutgoingConnectionThreads();
            throw e;
        }
        this.incomingConnectionThread = incomingThread;
        this.incomingConnectionThread.start();
    }

    /**
     * Picks one of the outgoing connection threads at random.
     *
     * @return a randomly chosen outgoing connection thread
     */
    private OutgoingConnectionThread getOutgoingConnectionThread() {
        return this.outgoingConnectionThreads.get((int) (this.outgoingConnectionThreads.size() * Math.random()));
    }

    /**
     * Queues the given envelope on the outgoing connection that belongs to the given receiver,
     * creating that connection first if it does not exist yet.
     *
     * @param remoteReceiver the receiver the envelope is addressed to
     * @param transferEnvelope the envelope to queue
     */
    public void queueEnvelopeForTransfer(RemoteReceiver remoteReceiver, TransferEnvelope transferEnvelope) {
        getOutgoingConnection(remoteReceiver).queueEnvelope(transferEnvelope);
    }

    /**
     * Returns the outgoing connection registered for the given receiver, registering a new one
     * if none exists.
     *
     * @param remoteReceiver the receiver to look up
     * @return the connection stored in the map for this receiver
     */
    private OutgoingConnection getOutgoingConnection(RemoteReceiver remoteReceiver) {
        OutgoingConnection existing = this.outgoingConnections.get(remoteReceiver);
        if (existing != null) {
            return existing;
        }
        OutgoingConnection created = new OutgoingConnection(remoteReceiver, getOutgoingConnectionThread(), this.numberOfConnectionRetries);
        // putIfAbsent decides the race between concurrent callers: whoever loses discards its
        // own instance and uses the one that is already in the map.
        OutgoingConnection raceWinner = this.outgoingConnections.putIfAbsent(remoteReceiver, created);
        return raceWinner != null ? raceWinner : created;
    }

    /**
     * Interrupts the incoming connection thread and all outgoing connection threads.
     */
    public void shutDown() {
        this.incomingConnectionThread.interrupt();
        interruptOutgoingConnectionThreads();
    }

    /** Interrupts every outgoing connection thread that has been registered so far. */
    private void interruptOutgoingConnectionThreads() {
        for (OutgoingConnectionThread outgoingConnectionThread : this.outgoingConnectionThreads) {
            outgoingConnectionThread.interrupt();
        }
    }

    /**
     * Prints, for every known outgoing connection, its receiver and its number of queued write
     * buffers to standard output.
     */
    public void logBufferUtilization() {
        System.out.println("\tOutgoing connections:");
        for (Map.Entry<RemoteReceiver, OutgoingConnection> entry : this.outgoingConnections.entrySet()) {
            System.out.println("\t\tOC " + entry.getKey() + ": " + entry.getValue().getNumberOfQueuedWriteBuffers());
        }
    }
}
