package eu.stratosphere.nephele.taskmanager.runtime;

import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.taskmanager.bytebuffered.AbstractOutputChannelForwarder;
import eu.stratosphere.nephele.taskmanager.bytebuffered.UnexpectedEnvelopeEvent;
import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/taskmanager/runtime/ForwardingBarrier.class */
/**
 * Forwarder stage that filters outgoing transfer envelopes by their sequence number.
 * <p>
 * An envelope whose sequence number is smaller than the current barrier is recycled instead of being passed on;
 * every other envelope is handed to the next forwarder in the chain. The barrier starts at {@code -1} and is only
 * ever raised: this happens when an {@link UnexpectedEnvelopeEvent} reports an expected sequence number larger
 * than the current barrier value.
 */
public final class ForwardingBarrier extends AbstractOutputChannelForwarder {

    private static final Log LOG = LogFactory.getLog(ForwardingBarrier.class);

    /** ID of the output channel this barrier belongs to; only used in the debug log message. */
    private final ChannelID outputChannelID;

    /** Envelopes with a sequence number strictly below this value are recycled rather than forwarded. */
    private int forwardingBarrier = -1;

    /**
     * Creates a forwarding barrier in front of the given forwarder.
     *
     * @param channelID
     *        the ID of the output channel, reported in log output
     * @param abstractOutputChannelForwarder
     *        the next forwarder in the chain, must not be <code>null</code>
     * @throws IllegalArgumentException
     *         thrown if the next forwarder is <code>null</code>
     */
    public ForwardingBarrier(ChannelID channelID, AbstractOutputChannelForwarder abstractOutputChannelForwarder) {
        super(abstractOutputChannelForwarder);

        // The super constructor call has to be the first statement, so validation can only happen afterwards.
        if (null == abstractOutputChannelForwarder) {
            throw new IllegalArgumentException("Argument next must not be null");
        }

        this.outputChannelID = channelID;
    }

    /**
     * Forwards the envelope to the next forwarder unless its sequence number lies below the barrier, in which
     * case the envelope is recycled.
     *
     * @param transferEnvelope
     *        the envelope to forward or recycle
     * @throws IOException
     *         propagated from the next forwarder's <code>push</code>
     * @throws InterruptedException
     *         propagated from the next forwarder's <code>push</code>
     */
    @Override
    public void push(TransferEnvelope transferEnvelope) throws IOException, InterruptedException {
        final boolean belowBarrier = transferEnvelope.getSequenceNumber() < this.forwardingBarrier;

        if (!belowBarrier) {
            getNext().push(transferEnvelope);
            return;
        }

        recycleTransferEnvelope(transferEnvelope);
    }

    /**
     * Inspects the event for a new barrier position and then passes it on to the next forwarder. The event is
     * always passed on, whether or not it changed the barrier.
     *
     * @param abstractEvent
     *        the event to process
     */
    @Override
    public void processEvent(AbstractEvent abstractEvent) {
        if (abstractEvent instanceof UnexpectedEnvelopeEvent) {
            raiseBarrierIfHigher(((UnexpectedEnvelopeEvent) abstractEvent).getExpectedSequenceNumber());
        }

        getNext().processEvent(abstractEvent);
    }

    /**
     * Moves the barrier to the given sequence number if that number is larger than the current barrier; smaller
     * or equal values are ignored, so the barrier never moves backwards.
     *
     * @param expectedSequenceNumber
     *        the expected sequence number reported by an {@link UnexpectedEnvelopeEvent}
     */
    private void raiseBarrierIfHigher(int expectedSequenceNumber) {
        if (expectedSequenceNumber <= this.forwardingBarrier) {
            return;
        }

        this.forwardingBarrier = expectedSequenceNumber;

        if (LOG.isDebugEnabled()) {
            LOG.debug("Setting forwarding barrier to sequence number " + this.forwardingBarrier + " for output channel " + this.outputChannelID);
        }
    }
}
