package eu.stratosphere.nephele.io.channels.bytebuffered;

import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.io.InputChannelResult;
import eu.stratosphere.nephele.io.InputGate;
import eu.stratosphere.nephele.io.RecordDeserializer;
import eu.stratosphere.nephele.io.channels.AbstractInputChannel;
import eu.stratosphere.nephele.io.channels.Buffer;
import eu.stratosphere.nephele.io.channels.ChannelID;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedInputChannel.class */
/**
 * Base class for input channels that pull {@link Buffer}s and events from a
 * {@link ByteBufferedInputChannelBroker} and turn the buffer contents into records with a
 * {@link RecordDeserializer}.
 *
 * <p>The broker must be supplied through {@link #setInputChannelBroker(ByteBufferedInputChannelBroker)}
 * before {@link #readRecord(IOReadableWritable)}, {@link #close()} or
 * {@link #transferEvent(AbstractEvent)} is called; those methods dereference it unconditionally.
 *
 * <p>The fields holding the reported error and the "broker agreed to close" flag are declared
 * {@code volatile}; all other state is accessed without synchronization.
 * NOTE(review): presumably records are read by a single task thread while errors are reported from
 * another thread -- confirm against the callers.
 *
 * @param <T> the type of record delivered by this channel
 */
public abstract class AbstractByteBufferedInputChannel<T extends IOReadableWritable> extends AbstractInputChannel<T> {
    private static final Log LOG = LogFactory.getLog(AbstractByteBufferedInputChannel.class);

    /** Time in milliseconds {@link #close()} waits before asking the broker again when nothing is queued. */
    private static final long CLOSE_POLL_INTERVAL_MILLIS = 200L;

    /** Turns the content of the current buffer into records; may carry a partial record across buffers. */
    private final RecordDeserializer<T> deserializer;

    /** Buffer currently being deserialized, or {@code null} if the next one must be fetched from the broker. */
    private Buffer dataBuffer;

    /** Source of buffers and events; also used to send events to the output channel. */
    private ByteBufferedInputChannelBroker inputChannelBroker;

    /** Most recently received task event that has not yet been fetched via {@link #getCurrentEvent()}. */
    private AbstractTaskEvent currentEvent;

    /** Error reported through {@link #reportIOException(IOException)}, or {@code null} if there is none. */
    private volatile IOException ioException;

    /** Sum of {@code Buffer.size()} over all buffers recycled by this channel so far. */
    private long amountOfDataTransmitted;

    /** Set once a close event was received or all resources were released. */
    private volatile boolean brokerAggreedToCloseChannel;

    /**
     * Creates the channel and forwards the gate, index and channel IDs to the super class.
     *
     * @param inputGate the input gate this channel belongs to
     * @param i forwarded to the super constructor (presumably the channel's index within the gate -- confirm)
     * @param recordDeserializer the deserializer used to construct records from buffers
     * @param channelID first channel ID forwarded to the super constructor
     * @param channelID2 second channel ID forwarded to the super constructor
     */
    public AbstractByteBufferedInputChannel(InputGate<T> inputGate, int i, RecordDeserializer<T> recordDeserializer, ChannelID channelID, ChannelID channelID2) {
        super(inputGate, i, channelID, channelID2);
        this.deserializer = recordDeserializer;
    }

