package eu.stratosphere.nephele.taskmanager.runtime;

import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.io.channels.Buffer;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.io.channels.bytebuffered.AbstractByteBufferedInputChannel;
import eu.stratosphere.nephele.io.channels.bytebuffered.BufferOrEvent;
import eu.stratosphere.nephele.io.channels.bytebuffered.ByteBufferedInputChannelBroker;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferAvailabilityListener;
import eu.stratosphere.nephele.taskmanager.bytebuffered.InputChannelContext;
import eu.stratosphere.nephele.taskmanager.bytebuffered.ReceiverNotFoundEvent;
import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelopeDispatcher;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/taskmanager/runtime/RuntimeInputChannelContext.class */
/**
 * Channel context for a byte-buffered input channel.
 * <p>
 * Incoming {@link TransferEnvelope}s are appended to an internal queue by
 * {@link #queueTransferEnvelope(TransferEnvelope)} and handed out one buffer or one event at a time by
 * {@link #getNextBufferOrEvent()}. Buffer requests and related queries are delegated to the
 * {@link RuntimeInputGateContext} supplied at construction time.
 * <p>
 * The envelope queue is used as the monitor for the queue itself, for the last accepted sequence number and
 * for the destroyed flag. The pending-event iterator is also read and cleared outside that monitor.
 * NOTE(review): presumably it is only touched by a single consuming thread - confirm.
 */
final class RuntimeInputChannelContext implements InputChannelContext, ByteBufferedInputChannelBroker {

    private static final Log LOG = LogFactory.getLog(RuntimeInputChannelContext.class);

    /** Gate context that all buffer-provider calls are forwarded to. */
    private final RuntimeInputGateContext inputGateContext;

    /** The wrapped input channel; this context registers itself as the channel's broker. */
    private final AbstractByteBufferedInputChannel<?> byteBufferedInputChannel;

    /** Dispatcher that receives envelopes created by {@link #transferEventToOutputChannel(AbstractEvent)}. */
    private final TransferEnvelopeDispatcher transferEnvelopeDispatcher;

    /** Events of the most recently dequeued envelope that still have to be handed out; {@code null} if none. */
    private Iterator<AbstractEvent> pendingEvents;

    /** Accepted envelopes waiting to be consumed; also serves as the lock object. */
    private final Queue<TransferEnvelope> queuedEnvelopes = new ArrayDeque<TransferEnvelope>();

    /** Sequence number of the last accepted envelope; {@code -1} until the first one is accepted. */
    private int lastReceivedEnvelope = -1;

    /** Set by {@link #destroy()}; afterwards incoming envelopes are dropped instead of queued. */
    private boolean destroyCalled = false;

    /**
     * Creates the context and registers it as the input channel broker of the given channel.
     * <p>
     * NOTE(review): a decompiler note on the original indicates this constructor was package-private before
     * decompilation; visibility is left unchanged here.
     *
     * @param runtimeInputGateContext
     *        the gate context buffer requests are delegated to
     * @param transferEnvelopeDispatcher
     *        the dispatcher used to send events created by this channel
     * @param abstractByteBufferedInputChannel
     *        the input channel this context belongs to
     */
    public RuntimeInputChannelContext(RuntimeInputGateContext runtimeInputGateContext, TransferEnvelopeDispatcher transferEnvelopeDispatcher, AbstractByteBufferedInputChannel<?> abstractByteBufferedInputChannel) {
        this.inputGateContext = runtimeInputGateContext;
        this.transferEnvelopeDispatcher = transferEnvelopeDispatcher;
        this.byteBufferedInputChannel = abstractByteBufferedInputChannel;
        this.byteBufferedInputChannel.setInputChannelBroker(this);
    }

    /**
     * Returns the next buffer or event available on this channel.
     * <p>
     * Events left over from a previously dequeued envelope are returned first, one per call. Otherwise the next
     * envelope is dequeued: its buffer (if any) is returned and its events are remembered for subsequent calls;
     * if it has no buffer, its first event is returned.
     *
     * @return the next buffer or event, or {@code null} if no events are pending and no envelope is queued
     * @throws IOException
     *         if the dequeued envelope carries neither a buffer nor at least one event
     */
    @Override // eu.stratosphere.nephele.io.channels.bytebuffered.ByteBufferedInputChannelBroker
    public BufferOrEvent getNextBufferOrEvent() throws IOException {
        if (this.pendingEvents != null) {
            return takeNextPendingEvent();
        }

        synchronized (this.queuedEnvelopes) {
            // The queue never holds null elements, so a null result means "empty".
            final TransferEnvelope envelope = this.queuedEnvelopes.poll();
            if (envelope == null) {
                return null;
            }

            // Remember the events first: they are delivered after the envelope's buffer.
            if (envelope.getEventList() != null) {
                final Iterator<AbstractEvent> events = envelope.getEventList().iterator();
                if (events.hasNext()) {
                    this.pendingEvents = events;
                }
            }

            if (envelope.getBuffer() != null) {
                return new BufferOrEvent(envelope.getBuffer());
            }
            if (this.pendingEvents == null) {
                throw new IOException("Received an envelope with neither data nor events.");
            }
            return takeNextPendingEvent();
        }
    }

