package eu.stratosphere.nephele.taskmanager.bytebuffered;

import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.taskmanager.transferenvelope.DefaultSerializer;
import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnection.class */
/**
 * Manages the transfer envelopes queued for one remote receiver together with the state of the
 * socket connection that is used to send them.
 *
 * <p>All state except {@link #selectionKey} is guarded by the monitor of {@link #queuedEnvelopes}.
 * The envelope that is currently being serialized stays at the head of the queue until
 * {@link #write()} has completely written it.
 */
public class OutgoingConnection {
    private static final Log LOG = LogFactory.getLog(OutgoingConnection.class);

    /** Minimum time in milliseconds that must pass before a failed attempt consumes a retry. */
    private static final long RETRY_INTERVAL = 1000;

    private final RemoteReceiver remoteReceiver;
    private final OutgoingConnectionThread connectionThread;
    private final int numberOfConnectionRetries;

    /** Envelopes waiting to be sent; its monitor also serves as the lock for this object. */
    private final Queue<TransferEnvelope> queuedEnvelopes = new ArrayDeque<TransferEnvelope>();
    private final DefaultSerializer serializer = new DefaultSerializer();

    /** Envelope currently handed to the serializer, or {@code null} if none is in flight. */
    private TransferEnvelope currentEnvelope = null;

    /** {@code true} as soon as a connect has been requested, not only once it is established. */
    private boolean isConnected = false;
    private boolean isSubscribedToWriteEvent = false;
    private int retriesLeft = 0;
    private long timestampOfLastRetry = 0;

    /** Written by {@link #setSelectionKey(SelectionKey)} without the lock, hence volatile. */
    private volatile SelectionKey selectionKey = null;

    /**
     * Creates a new outgoing connection.
     *
     * @param remoteReceiver the receiver this connection sends to
     * @param outgoingConnectionThread the thread that performs the connect and selector operations
     * @param numberOfConnectionRetries the retry budget assigned whenever a new connect is started
     */
    public OutgoingConnection(RemoteReceiver remoteReceiver, OutgoingConnectionThread outgoingConnectionThread,
            int numberOfConnectionRetries) {
        this.remoteReceiver = remoteReceiver;
        this.connectionThread = outgoingConnectionThread;
        this.numberOfConnectionRetries = numberOfConnectionRetries;
    }

    /**
     * Appends an envelope to the send queue, first making sure that a connect has been requested
     * and that the connection is subscribed to write events.
     *
     * @param transferEnvelope the envelope to send
     */
    public void queueEnvelope(TransferEnvelope transferEnvelope) {
        synchronized (this.queuedEnvelopes) {
            checkConnection();
            this.queuedEnvelopes.add(transferEnvelope);
        }
    }

    /**
     * Requests a connect if none is pending or established, otherwise re-subscribes to write
     * events if the subscription has been dropped.
     */
    private void checkConnection() {
        // Re-acquiring the monitor is harmless when called from queueEnvelope (monitors are reentrant).
        synchronized (this.queuedEnvelopes) {
            if (!this.isConnected) {
                this.retriesLeft = this.numberOfConnectionRetries;
                this.timestampOfLastRetry = System.currentTimeMillis();
                this.connectionThread.triggerConnect(this);
                this.isConnected = true;
                this.isSubscribedToWriteEvent = true;
            } else if (!this.isSubscribedToWriteEvent) {
                this.connectionThread.subscribeToWriteEvent(this.selectionKey);
                this.isSubscribedToWriteEvent = true;
            }
        }
    }

    /**
     * Returns the address of the remote receiver.
     *
     * @return the address to connect to
     */
    public InetSocketAddress getConnectionAddress() {
        return this.remoteReceiver.getConnectionAddress();
    }

    /**
     * Handles a failed connection attempt: closes any registered channel and either schedules
     * another connect or, once the retry budget is used up, drops all queued envelopes.
     *
     * @param ioe the exception that signalled the problem
     */
    public void reportConnectionProblem(IOException ioe) {
        final long now = System.currentTimeMillis();
        synchronized (this.queuedEnvelopes) {
            if (now - this.timestampOfLastRetry >= RETRY_INTERVAL) {
                LOG.error("Cannot connect to " + this.remoteReceiver + ", " + this.retriesLeft + " retries left");
            }

            final SelectionKey key = this.selectionKey;
            if (key != null) {
                final SocketChannel socketChannel = (SocketChannel) key.channel();
                if (socketChannel != null) {
                    try {
                        socketChannel.close();
                    } catch (IOException e) {
                        LOG.debug("Error while trying to close the socket channel to " + this.remoteReceiver, e);
                    }
                }
                key.cancel();
                this.selectionKey = null;
            }
            // Reset the flags even if no key was registered yet; otherwise a connect that fails
            // before setSelectionKey was called leaves the connection marked as connected forever.
            this.isConnected = false;
            this.isSubscribedToWriteEvent = false;

            if (hasRetriesLeft(now)) {
                this.connectionThread.triggerConnect(this);
                this.isConnected = true;
                this.isSubscribedToWriteEvent = true;
                return;
            }

            LOG.error(ioe);

            // Giving up. The in-flight envelope is normally still the head of the queue (write()
            // only peeks), so draining the queue recycles its buffer exactly once. If it is no
            // longer queued, dropAllQueuedEnvelopesFromChannel has already recycled it.
            this.currentEnvelope = null;
            TransferEnvelope envelope;
            while ((envelope = this.queuedEnvelopes.poll()) != null) {
                recycleBufferOf(envelope);
            }
        }
    }

