package eu.stratosphere.nephele.taskmanager.runtime;

import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.io.channels.Buffer;
import eu.stratosphere.nephele.io.channels.bytebuffered.AbstractByteBufferedOutputChannel;
import eu.stratosphere.nephele.io.channels.bytebuffered.ByteBufferedChannelCloseEvent;
import eu.stratosphere.nephele.io.channels.bytebuffered.ByteBufferedOutputChannelBroker;
import eu.stratosphere.nephele.taskmanager.bytebuffered.AbstractOutputChannelForwarder;
import eu.stratosphere.nephele.taskmanager.bytebuffered.OutputChannelForwardingChain;
import eu.stratosphere.nephele.taskmanager.bytebuffered.ReceiverNotFoundEvent;
import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
import java.io.IOException;

/* loaded from: input_file:eu/stratosphere/nephele/taskmanager/runtime/RuntimeOutputChannelBroker.class */
/**
 * Broker that sits between a byte-buffered output channel and an
 * {@link OutputChannelForwardingChain}.
 * <p>
 * It hands out empty write buffers to the channel, wraps filled buffers and
 * outgoing events into {@link TransferEnvelope}s carrying a consecutively
 * increasing sequence number, and pushes those envelopes into the forwarding
 * chain. It also takes part in the chain as a forwarder itself and records
 * close acknowledgments and receiver-not-found notifications that pass through.
 */
final class RuntimeOutputChannelBroker extends AbstractOutputChannelForwarder implements ByteBufferedOutputChannelBroker {

    /** The output channel this broker has registered itself with. */
    private final AbstractByteBufferedOutputChannel<?> byteBufferedOutputChannel;

    /** Source of empty write buffers and of the maximum buffer size. */
    private final RuntimeOutputGateContext outputGateContext;

    /** Chain that receives finished envelopes; must be set via {@link #setForwardingChain}. */
    private OutputChannelForwardingChain forwardingChain;

    /** Envelope created for the buffer currently handed out, or {@code null} if there is none. */
    private TransferEnvelope outgoingTransferEnvelope = null;

    /** Set once a {@link ByteBufferedChannelCloseEvent} has been processed. */
    private boolean closeAcknowledgmentReceived = false;

    /** Sequence number carried by the most recent {@link ReceiverNotFoundEvent}; -1 until one arrives. */
    private int lastSequenceNumberWithReceiverNotFound = -1;

    /** Sequence number that will be assigned to the next envelope. */
    private int sequenceNumber = 0;

    /**
     * Creates the broker and registers it as the broker of the given output channel.
     *
     * @param runtimeOutputGateContext
     *        context used to request empty buffers and to query the maximum buffer size
     * @param abstractByteBufferedOutputChannel
     *        the output channel served by this broker
     * @param abstractOutputChannelForwarder
     *        the next forwarder in the chain, handed to the super constructor
     * @throws IllegalArgumentException
     *         if {@code abstractOutputChannelForwarder} is {@code null}
     */
    public RuntimeOutputChannelBroker(RuntimeOutputGateContext runtimeOutputGateContext, AbstractByteBufferedOutputChannel<?> abstractByteBufferedOutputChannel, AbstractOutputChannelForwarder abstractOutputChannelForwarder) {
        super(abstractOutputChannelForwarder);

        // The check cannot precede the mandatory super(...) call.
        if (abstractOutputChannelForwarder == null) {
            throw new IllegalArgumentException("Argument next must not be null");
        }

        this.outputGateContext = runtimeOutputGateContext;
        this.byteBufferedOutputChannel = abstractByteBufferedOutputChannel;
        this.byteBufferedOutputChannel.setByteBufferedOutputChannelBroker(this);
    }

    /**
     * Sets the forwarding chain into which this broker pushes its envelopes.
     *
     * @param outputChannelForwardingChain
     *        the chain to use from now on
     */
    public void setForwardingChain(OutputChannelForwardingChain outputChannelForwardingChain) {
        this.forwardingChain = outputChannelForwardingChain;
    }

    /**
     * Reports whether this forwarder, or one further down the chain, still has data left.
     * <p>
     * As long as no close acknowledgment has been processed and the last
     * receiver-not-found notification does not refer to the most recently assigned
     * sequence number, this method answers {@code true} itself. Otherwise the
     * question is passed on to the next forwarder.
     *
     * @return {@code true} if data is considered to be left, otherwise the answer of the next forwarder
     */
    @Override
    public boolean hasDataLeft() throws IOException, InterruptedException {
        final boolean receiverMissingForLatestEnvelope =
            this.lastSequenceNumberWithReceiverNotFound + 1 == this.sequenceNumber;

        if (!this.closeAcknowledgmentReceived && !receiverMissingForLatestEnvelope) {
            return true;
        }

        return getNext().hasDataLeft();
    }

