package eu.stratosphere.nephele.execution;

import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.deployment.TaskDeploymentDescriptor;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.nephele.template.InputSplitProvider;
import eu.stratosphere.runtime.io.Buffer;
import eu.stratosphere.runtime.io.channels.ChannelID;
import eu.stratosphere.runtime.io.channels.OutputChannel;
import eu.stratosphere.runtime.io.gates.GateID;
import eu.stratosphere.runtime.io.gates.InputGate;
import eu.stratosphere.runtime.io.gates.OutputGate;
import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPool;
import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
import eu.stratosphere.util.StringUtils;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.FutureTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/execution/RuntimeEnvironment.class */
/**
 * Runtime environment of a single task: it instantiates the task's {@link AbstractInvokable},
 * keeps the input and output gates the invokable registers, drives the task through its
 * {@link ExecutionState} transitions when executed as a {@link Runnable}, and serves as the
 * {@link BufferProvider} / {@link LocalBufferPoolOwner} for the task's output buffers.
 *
 * <p>The gate lists are {@link CopyOnWriteArrayList}s and the executing thread and execution
 * observer are {@code volatile}; the output buffer pool field is a plain field that is only
 * assigned in {@link #registerGlobalBufferPool(GlobalBufferPool)}.
 */
public class RuntimeEnvironment implements Environment, BufferProvider, LocalBufferPoolOwner, Runnable {

    private static final Log LOG = LogFactory.getLog(RuntimeEnvironment.class);

    /** Pause in milliseconds between two polls of the input gates while waiting for them to close. */
    private static final int SLEEPINTERVAL = 100;

    /** Output gates in registration order; a gate's index is the list size at registration time. */
    private final List<OutputGate> outputGates = new CopyOnWriteArrayList<>();

    /** Input gates in registration order; a gate's index is the list size at registration time. */
    private final List<InputGate<? extends IOReadableWritable>> inputGates = new CopyOnWriteArrayList<>();

    /** Queue polled by {@link #getNextUnboundInputGateID()}; nothing in this class adds to it. */
    private final Queue<GateID> unboundInputGateIDs = new ArrayDeque<>();

    private final MemoryManager memoryManager;
    private final IOManager ioManager;
    private final Class<? extends AbstractInvokable> invokableClass;
    private final AbstractInvokable invokable;

    /** Lazily created by {@link #getExecutingThread()}. */
    private volatile Thread executingThread;

    private final JobID jobID;
    private final Configuration jobConfiguration;
    private final Configuration taskConfiguration;
    private final InputSplitProvider inputSplitProvider;

    /** Optional observer; every access goes through a null-safe helper. */
    private volatile ExecutionObserver executionObserver;

    private final AccumulatorProtocol accumulatorProtocolProxy;
    private final int indexInSubtaskGroup;
    private final int currentNumberOfSubtasks;
    private final String taskName;

    /** Remains {@code null} until {@link #registerGlobalBufferPool(GlobalBufferPool)} is called. */
    private LocalBufferPool outputBufferPool;

    /** Copy tasks for cache files, keyed by the name they were registered under. */
    private final Map<String, FutureTask<Path>> cacheCopyTasks;

    /**
     * Creates an environment that only instantiates the invokable and lets it register its gates.
     * No memory manager, I/O manager, input split provider or accumulator proxy is attached, and
     * both the subtask index and the number of subtasks are 0.
     *
     * @param jobID          ID of the job the task belongs to
     * @param str            name of the task
     * @param cls            class of the invokable to instantiate
     * @param configuration  the task configuration
     * @param configuration2 the job configuration
     * @throws Exception if the invokable cannot be instantiated or fails while registering its gates
     */
    public RuntimeEnvironment(JobID jobID, String str, Class<? extends AbstractInvokable> cls, Configuration configuration, Configuration configuration2) throws Exception {
        this.jobID = jobID;
        this.taskName = str;
        this.invokableClass = cls;
        this.taskConfiguration = configuration;
        this.jobConfiguration = configuration2;
        this.indexInSubtaskGroup = 0;
        this.currentNumberOfSubtasks = 0;
        this.memoryManager = null;
        this.ioManager = null;
        this.inputSplitProvider = null;
        this.accumulatorProtocolProxy = null;
        this.cacheCopyTasks = new HashMap<>();

        // All fields must be set before the invokable sees this environment, because
        // registerInputOutput() may call back into createAndRegister*Gate().
        this.invokable = this.invokableClass.newInstance();
        this.invokable.setEnvironment(this);
        this.invokable.registerInputOutput();
    }

