package eu.stratosphere.nephele.io.channels.bytebuffered;

import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.io.OutputGate;
import eu.stratosphere.nephele.io.channels.AbstractOutputChannel;
import eu.stratosphere.nephele.io.channels.Buffer;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.io.channels.SerializationBuffer;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedOutputChannel.class */
public abstract class AbstractByteBufferedOutputChannel<T extends IOReadableWritable> extends AbstractOutputChannel<T> {
    private final SerializationBuffer<T> serializationBuffer;
    private Buffer dataBuffer;
    private boolean closeRequested;
    private ByteBufferedOutputChannelBroker outputChannelBroker;
    private long amountOfDataTransmitted;
    private static final Log LOG = LogFactory.getLog(AbstractByteBufferedOutputChannel.class);

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractByteBufferedOutputChannel(OutputGate<T> outputGate, int i, ChannelID channelID, ChannelID channelID2) {
        super(outputGate, i, channelID, channelID2);
        this.serializationBuffer = new SerializationBuffer<>();
        this.dataBuffer = null;
        this.closeRequested = false;
        this.outputChannelBroker = null;
        this.amountOfDataTransmitted = 0L;
    }

    @Override // eu.stratosphere.nephele.io.channels.AbstractChannel
    public boolean isClosed() throws IOException, InterruptedException {
        return this.closeRequested && this.dataBuffer == null && !this.serializationBuffer.dataLeftFromPreviousSerialization() && !this.outputChannelBroker.hasDataLeftToTransmit();
    }

    @Override // eu.stratosphere.nephele.io.channels.AbstractOutputChannel
    public void requestClose() throws IOException, InterruptedException {
        if (this.closeRequested) {
            return;
        }
        this.closeRequested = true;
        if (this.serializationBuffer.dataLeftFromPreviousSerialization()) {
            flush();
        }
        if (getType() == ChannelType.INMEMORY || !isBroadcastChannel() || getChannelIndex() == 0) {
            transferEvent(new ByteBufferedChannelCloseEvent());
            flush();
        }
    }

    private void requestWriteBufferFromBroker() throws InterruptedException, IOException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        this.dataBuffer = this.outputChannelBroker.requestEmptyWriteBuffer();
    }

    private void releaseWriteBuffer() throws IOException, InterruptedException {
        this.amountOfDataTransmitted += this.dataBuffer.size();
        this.outputChannelBroker.releaseWriteBuffer(this.dataBuffer);
        this.dataBuffer = null;
    }

    @Override // eu.stratosphere.nephele.io.channels.AbstractOutputChannel
    public void writeRecord(T t) throws IOException, InterruptedException {
        if (this.dataBuffer == null) {
            requestWriteBufferFromBroker();
        }
        if (this.closeRequested) {
            throw new IOException("Channel is aready requested to be closed");
        }
        while (this.serializationBuffer.dataLeftFromPreviousSerialization()) {
            this.serializationBuffer.read(this.dataBuffer);
            if (this.dataBuffer.remaining() == 0) {
                releaseWriteBuffer();
                requestWriteBufferFromBroker();
            }
        }
        if (this.serializationBuffer.dataLeftFromPreviousSerialization()) {
            throw new IOException("Serialization buffer is expected to be empty!");
        }
        this.serializationBuffer.serialize(t);
        this.serializationBuffer.read(this.dataBuffer);
        if (this.dataBuffer.remaining() == 0) {
            releaseWriteBuffer();
        }
    }

    public void setByteBufferedOutputChannelBroker(ByteBufferedOutputChannelBroker byteBufferedOutputChannelBroker) {
        this.outputChannelBroker = byteBufferedOutputChannelBroker;
    }

    public void processEvent(AbstractEvent abstractEvent) {
        if (abstractEvent instanceof AbstractTaskEvent) {
            getOutputGate().deliverEvent((AbstractTaskEvent) abstractEvent);
        } else {
            LOG.error("Channel " + getID() + " received unknown event " + abstractEvent);
        }
    }

    @Override // eu.stratosphere.nephele.io.channels.AbstractChannel
    public void transferEvent(AbstractEvent abstractEvent) throws IOException, InterruptedException {
        flush();
        this.outputChannelBroker.transferEventToInputChannel(abstractEvent);
    }

    @Override // eu.stratosphere.nephele.io.channels.AbstractOutputChannel
    public void flush() throws IOException, InterruptedException {
        while (this.serializationBuffer.dataLeftFromPreviousSerialization()) {
            if (this.dataBuffer == null) {
                try {
                    requestWriteBufferFromBroker();
                } catch (InterruptedException e) {
                    LOG.error(e);
                }
            }
            this.serializationBuffer.read(this.dataBuffer);
            if (this.dataBuffer.remaining() == 0) {
                releaseWriteBuffer();
            }
        }
        if (this.dataBuffer != null) {
            releaseWriteBuffer();
        }
    }

    @Override // eu.stratosphere.nephele.io.channels.AbstractChannel
    public void releaseAllResources() {
        this.closeRequested = true;
        this.serializationBuffer.clear();
        if (this.dataBuffer != null) {
            this.dataBuffer.recycleBuffer();
            this.dataBuffer = null;
        }
    }

    @Override // eu.stratosphere.nephele.io.channels.AbstractChannel
    public long getAmountOfDataTransmitted() {
        return this.amountOfDataTransmitted;
    }
}
