package eu.stratosphere.runtime.io.channels;

import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.runtime.io.Buffer;
import eu.stratosphere.runtime.io.gates.InputChannelResult;
import eu.stratosphere.runtime.io.gates.InputGate;
import eu.stratosphere.runtime.io.network.Envelope;
import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
import eu.stratosphere.runtime.io.serialization.AdaptiveSpanningRecordDeserializer;
import eu.stratosphere.runtime.io.serialization.RecordDeserializer;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/runtime/io/channels/InputChannel.class */
public class InputChannel<T extends IOReadableWritable> extends Channel implements BufferProvider {
    private final InputGate<T> inputGate;
    private static final Log LOG = LogFactory.getLog(InputChannel.class);
    private final RecordDeserializer<T> deserializer;
    private Buffer dataBuffer;
    private AbstractTaskEvent currentEvent;
    private volatile IOException ioException;
    private long amountOfDataTransmitted;
    private volatile boolean brokerAggreedToCloseChannel;
    private int lastReceivedEnvelope;
    private ChannelID lastSourceID;
    private boolean destroyCalled;
    private Queue<Envelope> queuedEnvelopes;
    private Iterator<AbstractEvent> pendingEvents;
    private RecordDeserializer.DeserializationResult lastDeserializationResult;

    public InputChannel(InputGate<T> inputGate, int i, ChannelID channelID, ChannelID channelID2, ChannelType channelType) {
        super(i, channelID, channelID2, channelType);
        this.lastReceivedEnvelope = -1;
        this.lastSourceID = null;
        this.destroyCalled = false;
        this.queuedEnvelopes = new ArrayDeque();
        this.inputGate = inputGate;
        this.deserializer = new AdaptiveSpanningRecordDeserializer();
    }

    public InputGate<T> getInputGate() {
        return this.inputGate;
    }

    @Override // eu.stratosphere.runtime.io.channels.Channel
    public boolean isInputChannel() {
        return true;
    }

    @Override // eu.stratosphere.runtime.io.channels.Channel
    public JobID getJobID() {
        return this.inputGate.getJobID();
    }

    public InputChannelResult readRecord(T t) throws IOException {
        if (this.dataBuffer == null) {
            if (isClosed()) {
                return InputChannelResult.END_OF_STREAM;
            }
            BufferOrEvent nextBufferOrEvent = getNextBufferOrEvent();
            if (nextBufferOrEvent == null) {
                throw new IllegalStateException("Input channel was queries for data even though none was announced available.");
            }
            if (nextBufferOrEvent.isEvent()) {
                if (this.deserializer.hasUnfinishedData()) {
                    throw new IllegalStateException("Channel received an event before completing the current partial record.");
                }
                AbstractEvent event = nextBufferOrEvent.getEvent();
                if (event.getClass() == ChannelCloseEvent.class) {
                    this.brokerAggreedToCloseChannel = true;
                    return InputChannelResult.END_OF_STREAM;
                }
                if (event.getClass() == EndOfSuperstepEvent.class) {
                    return InputChannelResult.END_OF_SUPERSTEP;
                }
                if (event instanceof AbstractTaskEvent) {
                    this.currentEvent = (AbstractTaskEvent) event;
                    return InputChannelResult.TASK_EVENT;
                }
                LOG.error("Received unknown event: " + event);
                return InputChannelResult.NONE;
            }
            this.dataBuffer = nextBufferOrEvent.getBuffer();
            this.deserializer.setNextMemorySegment(this.dataBuffer.getMemorySegment(), this.dataBuffer.size());
        }
        RecordDeserializer.DeserializationResult nextRecord = this.deserializer.getNextRecord(t);
        this.lastDeserializationResult = nextRecord;
        if (nextRecord.isBufferConsumed()) {
            releasedConsumedReadBuffer(this.dataBuffer);
            this.dataBuffer = null;
        }
        if (nextRecord == RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER) {
            return InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;
        }
        if (nextRecord == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER) {
            return InputChannelResult.LAST_RECORD_FROM_BUFFER;
        }
        if (nextRecord == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
            return InputChannelResult.NONE;
        }
        throw new IllegalStateException();
    }

    @Override // eu.stratosphere.runtime.io.channels.Channel
    public ChannelType getChannelType() {
        return null;
    }

    @Override // eu.stratosphere.runtime.io.channels.Channel
    public boolean isClosed() throws IOException {
        if (this.ioException != null) {
            throw new IOException("An error occurred in the channel: " + this.ioException.getMessage(), this.ioException);
        }
        return this.brokerAggreedToCloseChannel;
    }

    public void close() throws IOException, InterruptedException {
        this.deserializer.clear();
        if (this.dataBuffer != null) {
            releasedConsumedReadBuffer(this.dataBuffer);
            this.dataBuffer = null;
        }
        while (!this.brokerAggreedToCloseChannel) {
            BufferOrEvent nextBufferOrEvent = getNextBufferOrEvent();
            if (nextBufferOrEvent == null) {
                Thread.sleep(200L);
            } else if (!nextBufferOrEvent.isEvent()) {
                releasedConsumedReadBuffer(nextBufferOrEvent.getBuffer());
            } else if (nextBufferOrEvent.getEvent() instanceof ChannelCloseEvent) {
                this.brokerAggreedToCloseChannel = true;
            }
        }
        transferEventToOutputChannel(new ChannelCloseEvent());
    }

    private void releasedConsumedReadBuffer(Buffer buffer) {
        this.amountOfDataTransmitted += buffer.size();
        buffer.recycleBuffer();
    }