    /**
     * Creates an environment from a deployment descriptor. After the invokable has registered its
     * gates, the i-th output/input gate is initialized with the descriptor's i-th output/input
     * gate descriptor.
     *
     * @param taskDeploymentDescriptor source of job ID, task name, invokable class, configurations,
     *                                 subtask index/count and gate descriptors
     * @param memoryManager            memory manager handed out by {@link #getMemoryManager()}
     * @param iOManager                I/O manager handed out by {@link #getIOManager()}
     * @param inputSplitProvider       provider handed out by {@link #getInputSplitProvider()}
     * @param accumulatorProtocol      proxy handed out by {@link #getAccumulatorProtocolProxy()}
     * @param map                      cache-file copy tasks; {@code null} is treated as an empty map
     * @throws Exception if the invokable cannot be instantiated, fails while registering its gates,
     *                   or a gate cannot be initialized from its descriptor
     */
    public RuntimeEnvironment(TaskDeploymentDescriptor taskDeploymentDescriptor, MemoryManager memoryManager, IOManager iOManager, InputSplitProvider inputSplitProvider, AccumulatorProtocol accumulatorProtocol, Map<String, FutureTask<Path>> map) throws Exception {
        this.jobID = taskDeploymentDescriptor.getJobID();
        this.taskName = taskDeploymentDescriptor.getTaskName();
        this.invokableClass = taskDeploymentDescriptor.getInvokableClass();
        this.jobConfiguration = taskDeploymentDescriptor.getJobConfiguration();
        this.taskConfiguration = taskDeploymentDescriptor.getTaskConfiguration();
        this.indexInSubtaskGroup = taskDeploymentDescriptor.getIndexInSubtaskGroup();
        this.currentNumberOfSubtasks = taskDeploymentDescriptor.getCurrentNumberOfSubtasks();
        this.memoryManager = memoryManager;
        this.ioManager = iOManager;
        this.inputSplitProvider = inputSplitProvider;
        this.accumulatorProtocolProxy = accumulatorProtocol;
        // A null map would make addCopyTaskForCacheFile() throw and getCopyTask() return null.
        this.cacheCopyTasks = (map != null) ? map : new HashMap<String, FutureTask<Path>>();

        this.invokable = this.invokableClass.newInstance();
        this.invokable.setEnvironment(this);
        this.invokable.registerInputOutput();

        final int numberOfOutputGateDescriptors = taskDeploymentDescriptor.getNumberOfOutputGateDescriptors();
        for (int i = 0; i < numberOfOutputGateDescriptors; i++) {
            this.outputGates.get(i).initializeChannels(taskDeploymentDescriptor.getOutputGateDescriptor(i));
        }

        final int numberOfInputGateDescriptors = taskDeploymentDescriptor.getNumberOfInputGateDescriptors();
        for (int i = 0; i < numberOfInputGateDescriptors; i++) {
            this.inputGates.get(i).initializeChannels(taskDeploymentDescriptor.getInputGateDescriptor(i));
        }
    }

    /**
     * @return the invokable instance created by the constructor
     */
    public AbstractInvokable getInvokable() {
        return this.invokable;
    }

    /**
     * @return the ID of the job this task belongs to
     */
    @Override
    public JobID getJobID() {
        return this.jobID;
    }

    /**
     * @return the next queued unbound input gate ID, or {@code null} if the queue is empty
     */
    @Override
    public GateID getNextUnboundInputGateID() {
        return this.unboundInputGateIDs.poll();
    }

    /**
     * Creates an output gate with a fresh {@link GateID} whose index is the current number of
     * output gates, and appends it to the list of output gates.
     *
     * @return the newly registered gate
     */
    @Override
    public OutputGate createAndRegisterOutputGate() {
        final OutputGate gate = new OutputGate(getJobID(), new GateID(), getNumberOfOutputGates());
        this.outputGates.add(gate);
        return gate;
    }

