package eu.stratosphere.pact.runtime.iterative.task;

import eu.stratosphere.api.common.aggregators.Aggregator;
import eu.stratosphere.api.common.aggregators.LongSumAggregator;
import eu.stratosphere.api.common.functions.Function;
import eu.stratosphere.api.common.functions.IterationRuntimeContext;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.pact.runtime.iterative.concurrent.BlockingBackChannel;
import eu.stratosphere.pact.runtime.iterative.concurrent.BlockingBackChannelBroker;
import eu.stratosphere.pact.runtime.iterative.concurrent.IterationAggregatorBroker;
import eu.stratosphere.pact.runtime.iterative.concurrent.SolutionSetBroker;
import eu.stratosphere.pact.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
import eu.stratosphere.pact.runtime.iterative.io.SolutionSetUpdateOutputCollector;
import eu.stratosphere.pact.runtime.iterative.io.WorksetUpdateOutputCollector;
import eu.stratosphere.pact.runtime.task.PactDriver;
import eu.stratosphere.pact.runtime.task.RegularPactTask;
import eu.stratosphere.pact.runtime.task.ResettablePactDriver;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.pact.runtime.udf.RuntimeUDFContext;
import eu.stratosphere.runtime.io.api.MutableReader;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.InstantiationUtil;
import eu.stratosphere.util.MutableObjectIterator;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.class */
public abstract class AbstractIterativePactTask<S extends Function, OT> extends RegularPactTask<S, OT> implements Terminable {
    private static final Log log = LogFactory.getLog(AbstractIterativePactTask.class);
    protected LongSumAggregator worksetAggregator;
    protected BlockingBackChannel worksetBackChannel;
    protected boolean isWorksetIteration;
    protected boolean isWorksetUpdate;
    protected boolean isSolutionSetUpdate;
    private RuntimeAggregatorRegistry iterationAggregators;
    private String brokerKey;
    private int superstepNum = 1;
    private volatile boolean terminationRequested;

    /* loaded from: input_file:eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask$IterativeRuntimeUdfContext.class */
    private class IterativeRuntimeUdfContext extends RuntimeUDFContext implements IterationRuntimeContext {
        public IterativeRuntimeUdfContext(String str, int i, int i2) {
            super(str, i, i2);
        }

        public int getSuperstepNumber() {
            return AbstractIterativePactTask.this.superstepNum;
        }

        public <T extends Aggregator<?>> T getIterationAggregator(String str) {
            return (T) AbstractIterativePactTask.this.getIterationAggregators().getAggregator(str);
        }

