package eu.stratosphere.nephele.taskmanager.bytebuffered;

import eu.stratosphere.util.StringUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnectionThread.class */
/**
 * Thread that owns a {@link Selector} for outgoing connections. On every loop iteration it
 * <ol>
 * <li>starts at most one queued connection request,</li>
 * <li>re-registers at most one queued connection for write events,</li>
 * <li>closes connections that have been unsubscribed from write events for longer than
 * {@link #MIN_IDLE_TIME_BEFORE_CLOSE} milliseconds, and</li>
 * <li>waits briefly on the selector and dispatches the ready keys.</li>
 * </ol>
 * The loop ends when the thread is interrupted; the selector is closed afterwards.
 */
public class OutgoingConnectionThread extends Thread {

    /** Minimum time in milliseconds a connection must stay unsubscribed from write events before it is closed. */
    private static final long MIN_IDLE_TIME_BEFORE_CLOSE = 80000;

    /** Interest set used once a connection is established and has data to send. */
    private static final int READ_WRITE_OPS = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

    /** Upper bound in milliseconds for a single blocking select call. */
    private static final long SELECT_TIMEOUT = 10L;

    /** Pause in milliseconds between two attempts to finish a pending connect. */
    private static final long CONNECT_RETRY_INTERVAL = 100L;

    private static final Log LOG = LogFactory.getLog(OutgoingConnectionThread.class);

    private final Selector selector;

    /** Connections waiting to be opened; guarded by its own monitor. */
    private final Queue<OutgoingConnection> pendingConnectionRequests;

    /** Keys whose connections want write events again; guarded by its own monitor. */
    private final Queue<SelectionKey> pendingWriteEventSubscribeRequests;

    /** Idle connections mapped to the time they were unsubscribed; guarded by its own monitor. */
    private final Map<OutgoingConnection, Long> connectionsToClose;

    /**
     * Creates the thread and opens its selector. The thread is not started.
     *
     * @throws IOException if the selector cannot be opened
     */
    public OutgoingConnectionThread() throws IOException {
        super("Outgoing Connection Thread");
        this.pendingConnectionRequests = new ArrayDeque<OutgoingConnection>();
        this.pendingWriteEventSubscribeRequests = new ArrayDeque<SelectionKey>();
        this.connectionsToClose = new HashMap<OutgoingConnection, Long>();
        this.selector = Selector.open();
    }

    /**
     * Runs the selector loop until the thread is interrupted, then closes the selector.
     */
    @Override
    public void run() {
        while (!isInterrupted()) {
            processPendingConnectionRequest();
            processPendingWriteSubscription();
            closeIdleConnections();
            try {
                this.selector.select(SELECT_TIMEOUT);
            } catch (IOException e) {
                LOG.error(e);
            }
            dispatchSelectedKeys();
        }
        try {
            this.selector.close();
        } catch (IOException e) {
            LOG.debug(StringUtils.stringifyException(e));
        }
    }