    /**
     * Wraps the next pending event and clears {@link #pendingEvents} once the iterator is exhausted.
     * Must only be called while {@link #pendingEvents} is non-null; a non-null iterator always has a next element.
     *
     * @return the next pending event wrapped in a {@link BufferOrEvent}
     */
    private BufferOrEvent takeNextPendingEvent() {
        final BufferOrEvent next = new BufferOrEvent(this.pendingEvents.next());
        if (!this.pendingEvents.hasNext()) {
            this.pendingEvents = null;
        }
        return next;
    }

    /**
     * Wraps the given event in a new envelope addressed with this channel's job and channel ID and passes it
     * to the transfer envelope dispatcher.
     * <p>
     * NOTE(review): the first constructor argument ({@code 0}) is presumably the envelope's sequence number -
     * confirm against {@code TransferEnvelope}.
     *
     * @param abstractEvent
     *        the event to send
     * @throws IOException
     *         declared by the broker interface; any such exception comes from the dispatcher call
     * @throws InterruptedException
     *         declared by the broker interface; any such exception comes from the dispatcher call
     */
    @Override // eu.stratosphere.nephele.io.channels.bytebuffered.ByteBufferedInputChannelBroker
    public void transferEventToOutputChannel(AbstractEvent abstractEvent) throws IOException, InterruptedException {
        final TransferEnvelope eventEnvelope = new TransferEnvelope(0, getJobID(), getChannelID());
        eventEnvelope.addEvent(abstractEvent);
        this.transferEnvelopeDispatcher.processEnvelopeFromInputChannel(eventEnvelope);
    }