    /**
     * Handles an I/O error on an established connection: closes the channel, discards the
     * envelope that was being written if its buffer can no longer be used, and requests a new
     * connect when further envelopes are waiting.
     *
     * @param ioe the exception that signalled the problem
     */
    public void reportTransmissionProblem(IOException ioe) {
        // The key may already be gone, e.g. after closeConnection(); do not fail on that.
        final SelectionKey key = this.selectionKey;
        final SocketChannel socketChannel = (key != null) ? (SocketChannel) key.channel() : null;

        final TransferEnvelope inFlight = this.currentEnvelope;
        if (inFlight != null) {
            LOG.error(describeConnection(socketChannel) + " experienced an IOException for transfer envelope "
                + inFlight.getSequenceNumber());
        } else {
            LOG.error(describeConnection(socketChannel) + " experienced an IOException");
        }

        synchronized (this.queuedEnvelopes) {
            if (socketChannel != null) {
                try {
                    LOG.debug("Closing connection to " + socketChannel.socket().getRemoteSocketAddress());
                    socketChannel.close();
                } catch (IOException e) {
                    LOG.debug("An error occurred while responding to an IOException");
                    LOG.debug(e);
                }
            }
            if (key != null) {
                key.cancel();
                // Clear the stale key before a new connect is triggered below.
                this.selectionKey = null;
            }

            LOG.error(ioe);

            discardCurrentEnvelope();

            // Decide on the reconnect only after the unusable envelope has left the queue.
            if (this.queuedEnvelopes.isEmpty()) {
                this.isConnected = false;
                this.isSubscribedToWriteEvent = false;
            } else {
                this.connectionThread.triggerConnect(this);
                this.isConnected = true;
                this.isSubscribedToWriteEvent = true;
            }
        }
    }

    /**
     * Clears the in-flight envelope after a transmission error. Must be called with the queue
     * lock held.
     *
     * <p>An envelope that carries a buffer is removed from the queue and its buffer is recycled,
     * because a recycled buffer must not be sent again. An envelope without a buffer stays queued;
     * the next {@link #write()} hands it to the serializer again (NOTE(review): assumes
     * {@code setTransferEnvelope} restarts serialization from the beginning - confirm).
     */
    private void discardCurrentEnvelope() {
        final TransferEnvelope inFlight = this.currentEnvelope;
        this.currentEnvelope = null;
        if (inFlight == null || inFlight.getBuffer() == null) {
            return;
        }
        // If the envelope is no longer the head, it was dropped (and recycled) concurrently.
        if (this.queuedEnvelopes.peek() == inFlight) {
            this.queuedEnvelopes.poll();
            inFlight.getBuffer().recycleBuffer();
        }
    }

    /** Builds the subject of a transmission-error log message for the given channel. */
    private String describeConnection(SocketChannel socketChannel) {
        if (socketChannel == null) {
            return "The connection to " + this.remoteReceiver;
        }
        return "The connection between " + socketChannel.socket().getLocalAddress() + " and "
            + socketChannel.socket().getRemoteSocketAddress();
    }

    /** Recycles the buffer of the given envelope if it has one. */
    private static void recycleBufferOf(TransferEnvelope envelope) {
        if (envelope.getBuffer() != null) {
            envelope.getBuffer().recycleBuffer();
        }
    }

    /**
     * Checks whether another connection attempt may be made. Failures that occur within
     * {@link #RETRY_INTERVAL} of the last counted one do not consume a retry.
     *
     * @param now the current time in milliseconds
     * @return {@code true} if another attempt is allowed
     */
    private boolean hasRetriesLeft(long now) {
        if (now - this.timestampOfLastRetry < RETRY_INTERVAL) {
            return true;
        }
        this.retriesLeft--;
        this.timestampOfLastRetry = now;
        // "> 0" instead of "!= 0": a budget of zero must not turn into endless retries.
        return this.retriesLeft > 0;
    }