    /** Opens a non-blocking channel for the next queued connection request, if there is one. */
    private void processPendingConnectionRequest() {
        final OutgoingConnection connection;
        // Only the queue access needs the lock; channel I/O and callbacks run outside of it.
        synchronized (this.pendingConnectionRequests) {
            connection = this.pendingConnectionRequests.poll();
        }
        if (connection == null) {
            return;
        }
        SocketChannel channel = null;
        try {
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            final SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT, connection);
            if (channel.connect(connection.getConnectionAddress())) {
                // The connect completed immediately, so no OP_CONNECT event can be relied upon.
                doConnect(key);
            }
        } catch (final IOException e) {
            // Closing the channel also cancels its key, so no half-initialized key stays registered.
            closeQuietly(channel);
            reportConnectionProblemAsync(connection, e);
        }
    }

    /** Re-registers the next queued connection for read and write events, if there is one. */
    private void processPendingWriteSubscription() {
        final SelectionKey requestKey;
        synchronized (this.pendingWriteEventSubscribeRequests) {
            requestKey = this.pendingWriteEventSubscribeRequests.poll();
        }
        if (requestKey == null) {
            return;
        }
        final OutgoingConnection connection = (OutgoingConnection) requestKey.attachment();
        try {
            final SocketChannel channel = (SocketChannel) requestKey.channel();
            final SelectionKey newKey = channel.register(this.selector, READ_WRITE_OPS);
            newKey.attach(connection);
            connection.setSelectionKey(newKey);
        } catch (final IOException e) {
            reportTransmissionProblemAsync(connection, e);
        }
    }

    /** Closes, on separate threads, every connection that has been idle for longer than the minimum idle time. */
    private void closeIdleConnections() {
        synchronized (this.connectionsToClose) {
            final long now = System.currentTimeMillis();
            final Iterator<Map.Entry<OutgoingConnection, Long>> it = this.connectionsToClose.entrySet().iterator();
            while (it.hasNext()) {
                final Map.Entry<OutgoingConnection, Long> entry = it.next();
                if (entry.getValue().longValue() + MIN_IDLE_TIME_BEFORE_CLOSE < now) {
                    final OutgoingConnection connection = entry.getKey();
                    it.remove();
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                connection.closeConnection();
                            } catch (IOException e) {
                                connection.reportTransmissionProblem(e);
                            }
                        }
                    }).start();
                }
            }
        }
    }

    /** Handles each key selected by the last select call; a key is dispatched to exactly one handler. */
    private void dispatchSelectedKeys() {
        final Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
        while (it.hasNext()) {
            final SelectionKey key = it.next();
            it.remove();
            if (!key.isValid()) {
                LOG.error("Received invalid key: " + key);
            } else if (key.isConnectable()) {
                doConnect(key);
            } else if (key.isReadable()) {
                doRead(key);
            } else if (key.isWritable()) {
                doWrite(key);
            }
        }
    }

    /**
     * Finishes a pending connect and registers the channel for read and write events. Failures are
     * passed to {@code reportConnectionProblem} of the attached connection.
     *
     * @param selectionKey key whose attachment is the {@code OutgoingConnection} being connected
     */
    private void doConnect(final SelectionKey selectionKey) {
        final OutgoingConnection outgoingConnection = (OutgoingConnection) selectionKey.attachment();
        final SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        try {
            while (!socketChannel.finishConnect()) {
                try {
                    Thread.sleep(CONNECT_RETRY_INTERVAL);
                } catch (InterruptedException e) {
                    LOG.error(e);
                    // Restore the flag so the main loop in run() sees the shutdown request.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            final SelectionKey channelKey = socketChannel.register(this.selector, READ_WRITE_OPS);
            outgoingConnection.setSelectionKey(channelKey);
            channelKey.attach(outgoingConnection);
        } catch (IOException e) {
            outgoingConnection.reportConnectionProblem(e);
        }
    }

    /**
     * Lets the attached connection write and requests a close when {@code write()} returns
     * {@code false}.
     *
     * @param selectionKey key whose attachment is the {@code OutgoingConnection} to write to
     */
    private void doWrite(final SelectionKey selectionKey) {
        final OutgoingConnection outgoingConnection = (OutgoingConnection) selectionKey.attachment();
        try {
            // NOTE(review): presumably write() returns false once nothing is left to send - confirm.
            if (!outgoingConnection.write()) {
                outgoingConnection.requestClose();
            }
        } catch (IOException e) {
            outgoingConnection.reportTransmissionProblem(e);
        }
    }

    /**
     * Reads from a channel that became readable. End-of-stream is reported as a transmission
     * problem; any other outcome is logged as an error.
     *
     * @param selectionKey key of the readable channel; its attachment is the {@code OutgoingConnection}
     */
    private void doRead(final SelectionKey selectionKey) {
        final SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        final OutgoingConnection outgoingConnection = (OutgoingConnection) selectionKey.attachment();
        try {
            if (socketChannel.read(ByteBuffer.allocate(8)) == -1) {
                outgoingConnection.reportTransmissionProblem(new IOException("Read unexpected EOF from channel"));
            } else {
                LOG.error("Outgoing connection read real data from channel");
            }
        } catch (IOException e) {
            outgoingConnection.reportTransmissionProblem(e);
        }
    }

    /**
     * Queues a connection to be opened by this thread on one of its next loop iterations.
     *
     * @param outgoingConnection the connection to establish
     */
    public void triggerConnect(OutgoingConnection outgoingConnection) {
        synchronized (this.pendingConnectionRequests) {
            this.pendingConnectionRequests.add(outgoingConnection);
        }
    }

    /**
     * Registers the key's channel for read events only and records the current time so that the
     * connection can be closed once it has been idle for long enough.
     *
     * @param selectionKey key whose attachment is the affected {@code OutgoingConnection}
     * @throws IOException if the channel cannot be registered with the selector
     */
    public void unsubscribeFromWriteEvent(SelectionKey selectionKey) throws IOException {
        final SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        final OutgoingConnection outgoingConnection = (OutgoingConnection) selectionKey.attachment();
        final SelectionKey readOnlyKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
        readOnlyKey.attach(outgoingConnection);
        outgoingConnection.setSelectionKey(readOnlyKey);
        synchronized (this.connectionsToClose) {
            this.connectionsToClose.put(outgoingConnection, Long.valueOf(System.currentTimeMillis()));
        }
    }

    /**
     * Queues the key for re-registration with write events and removes its connection from the
     * set of idle connections scheduled for closing.
     *
     * @param selectionKey key whose attachment is the affected {@code OutgoingConnection}
     */
    public void subscribeToWriteEvent(SelectionKey selectionKey) {
        synchronized (this.pendingWriteEventSubscribeRequests) {
            this.pendingWriteEventSubscribeRequests.add(selectionKey);
        }
        synchronized (this.connectionsToClose) {
            this.connectionsToClose.remove((OutgoingConnection) selectionKey.attachment());
        }
    }

    /** Reports a connection problem on a separate thread so the selector loop is not held up. */
    private static void reportConnectionProblemAsync(final OutgoingConnection connection, final IOException cause) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                connection.reportConnectionProblem(cause);
            }
        }).start();
    }

    /** Reports a transmission problem on a separate thread so the selector loop is not held up. */
    private static void reportTransmissionProblemAsync(final OutgoingConnection connection, final IOException cause) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                connection.reportTransmissionProblem(cause);
            }
        }).start();
    }

    /** Closes the channel if it is non-null; a failure to close is only logged at debug level. */
    private static void closeQuietly(final SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException e) {
            LOG.debug(StringUtils.stringifyException(e));
        }
    }
}
