package eu.stratosphere.nephele.io;

import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.io.channels.AbstractInputChannel;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.io.channels.bytebuffered.InMemoryInputChannel;
import eu.stratosphere.nephele.io.channels.bytebuffered.NetworkInputChannel;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.profiling.ProfilingUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/io/RuntimeInputGate.class */
/**
 * Input gate implementation that multiplexes records from a list of input channels.
 *
 * <p>Channels announce that they have data by calling {@link #notifyRecordIsAvailable(int)}, which
 * enqueues their index. {@link #readRecord(IOReadableWritable)} takes one index at a time from that
 * queue and keeps reading from the selected channel until the channel reports that the current
 * unit of data is exhausted.
 *
 * <p>NOTE(review): {@code isClosed}, {@code currentEvent} and {@code channelToReadFrom} are plain
 * fields without synchronization; this looks like it assumes a single reading thread — confirm
 * against the callers.
 *
 * @param <T> the type of record read through this gate
 */
public class RuntimeInputGate<T extends IOReadableWritable> extends AbstractGate<T> implements InputGate<T> {
    /** Logger; note that it is registered under the {@link InputGate} interface category. */
    private static final Log LOG = LogFactory.getLog(InputGate.class);

    /** Sentinel stored in {@link #channelToReadFrom} while no channel is selected for reading. */
    private static final int NO_CHANNEL_SELECTED = -1;

    /** Factory used to create one deserializer per created input channel. */
    private final RecordDeserializerFactory<T> deserializerFactory;

    /** The input channels attached to this gate, addressed by list position. */
    private final ArrayList<AbstractInputChannel<T>> inputChannels;

    /** Indices of channels that have announced available data and have not been picked up yet. */
    private final BlockingQueue<Integer> availableChannels;

    /** The (at most one) listener informed whenever a channel announces available data. */
    private final AtomicReference<RecordAvailabilityListener<T>> recordAvailabilityListener;

    /** The task event obtained by the most recent read that returned {@code TASK_EVENT}, if any. */
    private AbstractTaskEvent currentEvent;

    /** Cached result of {@link #isClosed()} once all channels have been found closed. */
    private boolean isClosed;

    /** Index of the channel currently being read, or {@link #NO_CHANNEL_SELECTED}. */
    private int channelToReadFrom;

    /**
     * Creates a new input gate without any channels attached.
     *
     * @param jobID the job ID, passed through to the superclass constructor
     * @param gateID the gate ID, passed through to the superclass constructor
     * @param recordDeserializerFactory factory for the deserializers handed to created channels
     * @param i passed through unchanged to the superclass constructor
     */
    public RuntimeInputGate(JobID jobID, GateID gateID, RecordDeserializerFactory<T> recordDeserializerFactory, int i) {
        super(jobID, gateID, i);
        this.inputChannels = new ArrayList<>();
        this.availableChannels = new LinkedBlockingQueue<>();
        this.recordAvailabilityListener = new AtomicReference<>(null);
        this.isClosed = false;
        this.channelToReadFrom = NO_CHANNEL_SELECTED;
        this.deserializerFactory = recordDeserializerFactory;
    }

    /**
     * Appends the given channel to this gate unless the list already contains it.
     *
     * @param abstractInputChannel the channel to attach
     */
    private void addInputChannel(AbstractInputChannel<T> abstractInputChannel) {
        if (!this.inputChannels.contains(abstractInputChannel)) {
            this.inputChannels.add(abstractInputChannel);
        }
    }