    /**
     * Executes the task and reports its state transitions to the execution observer, if one is set.
     *
     * <p>Sequence: report RUNNING; if already canceled, report CANCELED and stop. Otherwise install
     * the job's class loader as context class loader, invoke the task, report FINISHING, close the
     * input gates, request the output gates to close, wait for both sides to be closed, release
     * all channel resources and report FINISHED. Any {@link Throwable} on the way releases the
     * channel resources and is reported as CANCELED (task canceled or a
     * {@link CancelTaskException}) or FAILED (with the stringified exception).
     */
    @Override
    public void run() {
        changeExecutionState(ExecutionState.RUNNING, null);

        // Do not even start the task if it was canceled in the meantime.
        if (isCanceled()) {
            changeExecutionState(ExecutionState.CANCELED, null);
            return;
        }

        try {
            Thread.currentThread().setContextClassLoader(LibraryCacheManager.getClassLoader(this.jobID));
            this.invokable.invoke();

            // Route a cancellation that happened during invoke() into the failure handling below.
            if (isCanceled()) {
                throw new InterruptedException();
            }

            changeExecutionState(ExecutionState.FINISHING, null);
            try {
                closeInputGates();
                requestAllOutputGatesToClose();
                waitForInputChannelsToBeClosed();
                waitForOutputChannelsToBeClosed();
                releaseAllChannelResources();
                changeExecutionState(ExecutionState.FINISHED, null);
            } catch (Throwable shutdownError) {
                releaseAllChannelResources();
                reportCanceledOrFailed(shutdownError);
            }
        } catch (Throwable error) {
            // Give the task a chance to clean up, unless the cancel was requested from outside.
            if (!isCanceled()) {
                try {
                    this.invokable.cancel();
                } catch (Throwable cancelError) {
                    LOG.error(StringUtils.stringifyException(cancelError));
                }
            }
            releaseAllChannelResources();
            reportCanceledOrFailed(error);
        }
    }