    /**
     * Accepts an incoming envelope for this channel.
     * <ul>
     * <li>Envelopes identified as receiver-not-found events are ignored.</li>
     * <li>After {@link #destroy()} the envelope's buffer is recycled and the envelope is dropped.</li>
     * <li>If the sequence number is not exactly one greater than that of the last accepted envelope, an
     * {@link IOException} is reported to the input channel, the gate is notified once, and the envelope's
     * buffer is recycled.</li>
     * <li>Otherwise the envelope is queued and the gate is notified once for the buffer (if present) and once
     * for every event in the envelope.</li>
     * </ul>
     *
     * @param transferEnvelope
     *        the envelope to queue
     */
    @Override // eu.stratosphere.nephele.taskmanager.bytebuffered.ChannelContext
    public void queueTransferEnvelope(TransferEnvelope transferEnvelope) {
        if (ReceiverNotFoundEvent.isReceiverNotFoundEvent(transferEnvelope)) {
            return;
        }

        final int sequenceNumber = transferEnvelope.getSequenceNumber();

        synchronized (this.queuedEnvelopes) {
            if (this.destroyCalled) {
                recycleBufferOf(transferEnvelope);
                return;
            }

            final int expectedSequenceNumber = this.lastReceivedEnvelope + 1;
            if (sequenceNumber != expectedSequenceNumber) {
                this.byteBufferedInputChannel.reportIOException(new IOException("Expected data packet " + expectedSequenceNumber + " but received " + sequenceNumber));
                // Notify the gate even though nothing was queued, right after the error has been reported.
                this.byteBufferedInputChannel.notifyGateThatInputIsAvailable();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Input channel " + getChannelName() + " expected envelope " + expectedSequenceNumber + " but received " + sequenceNumber);
                }
                recycleBufferOf(transferEnvelope);
                return;
            }

            this.queuedEnvelopes.add(transferEnvelope);
            this.lastReceivedEnvelope = sequenceNumber;

            // One notification per item that getNextBufferOrEvent() will later hand out.
            if (transferEnvelope.getBuffer() != null) {
                this.byteBufferedInputChannel.notifyGateThatInputIsAvailable();
            }
            if (transferEnvelope.getEventList() != null) {
                final int eventCount = transferEnvelope.getEventList().size();
                for (int n = 0; n < eventCount; n++) {
                    this.byteBufferedInputChannel.notifyGateThatInputIsAvailable();
                }
            }
        }
    }

    /**
     * Recycles the buffer attached to the given envelope, if there is one.
     *
     * @param envelope
     *        the envelope whose buffer is to be recycled
     */
    private static void recycleBufferOf(TransferEnvelope envelope) {
        final Buffer buffer = envelope.getBuffer();
        if (buffer != null) {
            buffer.recycleBuffer();
        }
    }

    /**
     * @return the ID of the wrapped input channel
     */
    @Override // eu.stratosphere.nephele.taskmanager.bytebuffered.ChannelContext
    public ChannelID getChannelID() {
        return this.byteBufferedInputChannel.getID();
    }

    /**
     * @return the connected channel ID as reported by the wrapped input channel
     */
    @Override // eu.stratosphere.nephele.taskmanager.bytebuffered.ChannelContext
    public ChannelID getConnectedChannelID() {
        return this.byteBufferedInputChannel.getConnectedChannelID();
    }

    /**
     * @return the job ID as reported by the wrapped input channel
     */
    @Override // eu.stratosphere.nephele.taskmanager.bytebuffered.ChannelContext
    public JobID getJobID() {
        return this.byteBufferedInputChannel.getJobID();
    }

    /**
     * @return the value of {@code isInputChannel()} of the wrapped channel
     */
    @Override // eu.stratosphere.nephele.taskmanager.bytebuffered.ChannelContext
    public boolean isInputChannel() {
        return this.byteBufferedInputChannel.isInputChannel();
    }

    /**
     * Marks this context as destroyed, drains the envelope queue and recycles the buffers of all drained
     * envelopes. The buffers are collected while holding the lock but recycled only after it has been released.
     */
    @Override // eu.stratosphere.nephele.taskmanager.bytebuffered.ChannelContext
    public void destroy() {
        final Queue<Buffer> buffersToRecycle = new ArrayDeque<Buffer>();

        synchronized (this.queuedEnvelopes) {
            this.destroyCalled = true;

            TransferEnvelope envelope;
            while ((envelope = this.queuedEnvelopes.poll()) != null) {
                final Buffer buffer = envelope.getBuffer();
                if (buffer != null) {
                    buffersToRecycle.add(buffer);
                }
            }
        }

        for (final Buffer buffer : buffersToRecycle) {
            buffer.recycleBuffer();
        }
    }

    /**
     * Prints one line to standard output with the channel name, the number of queued envelopes and, in
     * parentheses, how many of their buffers are backed by memory and how many are not.
     * <p>
     * NOTE(review): this writes to {@code System.out} rather than the class logger; kept as is because the
     * callers' expectations about the output destination are not visible here.
     */
    @Override // eu.stratosphere.nephele.taskmanager.bytebuffered.InputChannelContext
    public void logQueuedEnvelopes() {
        int queuedCount = 0;
        int memoryBackedCount = 0;
        int otherBufferCount = 0;

        synchronized (this.queuedEnvelopes) {
            for (final TransferEnvelope envelope : this.queuedEnvelopes) {
                queuedCount++;
                final Buffer buffer = envelope.getBuffer();
                if (buffer == null) {
                    continue;
                }
                if (buffer.isBackedByMemory()) {
                    memoryBackedCount++;
                } else {
                    otherBufferCount++;
                }
            }
        }

        System.out.println("\t\t" + getChannelName() + ": " + queuedCount + " (" + memoryBackedCount + ", " + otherBufferCount + ")");
    }

    /**
     * Delegates to the input gate context.
     *
     * @param i
     *        forwarded unchanged to the gate context (presumably the minimum buffer size - confirm)
     * @return whatever the gate context returns
     * @throws IOException
     *         propagated from the gate context
     */
    @Override // eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider
    public Buffer requestEmptyBuffer(int i) throws IOException {
        return this.inputGateContext.requestEmptyBuffer(i);
    }

    /**
     * Delegates to the input gate context.
     *
     * @param i
     *        forwarded unchanged to the gate context (presumably the minimum buffer size - confirm)
     * @return whatever the gate context returns
     * @throws IOException
     *         propagated from the gate context
     * @throws InterruptedException
     *         propagated from the gate context
     */
    @Override // eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider
    public Buffer requestEmptyBufferBlocking(int i) throws IOException, InterruptedException {
        return this.inputGateContext.requestEmptyBufferBlocking(i);
    }

    /**
     * @return the maximum buffer size as reported by the input gate context
     */
    @Override // eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider
    public int getMaximumBufferSize() {
        return this.inputGateContext.getMaximumBufferSize();
    }

    /**
     * @return the value of {@code isShared()} of the input gate context
     */
    @Override // eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider
    public boolean isShared() {
        return this.inputGateContext.isShared();
    }

    /**
     * Forwards the call to the input gate context.
     */
    @Override // eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider
    public void reportAsynchronousEvent() {
        this.inputGateContext.reportAsynchronousEvent();
    }

    /**
     * @return the channel type as reported by the wrapped input channel
     */
    @Override // eu.stratosphere.nephele.taskmanager.bytebuffered.ChannelContext
    public ChannelType getType() {
        return this.byteBufferedInputChannel.getType();
    }

    /**
     * Builds a human-readable channel name of the form {@code <task name> (<channel index>, <channel ID>)}.
     *
     * @return the name used in log and diagnostic output
     */
    private String getChannelName() {
        return this.inputGateContext.getTaskName() + " (" + this.byteBufferedInputChannel.getChannelIndex() + ", " + this.byteBufferedInputChannel.getID() + ')';
    }

    /**
     * Delegates to the input gate context.
     *
     * @param bufferAvailabilityListener
     *        the listener to register
     * @return whatever the gate context returns
     */
    @Override // eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider
    public boolean registerBufferAvailabilityListener(BufferAvailabilityListener bufferAvailabilityListener) {
        return this.inputGateContext.registerBufferAvailabilityListener(bufferAvailabilityListener);
    }
}