    public void notifyGateThatInputIsAvailable() {
        getInputGate().notifyRecordIsAvailable(getIndex());
    }

    @Override // eu.stratosphere.runtime.io.channels.Channel
    public void transferEvent(AbstractEvent abstractEvent) throws IOException, InterruptedException {
        transferEventToOutputChannel(abstractEvent);
    }

    public void reportIOException(IOException iOException) {
        this.ioException = iOException;
    }

    @Override // eu.stratosphere.runtime.io.channels.Channel
    public void releaseAllResources() {
        this.brokerAggreedToCloseChannel = true;
        this.deserializer.clear();
    }

    public void notifyDataUnitConsumed() {
        getInputGate().notifyDataUnitConsumed(getIndex());
    }

    public AbstractTaskEvent getCurrentEvent() {
        AbstractTaskEvent abstractTaskEvent = this.currentEvent;
        this.currentEvent = null;
        return abstractTaskEvent;
    }

    @Override // eu.stratosphere.runtime.io.channels.Channel
    public void queueEnvelope(Envelope envelope) {
        int sequenceNumber = envelope.getSequenceNumber();
        synchronized (this.queuedEnvelopes) {
            if (this.destroyCalled) {
                Buffer buffer = envelope.getBuffer();
                if (buffer != null) {
                    buffer.recycleBuffer();
                }
                return;
            }
            int i = this.lastReceivedEnvelope + 1;
            if (sequenceNumber != i) {
                reportIOException(new IOException("Expected data packet " + i + " but received " + sequenceNumber));
                notifyGateThatInputIsAvailable();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Input channel " + toString() + " expected envelope " + i + " but received " + sequenceNumber);
                }
                Buffer buffer2 = envelope.getBuffer();
                if (buffer2 != null) {
                    buffer2.recycleBuffer();
                }
            } else {
                this.queuedEnvelopes.add(envelope);
                this.lastReceivedEnvelope = sequenceNumber;
                this.lastSourceID = envelope.getSource();
                if (envelope.getBuffer() != null) {
                    notifyGateThatInputIsAvailable();
                }
                List<? extends AbstractEvent> deserializeEvents = envelope.deserializeEvents();
                if (deserializeEvents != null) {
                    for (int i2 = 0; i2 < deserializeEvents.size(); i2++) {
                        notifyGateThatInputIsAvailable();
                    }
                }
            }
        }
    }

    @Override // eu.stratosphere.runtime.io.channels.Channel
    public void destroy() {
        ArrayDeque arrayDeque = new ArrayDeque();
        synchronized (this.queuedEnvelopes) {
            this.destroyCalled = true;
            while (!this.queuedEnvelopes.isEmpty()) {
                Envelope poll = this.queuedEnvelopes.poll();
                if (poll.getBuffer() != null) {
                    arrayDeque.add(poll.getBuffer());
                }
            }
        }
        while (!arrayDeque.isEmpty()) {
            ((Buffer) arrayDeque.poll()).recycleBuffer();
        }
    }

    public void logQueuedEnvelopes() {
        int i = 0;
        int i2 = 0;
        synchronized (this.queuedEnvelopes) {
            Iterator<Envelope> it = this.queuedEnvelopes.iterator();
            while (it.hasNext()) {
                i++;
                if (it.next().getBuffer() != null) {
                    i2++;
                }
            }
        }
        System.out.println("\t\t" + toString() + ": " + i + " (" + i2 + ", 0)");
    }

    @Override // eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider
    public Buffer requestBuffer(int i) throws IOException {
        return this.inputGate.requestBuffer(i);
    }

    @Override // eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider
    public Buffer requestBufferBlocking(int i) throws IOException, InterruptedException {
        return this.inputGate.requestBufferBlocking(i);
    }

    @Override // eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider
    public int getBufferSize() {
        return this.inputGate.getBufferSize();
    }

    @Override // eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider, eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner
    public void reportAsynchronousEvent() {
        this.inputGate.reportAsynchronousEvent();
    }

    @Override // eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider
    public BufferProvider.BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener bufferAvailabilityListener) {
        return this.inputGate.registerBufferAvailabilityListener(bufferAvailabilityListener);
    }

    public BufferOrEvent getNextBufferOrEvent() throws IOException {
        if (this.pendingEvents != null) {
            BufferOrEvent bufferOrEvent = new BufferOrEvent(this.pendingEvents.next());
            if (!this.pendingEvents.hasNext()) {
                this.pendingEvents = null;
            }
            return bufferOrEvent;
        }
        synchronized (this.queuedEnvelopes) {
            if (this.queuedEnvelopes.isEmpty()) {
                return null;
            }
            Envelope poll = this.queuedEnvelopes.poll();
            Iterator it = poll.deserializeEvents().iterator();
            if (it.hasNext()) {
                this.pendingEvents = it;
            }
            if (poll.getBuffer() != null) {
                return new BufferOrEvent(poll.getBuffer());
            }
            if (this.pendingEvents == null) {
                throw new IOException("Received an envelope with neither data nor events.");
            }
            BufferOrEvent bufferOrEvent2 = new BufferOrEvent(this.pendingEvents.next());
            if (!this.pendingEvents.hasNext()) {
                this.pendingEvents = null;
            }
            return bufferOrEvent2;
        }
    }

    public void transferEventToOutputChannel(AbstractEvent abstractEvent) throws IOException, InterruptedException {
        Envelope envelope = new Envelope(0, getJobID(), getID());
        envelope.serializeEventList(Arrays.asList(abstractEvent));
        this.envelopeDispatcher.dispatchFromInputChannel(envelope);
    }
}
