package eu.stratosphere.nephele.io;

import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.io.channels.AbstractOutputChannel;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.io.channels.bytebuffered.InMemoryOutputChannel;
import eu.stratosphere.nephele.io.channels.bytebuffered.NetworkOutputChannel;
import eu.stratosphere.nephele.jobgraph.JobID;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/io/RuntimeOutputGate.class */
/**
 * Output gate that owns an ordered list of output channels and forwards records and events to them.
 *
 * <p>A gate is either a broadcast gate (no channel selector is kept) or a selective gate, in which
 * case a {@link ChannelSelector} decides for every record which channel indices receive it.
 *
 * <p>This class performs no synchronization of its own on the channel list.
 *
 * @param <T> the type of record written through this gate
 */
public class RuntimeOutputGate<T extends IOReadableWritable> extends AbstractGate<T> implements OutputGate<T> {
    // NOTE(review): the logger is registered under OutputGate rather than this class; kept as-is
    // so that existing logging configuration keeps matching.
    private static final Log LOG = LogFactory.getLog(OutputGate.class);

    /** Channels attached to this gate; a channel's position in the list is the index used by the selector. */
    private final List<AbstractOutputChannel<T>> outputChannels;

    /** Selector used for non-broadcast gates; {@code null} if and only if this gate is a broadcast gate. */
    private final ChannelSelector<T> channelSelector;

    /** Record type handed in at construction time. */
    private final Class<T> type;

    /** Whether records are broadcast instead of being routed through the channel selector. */
    private final boolean isBroadcast;

    /**
     * Creates a new output gate without any channels.
     *
     * @param jobID passed through to the {@link AbstractGate} constructor
     * @param gateID passed through to the {@link AbstractGate} constructor
     * @param cls the record type reported by {@link #getType()}
     * @param i passed through to the {@link AbstractGate} constructor (presumably the gate's index
     *     — TODO confirm against AbstractGate)
     * @param channelSelector selector for non-broadcast gates; ignored when {@code z} is {@code true},
     *     replaced by a {@code DefaultChannelSelector} when {@code null}
     * @param z {@code true} to make this a broadcast gate
     */
    public RuntimeOutputGate(JobID jobID, GateID gateID, Class<T> cls, int i, ChannelSelector<T> channelSelector, boolean z) {
        super(jobID, gateID, i);
        this.outputChannels = new ArrayList<>();
        this.isBroadcast = z;
        this.type = cls;
        if (z) {
            // Broadcast gates never consult a selector.
            this.channelSelector = null;
        } else if (channelSelector != null) {
            this.channelSelector = channelSelector;
        } else {
            this.channelSelector = new DefaultChannelSelector();
        }
    }

    /**
     * Returns the record type this gate was created with.
     *
     * @return the class object passed to the constructor
     */
    @Override // eu.stratosphere.nephele.io.OutputGate
    public final Class<T> getType() {
        return this.type;
    }

    /**
     * Appends the given channel to this gate unless the list already contains it.
     *
     * @param abstractOutputChannel the channel to attach
     */
    private void addOutputChannel(AbstractOutputChannel<T> abstractOutputChannel) {
        if (!this.outputChannels.contains(abstractOutputChannel)) {
            this.outputChannels.add(abstractOutputChannel);
        }
    }