    /**
     * Removes the first input channel whose ID equals the given ID. If no such channel exists,
     * only a debug message is logged.
     *
     * <p>NOTE(review): channels are created with the list size at creation time as a constructor
     * argument (presumably their index); removing an element shifts the positions of the
     * following channels — confirm that callers do not rely on the two staying aligned.
     *
     * @param channelID the ID of the channel to remove
     */
    public void removeInputChannel(ChannelID channelID) {
        for (int i = 0; i < this.inputChannels.size(); i++) {
            if (this.inputChannels.get(i).getID().equals(channelID)) {
                this.inputChannels.remove(i);
                return;
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Cannot find input channel with ID " + channelID + " to remove");
        }
    }

    /**
     * {@inheritDoc}
     *
     * @return always {@code true}
     */
    @Override // eu.stratosphere.nephele.io.Gate
    public boolean isInputGate() {
        return true;
    }

    /**
     * @return the number of input channels currently attached to this gate
     */
    @Override // eu.stratosphere.nephele.io.InputGate
    public int getNumberOfInputChannels() {
        return this.inputChannels.size();
    }

    /**
     * Returns the input channel stored at the given list position.
     *
     * @param i the position of the channel
     * @return the channel at that position
     * @throws IndexOutOfBoundsException if {@code i} is not a valid position
     */
    @Override // eu.stratosphere.nephele.io.InputGate
    public AbstractInputChannel<T> getInputChannel(int i) {
        return this.inputChannels.get(i);
    }

    /**
     * Creates a network input channel with a fresh deserializer and attaches it to this gate.
     * The current number of channels is passed to the channel constructor.
     *
     * @param inputGate the gate handed to the new channel
     * @param channelID first channel ID handed to the channel constructor
     * @param channelID2 second channel ID handed to the channel constructor
     * @return the newly created channel
     */
    @Override // eu.stratosphere.nephele.io.InputGate
    public NetworkInputChannel<T> createNetworkInputChannel(InputGate<T> inputGate, ChannelID channelID, ChannelID channelID2) {
        NetworkInputChannel<T> networkInputChannel = new NetworkInputChannel<>(inputGate, this.inputChannels.size(), this.deserializerFactory.createDeserializer(), channelID, channelID2);
        addInputChannel(networkInputChannel);
        return networkInputChannel;
    }

    /**
     * Creates an in-memory input channel with a fresh deserializer and attaches it to this gate.
     * The current number of channels is passed to the channel constructor.
     *
     * @param inputGate the gate handed to the new channel
     * @param channelID first channel ID handed to the channel constructor
     * @param channelID2 second channel ID handed to the channel constructor
     * @return the newly created channel
     */
    @Override // eu.stratosphere.nephele.io.InputGate
    public InMemoryInputChannel<T> createInMemoryInputChannel(InputGate<T> inputGate, ChannelID channelID, ChannelID channelID2) {
        InMemoryInputChannel<T> inMemoryInputChannel = new InMemoryInputChannel<>(inputGate, this.inputChannels.size(), this.deserializerFactory.createDeserializer(), channelID, channelID2);
        addInputChannel(inMemoryInputChannel);
        return inMemoryInputChannel;
    }

    /**
     * Reads the next record into the given target object.
     *
     * <p>If no channel is currently selected, this method returns {@code END_OF_STREAM} when the
     * gate is closed, throws if the calling thread's interrupt flag is set (the flag is cleared
     * by the check), and otherwise blocks until some channel has announced available data. The
     * selected channel stays selected only while it reports
     * {@code INTERMEDIATE_RECORD_FROM_BUFFER}; every other result deselects it.
     *
     * @param t the record instance to read into
     * @return the result reported by the channel, except that a channel-level
     *         {@code END_OF_STREAM} is reported as {@code NONE} while the gate is not yet closed
     * @throws IOException if reading from the channel or checking the closed state fails
     * @throws InterruptedException if the thread was interrupted before or while waiting
     */
    @Override // eu.stratosphere.nephele.io.InputGate
    public InputChannelResult readRecord(T t) throws IOException, InterruptedException {
        if (this.channelToReadFrom == NO_CHANNEL_SELECTED) {
            if (isClosed()) {
                return InputChannelResult.END_OF_STREAM;
            }
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            this.channelToReadFrom = waitForAnyChannelToBecomeAvailable();
        }

        final InputChannelResult result = getInputChannel(this.channelToReadFrom).readRecord(t);
        switch (result) {
            case INTERMEDIATE_RECORD_FROM_BUFFER:
                // Stay on the same channel for the next call.
                return InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;
            case LAST_RECORD_FROM_BUFFER:
                this.channelToReadFrom = NO_CHANNEL_SELECTED;
                return InputChannelResult.LAST_RECORD_FROM_BUFFER;
            case END_OF_SUPERSTEP:
                this.channelToReadFrom = NO_CHANNEL_SELECTED;
                return InputChannelResult.END_OF_SUPERSTEP;
            case TASK_EVENT:
                // Fetch the event before deselecting the channel it came from.
                this.currentEvent = getInputChannel(this.channelToReadFrom).getCurrentEvent();
                this.channelToReadFrom = NO_CHANNEL_SELECTED;
                return InputChannelResult.TASK_EVENT;
            case NONE:
                this.channelToReadFrom = NO_CHANNEL_SELECTED;
                return InputChannelResult.NONE;
            case END_OF_STREAM:
                // One channel is done; the gate as a whole ends only when all channels are closed.
                this.channelToReadFrom = NO_CHANNEL_SELECTED;
                return isClosed() ? InputChannelResult.END_OF_STREAM : InputChannelResult.NONE;
            default:
                throw new RuntimeException("Unexpected input channel result: " + result);
        }
    }

    /**
     * Returns the stored task event and clears it, so a second call returns {@code null} unless
     * a new event has been read in between.
     *
     * @return the stored task event, or {@code null} if none is stored
     */
    @Override // eu.stratosphere.nephele.io.InputGate
    public AbstractTaskEvent getCurrentEvent() {
        final AbstractTaskEvent event = this.currentEvent;
        this.currentEvent = null;
        return event;
    }

    /**
     * Enqueues the given channel index as available and then informs the registered record
     * availability listener, if there is one.
     *
     * @param i the index of the channel that has data available
     */
    @Override // eu.stratosphere.nephele.io.InputGate
    public void notifyRecordIsAvailable(int i) {
        this.availableChannels.add(Integer.valueOf(i));
        final RecordAvailabilityListener<T> listener = this.recordAvailabilityListener.get();
        if (listener != null) {
            listener.reportRecordAvailability(this);
        }
    }

    /**
     * Blocks until a channel index is present in the availability queue, then removes and
     * returns it.
     *
     * @return the index of a channel that announced available data
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public int waitForAnyChannelToBecomeAvailable() throws InterruptedException {
        return this.availableChannels.take().intValue();
    }

    /**
     * Checks whether every attached input channel is closed. A positive result is cached, so
     * later calls return {@code true} without consulting the channels again. A gate without
     * channels is reported as closed.
     *
     * @return {@code true} if all channels are closed, {@code false} otherwise
     * @throws IOException if a channel fails while reporting its state
     * @throws InterruptedException if a channel is interrupted while reporting its state
     */
    @Override // eu.stratosphere.nephele.io.Gate
    public boolean isClosed() throws IOException, InterruptedException {
        if (this.isClosed) {
            return true;
        }
        for (int i = 0; i < getNumberOfInputChannels(); i++) {
            if (!this.inputChannels.get(i).isClosed()) {
                return false;
            }
        }
        this.isClosed = true;
        return true;
    }

    /**
     * Calls {@code close()} on every attached input channel in list order. The cached closed
     * flag is not touched here; it is only set by {@link #isClosed()}.
     *
     * @throws IOException if closing a channel fails
     * @throws InterruptedException if closing a channel is interrupted
     */
    @Override // eu.stratosphere.nephele.io.InputGate
    public void close() throws IOException, InterruptedException {
        for (int i = 0; i < getNumberOfInputChannels(); i++) {
            this.inputChannels.get(i).close();
        }
    }

    /**
     * @return the superclass string representation prefixed with {@code "Input "}
     */
    @Override // eu.stratosphere.nephele.io.AbstractGate
    public String toString() {
        return "Input " + super.toString();
    }

    /**
     * Hands the given event to every attached input channel via {@code transferEvent}.
     *
     * @param abstractEvent the event to publish
     * @throws IOException if a channel fails to transfer the event
     * @throws InterruptedException if transferring the event is interrupted
     */
    @Override // eu.stratosphere.nephele.io.Gate
    public void publishEvent(AbstractEvent abstractEvent) throws IOException, InterruptedException {
        for (AbstractInputChannel<T> channel : this.inputChannels) {
            channel.transferEvent(abstractEvent);
        }
    }

    /**
     * @return the deserializer factory this gate was constructed with
     */
    public RecordDeserializerFactory<T> getRecordDeserializerFactory() {
        return this.deserializerFactory;
    }

    /**
     * Calls {@code releaseAllResources()} on every attached input channel.
     */
    @Override // eu.stratosphere.nephele.io.Gate
    public void releaseAllChannelResources() {
        for (AbstractInputChannel<T> channel : this.inputChannels) {
            channel.releaseAllResources();
        }
    }

    /**
     * Registers the record availability listener. Only one listener can be registered; the
     * registration is an atomic compare-and-set against {@code null}.
     *
     * @param recordAvailabilityListener the listener to register
     * @throws IllegalStateException if a listener has already been registered
     */
    @Override // eu.stratosphere.nephele.io.InputGate
    public void registerRecordAvailabilityListener(RecordAvailabilityListener<T> recordAvailabilityListener) {
        if (!this.recordAvailabilityListener.compareAndSet(null, recordAvailabilityListener)) {
            throw new IllegalStateException(this.recordAvailabilityListener + " is already registered as a record availability listener");
        }
    }

    /**
     * Deselects the current channel so that the next read waits for a newly announced channel.
     * The argument is not used by this implementation.
     *
     * @param i the index of the channel whose data unit was consumed (ignored)
     */
    @Override // eu.stratosphere.nephele.io.InputGate
    public void notifyDataUnitConsumed(int i) {
        this.channelToReadFrom = NO_CHANNEL_SELECTED;
    }
}
