package eu.stratosphere.nephele.taskmanager.bytebuffered;

import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferAvailabilityListener;
import eu.stratosphere.nephele.taskmanager.transferenvelope.NoBufferAvailableException;
import eu.stratosphere.util.StringUtils;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnectionThread.class */
public class IncomingConnectionThread extends Thread {
    private static final Log LOG = LogFactory.getLog(IncomingConnectionThread.class);
    private final ByteBufferedChannelManager byteBufferedChannelManager;
    private final Selector selector;
    private final Queue<SelectionKey> pendingReadEventSubscribeRequests;
    private final ServerSocketChannel listeningSocket;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnectionThread$IncomingConnectionBufferAvailListener.class */
    public static final class IncomingConnectionBufferAvailListener implements BufferAvailabilityListener {
        private final Queue<SelectionKey> pendingReadEventSubscribeRequests;
        private final SelectionKey key;

        private IncomingConnectionBufferAvailListener(Queue<SelectionKey> queue, SelectionKey selectionKey) {
            this.pendingReadEventSubscribeRequests = queue;
            this.key = selectionKey;
        }

        @Override // eu.stratosphere.nephele.taskmanager.bufferprovider.BufferAvailabilityListener
        public void bufferAvailable() {
            synchronized (this.pendingReadEventSubscribeRequests) {
                this.pendingReadEventSubscribeRequests.add(this.key);
            }
        }
    }

    public IncomingConnectionThread(ByteBufferedChannelManager byteBufferedChannelManager, boolean z, InetSocketAddress inetSocketAddress) throws IOException {
        super("Incoming Connection Thread");
        this.pendingReadEventSubscribeRequests = new ArrayDeque();
        this.selector = Selector.open();
        this.byteBufferedChannelManager = byteBufferedChannelManager;
        if (!z) {
            this.listeningSocket = null;
            return;
        }
        this.listeningSocket = ServerSocketChannel.open();
        this.listeningSocket.configureBlocking(false);
        this.listeningSocket.register(this.selector, 16);
        this.listeningSocket.socket().bind(inetSocketAddress);
        LOG.debug("Listening on " + this.listeningSocket.socket().getLocalSocketAddress());
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (!isInterrupted()) {
            synchronized (this.pendingReadEventSubscribeRequests) {
                while (!this.pendingReadEventSubscribeRequests.isEmpty()) {
                    SelectionKey poll = this.pendingReadEventSubscribeRequests.poll();
                    IncomingConnection incomingConnection = (IncomingConnection) poll.attachment();
                    try {
                        ((SocketChannel) poll.channel()).register(this.selector, 1).attach(incomingConnection);
                    } catch (ClosedChannelException e) {
                        incomingConnection.reportTransmissionProblem(poll, e);
                    }
                }
            }
            try {
                this.selector.select(500L);
            } catch (IOException e2) {
                LOG.error(e2);
            }
            Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey next = it.next();
                it.remove();
                if (!next.isValid()) {
                    LOG.error("Received invalid key: " + next);
                } else if (next.isReadable()) {
                    doRead(next);
                } else if (next.isAcceptable()) {
                    doAccept(next);
                } else {
                    LOG.error("Unknown key: " + next);
                }
            }
        }
        if (this.listeningSocket != null) {
            try {
                this.listeningSocket.close();
            } catch (IOException e3) {
                LOG.debug(e3);
            }
        }
        try {
            this.selector.close();
        } catch (IOException e4) {
            LOG.debug(StringUtils.stringifyException(e4));
        }
    }

    private void doAccept(SelectionKey selectionKey) {
        try {
            SocketChannel accept = this.listeningSocket.accept();
            if (accept == null) {
                LOG.error("Client socket is null");
                return;
            }
            IncomingConnection incomingConnection = new IncomingConnection(this.byteBufferedChannelManager, accept);
            SelectionKey selectionKey2 = null;
            try {
                accept.configureBlocking(false);
                selectionKey2 = accept.register(this.selector, 1);
                selectionKey2.attach(incomingConnection);
            } catch (IOException e) {
                incomingConnection.reportTransmissionProblem(selectionKey2, e);
            }
        } catch (IOException e2) {
            LOG.error(e2);
        }
    }

    private void doRead(SelectionKey selectionKey) {
        IncomingConnection incomingConnection = (IncomingConnection) selectionKey.attachment();
        try {
            incomingConnection.read();
        } catch (NoBufferAvailableException e) {
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            try {
                socketChannel.register(this.selector, 0).attach(incomingConnection);
            } catch (ClosedChannelException e2) {
                incomingConnection.reportTransmissionProblem(selectionKey, e2);
            }
            if (e.getBufferProvider().registerBufferAvailabilityListener(new IncomingConnectionBufferAvailListener(this.pendingReadEventSubscribeRequests, selectionKey))) {
                return;
            }
            try {
                socketChannel.register(this.selector, 1).attach(incomingConnection);
            } catch (ClosedChannelException e3) {
                incomingConnection.reportTransmissionProblem(selectionKey, e3);
            }
        } catch (EOFException e4) {
            if (!incomingConnection.isCloseUnexpected()) {
                incomingConnection.closeConnection(selectionKey);
                return;
            }
            LOG.error("Connection from " + ((SocketChannel) selectionKey.channel()).socket().getRemoteSocketAddress() + " was closed unexpectedly");
            incomingConnection.reportTransmissionProblem(selectionKey, e4);
        } catch (IOException e5) {
            incomingConnection.reportTransmissionProblem(selectionKey, e5);
        } catch (InterruptedException e6) {
        }
    }
}