    /**
     * Reads the next record or event from this channel.
     *
     * <p>If no buffer is currently being consumed, the next buffer or event is requested from the broker.
     * Events are translated into the corresponding {@link InputChannelResult}; a task event is stored and
     * can be obtained with {@link #getCurrentEvent()}. Buffers are handed to the deserializer; a fully
     * consumed buffer is recycled immediately.
     *
     * @param t the record instance passed to the deserializer
     * @return {@code END_OF_STREAM}, {@code END_OF_SUPERSTEP} or {@code TASK_EVENT} for the respective
     *         events; {@code NONE} if the deserializer returned {@code null} or the event was unknown;
     *         otherwise {@code INTERMEDIATE_RECORD_FROM_BUFFER} while the buffer has bytes remaining and
     *         {@code LAST_RECORD_FROM_BUFFER} when it has been consumed completely
     * @throws IOException if an error was reported for this channel, or an event arrived while the
     *         deserializer still holds unfinished data
     * @throws IllegalStateException if the broker has neither a buffer nor an event available
     */
    @Override // eu.stratosphere.nephele.io.channels.AbstractInputChannel
    public InputChannelResult readRecord(T t) throws IOException {
        if (this.dataBuffer == null) {
            // isClosed() also surfaces an error reported via reportIOException().
            if (isClosed()) {
                return InputChannelResult.END_OF_STREAM;
            }
            final BufferOrEvent next = this.inputChannelBroker.getNextBufferOrEvent();
            if (next == null) {
                throw new IllegalStateException("Input channel was queries for data even though none was announced available.");
            }
            if (next.isEvent()) {
                // An event must not interrupt a record that is spread over several buffers.
                if (this.deserializer.hasUnfinishedData()) {
                    throw new IOException("Channel received an event before completing the current partial record.");
                }
                final AbstractEvent event = next.getEvent();
                if (event.getClass() == ByteBufferedChannelCloseEvent.class) {
                    this.brokerAggreedToCloseChannel = true;
                    return InputChannelResult.END_OF_STREAM;
                }
                if (event.getClass() == EndOfSuperstepEvent.class) {
                    return InputChannelResult.END_OF_SUPERSTEP;
                }
                if (event instanceof AbstractTaskEvent) {
                    this.currentEvent = (AbstractTaskEvent) event;
                    return InputChannelResult.TASK_EVENT;
                }
                LOG.error("Received unknown event: " + event);
                return InputChannelResult.NONE;
            }
            this.dataBuffer = next.getBuffer();
        }

        final T record = this.deserializer.readData(t, this.dataBuffer);
        if (this.dataBuffer.remaining() != 0) {
            return record == null ? InputChannelResult.NONE : InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;
        }
        // The buffer is exhausted: recycle it so that the next call fetches a new one.
        releasedConsumedReadBuffer(this.dataBuffer);
        this.dataBuffer = null;
        return record == null ? InputChannelResult.NONE : InputChannelResult.LAST_RECORD_FROM_BUFFER;
    }

    /**
     * Checks whether the channel is closed.
     *
     * @return {@code true} once a close event was received or {@link #releaseAllResources()} was called
     * @throws IOException if an error was reported through {@link #reportIOException(IOException)};
     *         the reported exception is attached as the cause
     */
    @Override // eu.stratosphere.nephele.io.channels.AbstractChannel
    public boolean isClosed() throws IOException {
        throwIfErrorReported();
        return this.brokerAggreedToCloseChannel;
    }

    /**
     * Closes the channel.
     *
     * <p>Clears the deserializer, recycles the buffer currently being consumed and then discards
     * everything the broker still delivers (buffers are recycled, events other than the close event are
     * dropped) until the close event has been seen. Finally a {@link ByteBufferedChannelCloseEvent} is
     * sent through {@link #transferEvent(AbstractEvent)}.
     *
     * @throws IOException if an error was reported for this channel while waiting for the close event,
     *         or the broker fails
     * @throws InterruptedException if the thread is interrupted while waiting for the close event
     */
    @Override // eu.stratosphere.nephele.io.channels.AbstractInputChannel
    public void close() throws IOException, InterruptedException {
        this.deserializer.clear();
        if (this.dataBuffer != null) {
            releasedConsumedReadBuffer(this.dataBuffer);
            this.dataBuffer = null;
        }

        while (!this.brokerAggreedToCloseChannel) {
            final BufferOrEvent next = this.inputChannelBroker.getNextBufferOrEvent();
            if (next == null) {
                // Nothing queued right now. Fail with the reported error (the same one isClosed()
                // would raise) instead of polling for a close event that may never arrive.
                throwIfErrorReported();
                Thread.sleep(CLOSE_POLL_INTERVAL_MILLIS);
            } else if (!next.isEvent()) {
                releasedConsumedReadBuffer(next.getBuffer());
            } else if (next.getEvent() instanceof ByteBufferedChannelCloseEvent) {
                this.brokerAggreedToCloseChannel = true;
            }
        }

        transferEvent(new ByteBufferedChannelCloseEvent());
    }