        public <T extends Value> T getPreviousIterationAggregate(String str) {
            return (T) AbstractIterativePactTask.this.getIterationAggregators().getPreviousGlobalAggregate(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // eu.stratosphere.pact.runtime.task.RegularPactTask
    public void initialize() throws Exception {
        super.initialize();
        if (this.driver instanceof ResettablePactDriver) {
            ResettablePactDriver resettablePactDriver = (ResettablePactDriver) this.driver;
            for (int i = 0; i < resettablePactDriver.getNumberOfInputs(); i++) {
                if (resettablePactDriver.isInputResettable(i)) {
                    excludeFromReset(i);
                }
            }
        }
        TaskConfig lastTasksConfig = getLastTasksConfig();
        this.isWorksetIteration = lastTasksConfig.getIsWorksetIteration();
        this.isWorksetUpdate = lastTasksConfig.getIsWorksetUpdate();
        this.isSolutionSetUpdate = lastTasksConfig.getIsSolutionSetUpdate();
        if (this.isWorksetUpdate) {
            this.worksetBackChannel = BlockingBackChannelBroker.instance().getAndRemove(brokerKey());
            if (this.isWorksetIteration) {
                this.worksetAggregator = getIterationAggregators().getAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME);
                if (this.worksetAggregator == null) {
                    throw new RuntimeException("Missing workset elements count aggregator.");
                }
            }
        }
    }

    @Override // eu.stratosphere.pact.runtime.task.RegularPactTask
    public void run() throws Exception {
        if (!inFirstIteration()) {
            reinstantiateDriver();
            resetAllInputs();
            for (int i : this.iterativeBroadcastInputs) {
                readAndSetBroadcastInput(i, getTaskConfig().getBroadcastInputName(i), this.runtimeUdfContext);
            }
        } else if (this.driver instanceof ResettablePactDriver) {
            ((ResettablePactDriver) this.driver).initialize();
        }
        super.run();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // eu.stratosphere.pact.runtime.task.RegularPactTask
    public void closeLocalStrategiesAndCaches() {
        try {
            super.closeLocalStrategiesAndCaches();
            if (this.driver instanceof ResettablePactDriver) {
                try {
                    ((ResettablePactDriver) this.driver).teardown();
                } catch (Throwable th) {
                    log.error("Error while shutting down an iterative operator.", th);
                }
            }
        } catch (Throwable th2) {
            if (this.driver instanceof ResettablePactDriver) {
                try {
                    ((ResettablePactDriver) this.driver).teardown();
                } catch (Throwable th3) {
                    log.error("Error while shutting down an iterative operator.", th3);
                }
            }
            throw th2;
        }
    }

    @Override // eu.stratosphere.pact.runtime.task.RegularPactTask
    public RuntimeUDFContext createRuntimeContext(String str) {
        Environment environment = getEnvironment();
        return new IterativeRuntimeUdfContext(str, environment.getCurrentNumberOfSubtasks(), environment.getIndexInSubtaskGroup());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean inFirstIteration() {
        return this.superstepNum == 1;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int currentIteration() {
        return this.superstepNum;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void incrementIterationCounter() {
        this.superstepNum++;
    }

    public String brokerKey() {
        if (this.brokerKey == null) {
            this.brokerKey = getEnvironment().getJobID().toString() + '#' + this.config.getIterationId() + '#' + getEnvironment().getIndexInSubtaskGroup();
        }
        return this.brokerKey;
    }

    private void reinstantiateDriver() throws Exception {
        if (this.driver instanceof ResettablePactDriver) {
            ((ResettablePactDriver) this.driver).reset();
            return;
        }
        this.driver = (PactDriver) InstantiationUtil.instantiate(this.config.getDriver(), PactDriver.class);
        try {
            this.driver.setup(this);
        } catch (Throwable th) {
            throw new Exception("The pact driver setup for '" + getEnvironment().getTaskName() + "' , caused an error: " + th.getMessage(), th);
        }
    }

    public RuntimeAggregatorRegistry getIterationAggregators() {
        if (this.iterationAggregators == null) {
            this.iterationAggregators = IterationAggregatorBroker.instance().get(brokerKey());
        }
        return this.iterationAggregators;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void checkForTerminationAndResetEndOfSuperstepState() throws IOException {
        Object next;
        if (this.iterativeInputs.length == 0 && this.iterativeBroadcastInputs.length == 0) {
            throw new IllegalStateException();
        }
        boolean z = false;
        boolean z2 = true;
        for (int i : this.iterativeInputs) {
            MutableReader<?> mutableReader = this.inputReaders[i];
            if (mutableReader.isInputClosed()) {
                z = true;
            } else if (mutableReader.hasReachedEndOfSuperstep()) {
                z2 = false;
                mutableReader.startNextSuperstep();
            } else {
                MutableObjectIterator<?> mutableObjectIterator = this.inputIterators[i];
                Object createInstance = this.inputSerializers[i].getSerializer().createInstance();
                do {
                    next = mutableObjectIterator.next(createInstance);
                    createInstance = next;
                } while (next != null);
                if (mutableReader.isInputClosed()) {
                    z = true;
                } else {
                    z2 = false;
                    mutableReader.startNextSuperstep();
                }
            }
        }
        for (int i2 : this.iterativeBroadcastInputs) {
            MutableReader<?> mutableReader2 = this.broadcastInputReaders[i2];
            if (mutableReader2.isInputClosed()) {
                z = true;
            } else {
                if (!mutableReader2.hasReachedEndOfSuperstep()) {
                    throw new IllegalStateException("An iterative broadcast input has not been fully consumed.");
                }
                z2 = false;
                mutableReader2.startNextSuperstep();
            }
        }
        if (z2 != z) {
            throw new IllegalStateException("Inconsistent state: Iteration termination received on some, but not all inputs.");
        }
        if (z2) {
            requestTermination();
        }
    }

    @Override // eu.stratosphere.pact.runtime.iterative.task.Terminable
    public boolean terminationRequested() {
        return this.terminationRequested;
    }

    @Override // eu.stratosphere.pact.runtime.iterative.task.Terminable
    public void requestTermination() {
        this.terminationRequested = true;
    }

    @Override // eu.stratosphere.pact.runtime.task.RegularPactTask, eu.stratosphere.nephele.template.AbstractInvokable
    public void cancel() throws Exception {
        requestTermination();
        super.cancel();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Collector<OT> createWorksetUpdateOutputCollector(Collector<OT> collector) {
        return new WorksetUpdateOutputCollector(this.worksetBackChannel.getWriteEnd(), getOutputSerializer(), collector);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Collector<OT> createWorksetUpdateOutputCollector() {
        return createWorksetUpdateOutputCollector(null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Collector<OT> createSolutionSetUpdateOutputCollector(Collector<OT> collector) {
        return new SolutionSetUpdateOutputCollector(SolutionSetBroker.instance().get(brokerKey()), getOutputSerializer(), collector);
    }

    private TypeSerializer<OT> getOutputSerializer() {
        TypeSerializerFactory outputSerializer = getLastTasksConfig().getOutputSerializer(this.userCodeClassLoader);
        if (outputSerializer == null) {
            throw new RuntimeException("Missing output serializer for workset update.");
        }
        return outputSerializer.getSerializer();
    }
}