    /**
     * Reports CANCELED if the task was canceled or {@code cause} is a {@link CancelTaskException},
     * otherwise reports FAILED together with the stringified {@code cause}.
     */
    private void reportCanceledOrFailed(Throwable cause) {
        if (isCanceled() || cause instanceof CancelTaskException) {
            changeExecutionState(ExecutionState.CANCELED, null);
        } else {
            changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException(cause));
        }
    }

    /**
     * @return {@code true} if an execution observer is set and reports the task as canceled;
     *         {@code false} if no observer is set
     */
    private boolean isCanceled() {
        // Single read of the volatile field so a concurrent setExecutionObserver() cannot
        // slip in between the null check and the call.
        final ExecutionObserver observer = this.executionObserver;
        return observer != null && observer.isCanceled();
    }

    /**
     * Creates an input gate with a fresh {@link GateID} whose index is the current number of
     * input gates, and appends it to the list of input gates.
     *
     * @param <T> record type read through the gate
     * @return the newly registered gate
     */
    @Override
    public <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate() {
        final InputGate<T> gate = new InputGate<>(getJobID(), new GateID(), getNumberOfInputGates());
        this.inputGates.add(gate);
        return gate;
    }

    /**
     * @return the number of registered output gates
     */
    @Override
    public int getNumberOfOutputGates() {
        return this.outputGates.size();
    }

    /**
     * @return the number of registered input gates
     */
    @Override
    public int getNumberOfInputGates() {
        return this.inputGates.size();
    }

    /**
     * @return the sum of the channel counts of all registered output gates
     */
    @Override
    public int getNumberOfOutputChannels() {
        int total = 0;
        for (OutputGate gate : this.outputGates) {
            total += gate.getNumChannels();
        }
        return total;
    }

    /**
     * @return the sum of the channel counts of all registered input gates
     */
    @Override
    public int getNumberOfInputChannels() {
        int total = 0;
        for (InputGate<? extends IOReadableWritable> gate : this.inputGates) {
            total += gate.getNumberOfInputChannels();
        }
        return total;
    }

    /**
     * @param i index of the requested input gate
     * @return the input gate registered at index {@code i}, or {@code null} if {@code i} is
     *         negative or not smaller than the number of input gates
     */
    public InputGate<? extends IOReadableWritable> getInputGate(int i) {
        if (i >= 0 && i < this.inputGates.size()) {
            return this.inputGates.get(i);
        }
        return null;
    }

    /**
     * @param i index of the requested output gate
     * @return the output gate registered at index {@code i}, or {@code null} if {@code i} is
     *         negative or not smaller than the number of output gates
     */
    public OutputGate getOutputGate(int i) {
        if (i >= 0 && i < this.outputGates.size()) {
            return this.outputGates.get(i);
        }
        return null;
    }

    /**
     * Returns the thread that runs this environment, creating it on first use. The thread is
     * named with {@link #getTaskNameWithIndex()} unless the task name is {@code null}. The thread
     * is not started here.
     *
     * @return the (possibly newly created) executing thread
     */
    public Thread getExecutingThread() {
        synchronized (this) {
            if (this.executingThread == null) {
                this.executingThread = (this.taskName == null)
                        ? new Thread(this)
                        : new Thread(this, getTaskNameWithIndex());
            }
            return this.executingThread;
        }
    }

    /**
     * Waits for every output gate to be closed; returns immediately if the task is canceled.
     *
     * @throws InterruptedException if waiting on a gate is interrupted
     */
    private void waitForOutputChannelsToBeClosed() throws InterruptedException {
        if (isCanceled()) {
            return;
        }
        for (OutputGate gate : this.outputGates) {
            gate.waitForGateToBeClosed();
        }
    }

    /**
     * Polls all input gates every {@link #SLEEPINTERVAL} milliseconds until each reports closed.
     *
     * @throws IOException          if querying a gate fails
     * @throws InterruptedException if the sleep is interrupted, or the task is canceled before all
     *                              gates are closed
     */
    private void waitForInputChannelsToBeClosed() throws IOException, InterruptedException {
        while (!isCanceled()) {
            boolean allClosed = true;
            // Every gate is queried on every round, even after an open one was found.
            for (int i = 0; i < getNumberOfInputGates(); i++) {
                if (!this.inputGates.get(i).isClosed()) {
                    allClosed = false;
                }
            }
            if (allClosed) {
                return;
            }
            Thread.sleep(SLEEPINTERVAL);
        }
        throw new InterruptedException();
    }

    /**
     * Closes every registered input gate.
     */
    private void closeInputGates() throws IOException, InterruptedException {
        for (InputGate<? extends IOReadableWritable> gate : this.inputGates) {
            gate.close();
        }
    }

    /**
     * Requests every registered output gate to close.
     */
    private void requestAllOutputGatesToClose() throws IOException, InterruptedException {
        for (OutputGate gate : this.outputGates) {
            gate.requestClose();
        }
    }

    /**
     * @return the I/O manager; {@code null} if the environment was created without one
     */
    @Override
    public IOManager getIOManager() {
        return this.ioManager;
    }

    /**
     * @return the memory manager; {@code null} if the environment was created without one
     */
    @Override
    public MemoryManager getMemoryManager() {
        return this.memoryManager;
    }

    /**
     * @return the task configuration
     */
    @Override
    public Configuration getTaskConfiguration() {
        return this.taskConfiguration;
    }

    /**
     * @return the job configuration
     */
    @Override
    public Configuration getJobConfiguration() {
        return this.jobConfiguration;
    }

    /**
     * @return the number of subtasks this environment was created with
     */
    @Override
    public int getCurrentNumberOfSubtasks() {
        return this.currentNumberOfSubtasks;
    }

    /**
     * @return the subtask index this environment was created with
     */
    @Override
    public int getIndexInSubtaskGroup() {
        return this.indexInSubtaskGroup;
    }

    /**
     * Forwards a state change to the execution observer; does nothing if no observer is set.
     *
     * @param executionState the new state
     * @param str            optional message accompanying the change (may be {@code null})
     */
    private void changeExecutionState(ExecutionState executionState, String str) {
        final ExecutionObserver observer = this.executionObserver;
        if (observer != null) {
            observer.executionStateChanged(executionState, str);
        }
    }

    /**
     * @return the task name
     */
    @Override
    public String getTaskName() {
        return this.taskName;
    }

    /**
     * @return the task name followed by {@code (index + 1/numberOfSubtasks)}, for example
     *         {@code "Map (1/4)"}
     */
    public String getTaskNameWithIndex() {
        return String.format("%s (%d/%d)", this.taskName, getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks());
    }

    /**
     * Sets the observer that is notified about state changes and user threads.
     *
     * @param executionObserver the observer; {@code null} disables notifications
     */
    public void setExecutionObserver(ExecutionObserver executionObserver) {
        this.executionObserver = executionObserver;
    }

    /**
     * @return the input split provider; {@code null} if the environment was created without one
     */
    @Override
    public InputSplitProvider getInputSplitProvider() {
        return this.inputSplitProvider;
    }

    /**
     * Forwards the start of a user thread to the execution observer, if one is set.
     *
     * @param thread the thread that was started
     */
    @Override
    public void userThreadStarted(Thread thread) {
        final ExecutionObserver observer = this.executionObserver;
        if (observer != null) {
            observer.userThreadStarted(thread);
        }
    }

    /**
     * Forwards the end of a user thread to the execution observer, if one is set.
     *
     * @param thread the thread that finished
     */
    @Override
    public void userThreadFinished(Thread thread) {
        final ExecutionObserver observer = this.executionObserver;
        if (observer != null) {
            observer.userThreadFinished(thread);
        }
    }

    /**
     * Releases the channel resources of all input gates first, then of all output gates.
     */
    private void releaseAllChannelResources() {
        for (InputGate<? extends IOReadableWritable> gate : this.inputGates) {
            gate.releaseAllChannelResources();
        }
        for (OutputGate gate : this.outputGates) {
            gate.releaseAllChannelResources();
        }
    }

    /**
     * @return an unmodifiable set with the IDs of all channels of all output gates
     */
    @Override
    public Set<ChannelID> getOutputChannelIDs() {
        final Set<ChannelID> ids = new HashSet<>();
        for (OutputGate gate : this.outputGates) {
            for (OutputChannel channel : gate.channels()) {
                ids.add(channel.getID());
            }
        }
        return Collections.unmodifiableSet(ids);
    }

    /**
     * @return an unmodifiable set with the IDs of all channels of all input gates
     */
    @Override
    public Set<ChannelID> getInputChannelIDs() {
        final Set<ChannelID> ids = new HashSet<>();
        for (InputGate<? extends IOReadableWritable> gate : this.inputGates) {
            for (int i = 0; i < gate.getNumberOfInputChannels(); i++) {
                ids.add(gate.getInputChannel(i).getID());
            }
        }
        return Collections.unmodifiableSet(ids);
    }

    /**
     * @return an unmodifiable set with the IDs of all input gates
     */
    @Override
    public Set<GateID> getInputGateIDs() {
        final Set<GateID> ids = new HashSet<>();
        for (InputGate<? extends IOReadableWritable> gate : this.inputGates) {
            ids.add(gate.getGateID());
        }
        return Collections.unmodifiableSet(ids);
    }

    /**
     * @return an unmodifiable set with the IDs of all output gates
     */
    @Override
    public Set<GateID> getOutputGateIDs() {
        final Set<GateID> ids = new HashSet<>();
        for (OutputGate gate : this.outputGates) {
            ids.add(gate.getGateID());
        }
        return Collections.unmodifiableSet(ids);
    }

    /**
     * @param gateID ID of the output gate to inspect
     * @return an unmodifiable set with the IDs of the channels of that gate
     * @throws IllegalArgumentException if no output gate with the given ID is registered
     */
    @Override
    public Set<ChannelID> getOutputChannelIDsOfGate(GateID gateID) {
        OutputGate match = null;
        for (OutputGate gate : this.outputGates) {
            if (gate.getGateID().equals(gateID)) {
                match = gate;
                break;
            }
        }
        if (match == null) {
            throw new IllegalArgumentException("Cannot find output gate with ID " + gateID);
        }

        final Set<ChannelID> ids = new HashSet<>();
        for (int i = 0; i < match.getNumChannels(); i++) {
            ids.add(match.getChannel(i).getID());
        }
        return Collections.unmodifiableSet(ids);
    }

    /**
     * @param gateID ID of the input gate to inspect
     * @return an unmodifiable set with the IDs of the channels of that gate
     * @throws IllegalArgumentException if no input gate with the given ID is registered
     */
    @Override
    public Set<ChannelID> getInputChannelIDsOfGate(GateID gateID) {
        InputGate<? extends IOReadableWritable> match = null;
        for (InputGate<? extends IOReadableWritable> gate : this.inputGates) {
            if (gate.getGateID().equals(gateID)) {
                match = gate;
                break;
            }
        }
        if (match == null) {
            throw new IllegalArgumentException("Cannot find input gate with ID " + gateID);
        }

        final Set<ChannelID> ids = new HashSet<>();
        for (int i = 0; i < match.getNumberOfInputChannels(); i++) {
            ids.add(match.getInputChannel(i).getID());
        }
        return Collections.unmodifiableSet(ids);
    }

    /**
     * @return the live internal list of output gates (not a copy)
     */
    public List<OutputGate> outputGates() {
        return this.outputGates;
    }

    /**
     * @return the live internal list of input gates (not a copy)
     */
    public List<InputGate<? extends IOReadableWritable>> inputGates() {
        return this.inputGates;
    }

    /**
     * @return the accumulator protocol proxy; {@code null} if the environment was created without one
     */
    @Override
    public AccumulatorProtocol getAccumulatorProtocolProxy() {
        return this.accumulatorProtocolProxy;
    }

    /**
     * Registers (or replaces) the copy task for a cache file.
     *
     * @param str        key under which the task is stored
     * @param futureTask the copy task
     */
    public void addCopyTaskForCacheFile(String str, FutureTask<Path> futureTask) {
        this.cacheCopyTasks.put(str, futureTask);
    }

    /**
     * @return the live internal map of cache-file copy tasks (not a copy)
     */
    @Override
    public Map<String, FutureTask<Path>> getCopyTask() {
        return this.cacheCopyTasks;
    }

    /**
     * @return this environment, which implements {@link BufferProvider} on top of its output
     *         buffer pool
     */
    @Override
    public BufferProvider getOutputBufferProvider() {
        return this;
    }

    /**
     * Delegates to the output buffer pool. Throws {@link NullPointerException} if
     * {@link #registerGlobalBufferPool(GlobalBufferPool)} has not been called yet.
     */
    @Override
    public Buffer requestBuffer(int i) throws IOException {
        return this.outputBufferPool.requestBuffer(i);
    }

    /**
     * Delegates to the output buffer pool. Throws {@link NullPointerException} if
     * {@link #registerGlobalBufferPool(GlobalBufferPool)} has not been called yet.
     */
    @Override
    public Buffer requestBufferBlocking(int i) throws IOException, InterruptedException {
        return this.outputBufferPool.requestBufferBlocking(i);
    }

    /**
     * Delegates to the output buffer pool. Throws {@link NullPointerException} if
     * {@link #registerGlobalBufferPool(GlobalBufferPool)} has not been called yet.
     */
    @Override
    public int getBufferSize() {
        return this.outputBufferPool.getBufferSize();
    }

    /**
     * Delegates to the output buffer pool. Throws {@link NullPointerException} if
     * {@link #registerGlobalBufferPool(GlobalBufferPool)} has not been called yet.
     */
    @Override
    public BufferProvider.BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener bufferAvailabilityListener) {
        return this.outputBufferPool.registerBufferAvailabilityListener(bufferAvailabilityListener);
    }

    /**
     * @return the number of output channels, see {@link #getNumberOfOutputChannels()}
     */
    @Override
    public int getNumberOfChannels() {
        return getNumberOfOutputChannels();
    }

    /**
     * Forwards the designated number of buffers to the output buffer pool. Throws
     * {@link NullPointerException} if no pool has been registered yet.
     */
    @Override
    public void setDesignatedNumberOfBuffers(int i) {
        this.outputBufferPool.setNumDesignatedBuffers(i);
    }

    /**
     * Destroys the output buffer pool. Throws {@link NullPointerException} if no pool has been
     * registered yet.
     */
    @Override
    public void clearLocalBufferPool() {
        this.outputBufferPool.destroy();
    }

    /**
     * Creates the output buffer pool on top of the given global pool. Later calls are ignored
     * once a pool exists.
     *
     * @param globalBufferPool the global pool backing the new local pool
     */
    @Override
    public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool) {
        if (this.outputBufferPool == null) {
            // NOTE(review): the second argument is presumably the initial number of designated
            // buffers - confirm against LocalBufferPool.
            this.outputBufferPool = new LocalBufferPool(globalBufferPool, 1);
        }
    }

    /**
     * Logs the available, requested and designated buffer counts of the output buffer pool at
     * INFO level. Throws {@link NullPointerException} if no pool has been registered yet.
     */
    @Override
    public void logBufferUtilization() {
        LOG.info(String.format("\t%s: %d available, %d requested, %d designated",
                getTaskNameWithIndex(),
                this.outputBufferPool.numAvailableBuffers(),
                this.outputBufferPool.numRequestedBuffers(),
                this.outputBufferPool.numDesignatedBuffers()));
    }

    /**
     * Forwards the notification to the output buffer pool. Throws {@link NullPointerException}
     * if no pool has been registered yet.
     */
    @Override
    public void reportAsynchronousEvent() {
        this.outputBufferPool.reportAsynchronousEvent();
    }
}