    /**
     * Removes the first channel whose ID equals the given ID. If no channel matches, the gate is
     * left untouched and a debug message is logged.
     *
     * @param channelID the ID of the channel to remove
     */
    public void removeOutputChannel(ChannelID channelID) {
        final Iterator<AbstractOutputChannel<T>> channels = this.outputChannels.iterator();
        while (channels.hasNext()) {
            if (channels.next().getID().equals(channelID)) {
                channels.remove();
                return;
            }
        }
        // Only build the message when it will actually be emitted.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Cannot find output channel with ID " + channelID + " to remove");
        }
    }

    /** Detaches every channel from this gate. */
    @Override // eu.stratosphere.nephele.io.OutputGate
    public void removeAllOutputChannels() {
        this.outputChannels.clear();
    }

    /**
     * Reports whether this gate is an input gate.
     *
     * @return always {@code false}
     */
    @Override // eu.stratosphere.nephele.io.Gate
    public boolean isInputGate() {
        return false;
    }

    /**
     * Returns how many channels are currently attached to this gate.
     *
     * @return the size of the channel list
     */
    @Override // eu.stratosphere.nephele.io.OutputGate
    public int getNumberOfOutputChannels() {
        return this.outputChannels.size();
    }

    /**
     * Returns the channel at the given position.
     *
     * @param i zero-based position in the channel list
     * @return the channel at that position, or {@code null} if {@code i} is negative or not smaller
     *     than the number of channels
     */
    @Override // eu.stratosphere.nephele.io.OutputGate
    public AbstractOutputChannel<T> getOutputChannel(int i) {
        // Treat negative indices like too-large ones instead of letting List.get throw.
        if (i < 0 || i >= this.outputChannels.size()) {
            return null;
        }
        return this.outputChannels.get(i);
    }

    /**
     * Creates a network output channel, attaches it to this gate and returns it. The current
     * number of channels is passed to the channel constructor as its second argument.
     *
     * @param outputGate gate handed to the new channel
     * @param channelID first channel ID handed to the channel constructor
     * @param channelID2 second channel ID handed to the channel constructor (presumably the ID of
     *     the connected channel — TODO confirm against NetworkOutputChannel)
     * @return the newly created channel
     */
    @Override // eu.stratosphere.nephele.io.OutputGate
    public NetworkOutputChannel<T> createNetworkOutputChannel(OutputGate<T> outputGate, ChannelID channelID, ChannelID channelID2) {
        final int nextIndex = this.outputChannels.size();
        final NetworkOutputChannel<T> created = new NetworkOutputChannel<>(outputGate, nextIndex, channelID, channelID2);
        addOutputChannel(created);
        return created;
    }

    /**
     * Creates an in-memory output channel, attaches it to this gate and returns it. The current
     * number of channels is passed to the channel constructor as its second argument.
     *
     * @param outputGate gate handed to the new channel
     * @param channelID first channel ID handed to the channel constructor
     * @param channelID2 second channel ID handed to the channel constructor (presumably the ID of
     *     the connected channel — TODO confirm against InMemoryOutputChannel)
     * @return the newly created channel
     */
    @Override // eu.stratosphere.nephele.io.OutputGate
    public InMemoryOutputChannel<T> createInMemoryOutputChannel(OutputGate<T> outputGate, ChannelID channelID, ChannelID channelID2) {
        final int nextIndex = this.outputChannels.size();
        final InMemoryOutputChannel<T> created = new InMemoryOutputChannel<>(outputGate, nextIndex, channelID, channelID2);
        addOutputChannel(created);
        return created;
    }

    /**
     * Calls {@code requestClose()} on every attached channel, in list order.
     *
     * @throws IOException if a channel's {@code requestClose()} throws it
     * @throws InterruptedException if a channel's {@code requestClose()} throws it
     */
    @Override // eu.stratosphere.nephele.io.OutputGate
    public void requestClose() throws IOException, InterruptedException {
        // The size is re-read on every iteration, as in the original index loop.
        for (int index = 0; index < this.outputChannels.size(); index++) {
            this.outputChannels.get(index).requestClose();
        }
    }

    /**
     * Reports whether all attached channels are closed. A gate without channels counts as closed.
     *
     * @return {@code true} if no attached channel reports itself as open
     * @throws IOException if a channel's {@code isClosed()} throws it
     * @throws InterruptedException if a channel's {@code isClosed()} throws it
     */
    @Override // eu.stratosphere.nephele.io.Gate
    public boolean isClosed() throws IOException, InterruptedException {
        boolean allClosed = true;
        for (int index = 0; index < this.outputChannels.size(); index++) {
            // Every channel is queried even after one reported "open".
            // NOTE(review): presumably isClosed() on a channel may do work beyond a plain read —
            // confirm before turning this into a short-circuit.
            if (!this.outputChannels.get(index).isClosed()) {
                allClosed = false;
            }
        }
        return allClosed;
    }

    /**
     * Writes a record to the channels of this gate.
     *
     * <ul>
     *   <li>Broadcast gate, channel type {@code INMEMORY}: the record is written to every channel.</li>
     *   <li>Broadcast gate, any other channel type: the record is written to the first channel only
     *       (presumably the transport fans it out from there — TODO confirm); nothing is written if
     *       the gate has no channels.</li>
     *   <li>Non-broadcast gate: the channel selector is asked for target indices; indices outside
     *       {@code [0, numberOfChannels)} are skipped, and a {@code null} selection writes nothing.</li>
     * </ul>
     *
     * @param t the record to write
     * @throws IOException if a channel's {@code writeRecord} throws it
     * @throws InterruptedException if a channel's {@code writeRecord} throws it
     */
    @Override // eu.stratosphere.nephele.io.OutputGate
    public void writeRecord(T t) throws IOException, InterruptedException {
        if (this.isBroadcast) {
            if (getChannelType() != ChannelType.INMEMORY) {
                // Guard against a gate without channels; the in-memory path is a no-op in that case too.
                if (!this.outputChannels.isEmpty()) {
                    this.outputChannels.get(0).writeRecord(t);
                }
                return;
            }
            final int broadcastTargets = this.outputChannels.size();
            for (int index = 0; index < broadcastTargets; index++) {
                this.outputChannels.get(index).writeRecord(t);
            }
            return;
        }

        // Non-broadcast gate: the constructor guarantees a non-null selector here.
        final int numberOfChannels = this.outputChannels.size();
        final int[] selected = this.channelSelector.selectChannels(t, numberOfChannels);
        if (selected == null) {
            return;
        }
        for (final int target : selected) {
            // Skip indices the selector should not have produced instead of failing the write.
            if (target >= 0 && target < numberOfChannels) {
                this.outputChannels.get(target).writeRecord(t);
            }
        }
    }

    /**
     * Returns the channels attached to this gate.
     *
     * @return the gate's internal channel list itself, not a copy; changes to it affect this gate
     */
    @Override // eu.stratosphere.nephele.io.OutputGate
    public List<AbstractOutputChannel<T>> getOutputChannels() {
        return this.outputChannels;
    }

    /**
     * Returns the superclass string representation prefixed with {@code "Output "}.
     *
     * @return a textual description of this gate
     */
    @Override // eu.stratosphere.nephele.io.AbstractGate
    public String toString() {
        return "Output " + super.toString();
    }

    /**
     * Passes the given event to {@code transferEvent} of every attached channel, in list order.
     *
     * @param abstractEvent the event to publish
     * @throws IOException if a channel's {@code transferEvent} throws it
     * @throws InterruptedException if a channel's {@code transferEvent} throws it
     */
    @Override // eu.stratosphere.nephele.io.Gate
    public void publishEvent(AbstractEvent abstractEvent) throws IOException, InterruptedException {
        for (final AbstractOutputChannel<T> channel : this.outputChannels) {
            channel.transferEvent(abstractEvent);
        }
    }

    /**
     * Calls {@code flush()} on every attached channel, in list order.
     *
     * @throws IOException if a channel's {@code flush()} throws it
     * @throws InterruptedException if a channel's {@code flush()} throws it
     */
    @Override // eu.stratosphere.nephele.io.OutputGate
    public void flush() throws IOException, InterruptedException {
        for (final AbstractOutputChannel<T> channel : this.outputChannels) {
            channel.flush();
        }
    }

    /**
     * Reports whether this gate was created as a broadcast gate.
     *
     * @return the broadcast flag passed to the constructor
     */
    @Override // eu.stratosphere.nephele.io.OutputGate
    public boolean isBroadcast() {
        return this.isBroadcast;
    }

    /**
     * Returns the channel selector of this gate.
     *
     * @return the selector in use, or {@code null} for a broadcast gate
     */
    @Override // eu.stratosphere.nephele.io.OutputGate
    public ChannelSelector<T> getChannelSelector() {
        return this.channelSelector;
    }

    /** Calls {@code releaseAllResources()} on every attached channel, in list order. */
    @Override // eu.stratosphere.nephele.io.Gate
    public void releaseAllChannelResources() {
        for (final AbstractOutputChannel<T> channel : this.outputChannels) {
            channel.releaseAllResources();
        }
    }
}