    /**
     * Throws an {@link IOException} wrapping the error reported for this channel, if there is one.
     * The volatile field is read exactly once so that the null check and the use see the same value.
     */
    private void throwIfErrorReported() throws IOException {
        final IOException reported = this.ioException;
        if (reported != null) {
            throw new IOException("An error occurred in the channel: " + reported.getMessage(), reported);
        }
    }

    /** Adds the buffer's size to the transmitted-data counter and recycles the buffer. */
    private void releasedConsumedReadBuffer(Buffer buffer) {
        this.amountOfDataTransmitted += buffer.size();
        buffer.recycleBuffer();
    }

    /**
     * Sets the broker this channel obtains its buffers and events from.
     *
     * @param byteBufferedInputChannelBroker the broker to use
     */
    public void setInputChannelBroker(ByteBufferedInputChannelBroker byteBufferedInputChannelBroker) {
        this.inputChannelBroker = byteBufferedInputChannelBroker;
    }

    /** Notifies the input gate that a record is available on this channel. */
    public void notifyGateThatInputIsAvailable() {
        getInputGate().notifyRecordIsAvailable(getChannelIndex());
    }

    /**
     * Hands the given event to the broker for transfer to the output channel.
     *
     * @param abstractEvent the event to transfer
     * @throws IOException if the broker fails to transfer the event
     * @throws InterruptedException if the broker is interrupted while transferring the event
     */
    @Override // eu.stratosphere.nephele.io.channels.AbstractChannel
    public void transferEvent(AbstractEvent abstractEvent) throws IOException, InterruptedException {
        this.inputChannelBroker.transferEventToOutputChannel(abstractEvent);
    }

    /**
     * Records an error for this channel. It is rethrown, wrapped in a new {@link IOException}, by
     * {@link #isClosed()} and therefore also by {@link #readRecord(IOReadableWritable)}.
     *
     * @param iOException the error to report
     */
    public void reportIOException(IOException iOException) {
        this.ioException = iOException;
    }

    /**
     * Marks the channel as closed and clears the deserializer.
     *
     * <p>NOTE(review): a buffer currently held in {@code dataBuffer} is not recycled here --
     * confirm that it is released elsewhere.
     */
    @Override // eu.stratosphere.nephele.io.channels.AbstractChannel
    public void releaseAllResources() {
        this.brokerAggreedToCloseChannel = true;
        this.deserializer.clear();
    }

    /**
     * Returns the accumulated size of all buffers this channel has recycled so far.
     *
     * @return the sum of {@code Buffer.size()} over all recycled buffers
     */
    @Override // eu.stratosphere.nephele.io.channels.AbstractChannel
    public long getAmountOfDataTransmitted() {
        return this.amountOfDataTransmitted;
    }

    /** Notifies the input gate that a data unit of this channel has been consumed. */
    public void notifyDataUnitConsumed() {
        getInputGate().notifyDataUnitConsumed(getChannelIndex());
    }

    /**
     * Returns the task event stored by the last {@code TASK_EVENT} result and clears it, so that a
     * second call returns {@code null} until another task event has been received.
     *
     * @return the pending task event, or {@code null} if there is none
     */
    @Override // eu.stratosphere.nephele.io.channels.AbstractInputChannel
    public AbstractTaskEvent getCurrentEvent() {
        final AbstractTaskEvent pending = this.currentEvent;
        this.currentEvent = null;
        return pending;
    }
}