    /**
     * Writes data of the current envelope to the channel of the registered selection key,
     * fetching the next envelope from the queue when none is in flight.
     *
     * @return {@code false} if there was nothing to write, {@code true} otherwise
     * @throws IOException if the serializer fails to write to the channel
     */
    public boolean write() throws IOException {
        final WritableByteChannel channel = (WritableByteChannel) this.selectionKey.channel();

        if (this.currentEnvelope == null) {
            synchronized (this.queuedEnvelopes) {
                if (this.queuedEnvelopes.isEmpty()) {
                    return false;
                }
                // Only peek: the envelope stays queued until it has been written completely.
                this.currentEnvelope = this.queuedEnvelopes.peek();
                this.serializer.setTransferEnvelope(this.currentEnvelope);
            }
        }

        // NOTE(review): a true result presumably means the envelope is not fully written yet.
        if (this.serializer.write(channel)) {
            return true;
        }

        final TransferEnvelope finished = this.currentEnvelope;
        final boolean stillQueued;
        synchronized (this.queuedEnvelopes) {
            // dropAllQueuedEnvelopesFromChannel may have removed (and recycled) the envelope
            // in the meantime; in that case neither poll another envelope nor recycle again.
            stillQueued = this.queuedEnvelopes.peek() == finished;
            if (stillQueued) {
                this.queuedEnvelopes.poll();
            }
            this.currentEnvelope = null;
        }
        if (stillQueued) {
            recycleBufferOf(finished);
        }
        return true;
    }

    /**
     * Unsubscribes from write events if the queue is empty and a subscription is active.
     *
     * @throws IOException declared for callers; not thrown by the visible code
     */
    public void requestClose() throws IOException {
        synchronized (this.queuedEnvelopes) {
            if (this.queuedEnvelopes.isEmpty() && this.isSubscribedToWriteEvent) {
                this.connectionThread.unsubscribeFromWriteEvent(this.selectionKey);
                this.isSubscribedToWriteEvent = false;
            }
        }
    }

    /**
     * Closes the channel and marks the connection as disconnected, provided no envelopes are
     * queued. Does nothing while envelopes are still waiting.
     *
     * @throws IOException if closing the channel fails; the connection state is reset regardless
     */
    public void closeConnection() throws IOException {
        synchronized (this.queuedEnvelopes) {
            if (!this.queuedEnvelopes.isEmpty()) {
                return;
            }
            final SelectionKey key = this.selectionKey;
            try {
                if (key != null) {
                    ((SocketChannel) key.channel()).close();
                }
            } finally {
                // Keep the state consistent even if close() throws.
                if (key != null) {
                    key.cancel();
                    this.selectionKey = null;
                }
                this.isConnected = false;
                this.isSubscribedToWriteEvent = false;
            }
        }
    }

    /**
     * Counts the queued envelopes whose source equals the given channel ID.
     *
     * @param channelID the source channel to look for
     * @return the number of matching envelopes in the queue
     */
    public int getNumberOfQueuedEnvelopesFromChannel(ChannelID channelID) {
        synchronized (this.queuedEnvelopes) {
            int count = 0;
            for (TransferEnvelope envelope : this.queuedEnvelopes) {
                if (channelID.equals(envelope.getSource())) {
                    count++;
                }
            }
            return count;
        }
    }

    /**
     * Removes every queued envelope whose source equals the given channel ID and recycles its
     * buffer. This may include the envelope that is currently being written.
     *
     * @param channelID the source channel whose envelopes are dropped
     */
    public void dropAllQueuedEnvelopesFromChannel(ChannelID channelID) {
        synchronized (this.queuedEnvelopes) {
            final Iterator<TransferEnvelope> it = this.queuedEnvelopes.iterator();
            while (it.hasNext()) {
                final TransferEnvelope envelope = it.next();
                if (channelID.equals(envelope.getSource())) {
                    it.remove();
                    recycleBufferOf(envelope);
                }
            }
        }
    }

    /**
     * Checks whether this connection is idle.
     *
     * @return {@code true} if it is not connected, has no envelope in flight and an empty queue
     */
    public boolean canBeRemoved() {
        synchronized (this.queuedEnvelopes) {
            return !this.isConnected && this.currentEnvelope == null && this.queuedEnvelopes.isEmpty();
        }
    }

    /**
     * Sets the selection key whose channel is used for writing.
     *
     * @param selectionKey the key to use
     */
    public void setSelectionKey(SelectionKey selectionKey) {
        this.selectionKey = selectionKey;
    }

    /**
     * Counts the queued envelopes that carry a buffer.
     *
     * @return the number of queued envelopes with a non-null buffer
     */
    public int getNumberOfQueuedWriteBuffers() {
        int count = 0;
        synchronized (this.queuedEnvelopes) {
            for (TransferEnvelope envelope : this.queuedEnvelopes) {
                if (envelope.getBuffer() != null) {
                    count++;
                }
            }
        }
        return count;
    }
}