    /**
     * Updates the broker's bookkeeping for the given event and then passes the
     * event on to the next forwarder.
     *
     * @param abstractEvent
     *        the event to process
     * @throws IllegalStateException
     *         if the event is an {@link AbstractTaskEvent} that is neither a close
     *         event nor a receiver-not-found event
     */
    @Override
    public void processEvent(AbstractEvent abstractEvent) {
        recordChannelState(abstractEvent);
        getNext().processEvent(abstractEvent);
    }

    /**
     * Inspects an event and records what it means for this channel. The order of
     * the type checks is significant and mirrors the event precedence: close
     * acknowledgment first, receiver-not-found second, any other task event last.
     */
    private void recordChannelState(AbstractEvent event) {
        if (event instanceof ByteBufferedChannelCloseEvent) {
            this.closeAcknowledgmentReceived = true;
            return;
        }

        if (event instanceof ReceiverNotFoundEvent) {
            final ReceiverNotFoundEvent receiverNotFound = (ReceiverNotFoundEvent) event;
            this.lastSequenceNumberWithReceiverNotFound = receiverNotFound.getSequenceNumber();
            return;
        }

        if (event instanceof AbstractTaskEvent) {
            throw new IllegalStateException("Received synchronous task event " + event);
        }
    }

    /**
     * Provides an empty write buffer of the maximum buffer size reported by the
     * output gate context. If no outgoing envelope is pending, a new one is
     * created first, which consumes a sequence number.
     *
     * @return the buffer obtained from the output gate context
     */
    @Override
    public Buffer requestEmptyWriteBuffer() throws InterruptedException, IOException {
        if (this.outgoingTransferEnvelope == null) {
            this.outgoingTransferEnvelope = createNewOutgoingTransferEnvelope();
        }

        final int bufferSize = this.outputGateContext.getMaximumBufferSize();
        return this.outputGateContext.requestEmptyBufferBlocking(bufferSize);
    }

    /**
     * Builds an envelope for this channel stamped with the current sequence number
     * and advances the counter.
     */
    private TransferEnvelope createNewOutgoingTransferEnvelope() {
        // Post-increment as the first argument: the counter advances before the
        // remaining arguments are evaluated.
        return new TransferEnvelope(this.sequenceNumber++, this.byteBufferedOutputChannel.getJobID(),
            this.byteBufferedOutputChannel.getID());
    }

    /**
     * Accepts a filled write buffer, attaches it to the pending envelope and pushes
     * that envelope into the forwarding chain. Queued events of the chain are
     * processed before anything else happens.
     *
     * @param buffer
     *        the filled buffer; it is flipped before being attached
     * @throws IllegalStateException
     *         if there is no pending envelope or the pending envelope already has a buffer
     */
    @Override
    public void releaseWriteBuffer(Buffer buffer) throws IOException, InterruptedException {
        this.forwardingChain.processQueuedEvents();

        final TransferEnvelope pendingEnvelope = this.outgoingTransferEnvelope;
        if (pendingEnvelope == null) {
            throw new IllegalStateException("Cannot find transfer envelope for channel with ID " + this.byteBufferedOutputChannel.getID());
        }
        if (pendingEnvelope.getBuffer() != null) {
            throw new IllegalStateException("Channel " + this.byteBufferedOutputChannel.getID() + " has already a buffer attached");
        }

        buffer.flip();
        pendingEnvelope.setBuffer(buffer);
        this.forwardingChain.pushEnvelope(pendingEnvelope);

        // The envelope has been handed over; the next buffer request creates a new one.
        this.outgoingTransferEnvelope = null;
    }

    /**
     * Processes the chain's queued events and then asks the chain whether any
     * forwarder still has data left.
     *
     * @return the answer of {@link OutputChannelForwardingChain#anyForwarderHasDataLeft()}
     */
    @Override
    public boolean hasDataLeftToTransmit() throws IOException, InterruptedException {
        final OutputChannelForwardingChain chain = this.forwardingChain;
        chain.processQueuedEvents();
        return chain.anyForwarderHasDataLeft();
    }

    /**
     * Sends an event towards the input channel. If an envelope is pending, the
     * event is added to it and travels with the buffer once that is released.
     * Otherwise a fresh envelope holding only this event is created and pushed
     * into the forwarding chain right away.
     *
     * @param abstractEvent
     *        the event to transfer
     */
    @Override
    public void transferEventToInputChannel(AbstractEvent abstractEvent) throws IOException, InterruptedException {
        if (this.outgoingTransferEnvelope == null) {
            final TransferEnvelope eventOnlyEnvelope = createNewOutgoingTransferEnvelope();
            eventOnlyEnvelope.addEvent(abstractEvent);
            this.forwardingChain.pushEnvelope(eventOnlyEnvelope);
            return;
        }

        this.outgoingTransferEnvelope.addEvent(abstractEvent);
    }
}
