package eu.stratosphere.pact.runtime.iterative.task;

import eu.stratosphere.api.common.functions.Function;
import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypePairComparator;
import eu.stratosphere.api.common.typeutils.TypePairComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.MemorySegment;
import eu.stratosphere.nephele.io.AbstractRecordWriter;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.hash.MutableHashTable;
import eu.stratosphere.pact.runtime.io.InputViewIterator;
import eu.stratosphere.pact.runtime.iterative.concurrent.BlockingBackChannel;
import eu.stratosphere.pact.runtime.iterative.concurrent.BlockingBackChannelBroker;
import eu.stratosphere.pact.runtime.iterative.concurrent.IterationAggregatorBroker;
import eu.stratosphere.pact.runtime.iterative.concurrent.SolutionSetBroker;
import eu.stratosphere.pact.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
import eu.stratosphere.pact.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
import eu.stratosphere.pact.runtime.iterative.concurrent.SuperstepBarrier;
import eu.stratosphere.pact.runtime.iterative.event.AllWorkersDoneEvent;
import eu.stratosphere.pact.runtime.iterative.event.TerminationEvent;
import eu.stratosphere.pact.runtime.iterative.event.WorkerDoneEvent;
import eu.stratosphere.pact.runtime.iterative.io.SerializedUpdateBuffer;
import eu.stratosphere.pact.runtime.task.RegularPactTask;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.pact.runtime.util.EmptyMutableObjectIterator;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.MutableObjectIterator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.class */
/**
 * Head task of an iteration.
 *
 * <p>As implemented in {@link #run()}, the head sets up a back channel, a superstep barrier and an
 * aggregator registry and hands them to the corresponding brokers under this task's broker key.
 * For workset iterations it additionally builds a hash table holding the solution set. It then
 * executes supersteps until termination is requested and finally writes the result to the
 * dedicated final-output collector.
 *
 * @param <X> record type handled by the solution set serializer and the final output collector
 * @param <Y> record type handled by the serializer of the feedback input
 * @param <S> user function type, passed through to {@link AbstractIterativePactTask}
 * @param <OT> output type, passed through to {@link AbstractIterativePactTask}
 */
public class IterationHeadPactTask<X, Y, S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
    private static final Log log = LogFactory.getLog(IterationHeadPactTask.class);

    /** Collector that receives the final result once the iteration has terminated. */
    private Collector<X> finalOutputCollector;

    /** Writers backing {@link #finalOutputCollector}; filled in {@link #initOutputs()}. */
    private List<AbstractRecordWriter<?>> finalOutputWriters;

    /** Serializer of the input that is replaced by the back channel contents after the first superstep. */
    private TypeSerializer<Y> feedbackTypeSerializer;

    /** Serializer used to create the reusable record when streaming out the final result. */
    private TypeSerializer<X> solutionTypeSerializer;

    /** Writer used to publish events to, and receive events from, the sync task. */
    private RecordWriter<?> toSync;

    /** Index of the initial solution set input, or {@code -1} when this is not a workset iteration. */
    private int initialSolutionSetInput;

    /** Index of the input that carries the partial solution or workset. */
    private int feedbackDataInput;

    /** Aggregators of this iteration; handed to the {@link IterationAggregatorBroker} in {@link #run()}. */
    private RuntimeAggregatorRegistry aggregatorRegistry;

    /**
     * Returns the number of inputs of the driver, plus one extra input when the configuration marks
     * this task as part of a workset iteration.
     */
    @Override // eu.stratosphere.pact.runtime.task.RegularPactTask
    protected int getNumTaskInputs() {
        int extraInputs = this.config.getIsWorksetIteration() ? 1 : 0;
        return this.driver.getNumberOfInputs() + extraInputs;
    }

    /**
     * Initializes the regular outputs, then the final-output collector and the writer to the sync task.
     *
     * @throws Exception if the superclass initialization fails, or if the number of regular plus
     *         final output writers does not equal the configured index of the sync output
     */
    @Override // eu.stratosphere.pact.runtime.task.RegularPactTask
    public void initOutputs() throws Exception {
        super.initOutputs();

        this.finalOutputWriters = new ArrayList<AbstractRecordWriter<?>>();
        TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
        this.finalOutputCollector = RegularPactTask.getOutputCollector(this, finalOutConfig, this.userCodeClassLoader,
                this.finalOutputWriters, finalOutConfig.getNumOutputs());

        // The sync writer is created last, so all previously created writers must add up to its index.
        int writersSoFar = this.eventualOutputs.size() + this.finalOutputWriters.size();
        if (writersSoFar != this.config.getIterationHeadIndexOfSyncOutput()) {
            throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
        }
        this.toSync = new RecordWriter<IOReadableWritable>(this, IOReadableWritable.class);
    }

    /**
     * Allocates the configured back channel memory, wraps it in a {@link SerializedUpdateBuffer}
     * based {@link BlockingBackChannel} and hands that channel to the {@link BlockingBackChannelBroker}.
     *
     * @return the newly created back channel
     * @throws Exception if memory allocation or channel setup fails
     */
    private BlockingBackChannel initBackChannel() throws Exception {
        long backChannelMemory = this.config.getBackChannelMemory();
        List<MemorySegment> segments = new ArrayList<MemorySegment>();
        int segmentSize = getMemoryManager().getPageSize();
        getMemoryManager().allocatePages(this, segments, backChannelMemory);

        BlockingBackChannel backChannel =
                new BlockingBackChannel(new SerializedUpdateBuffer(segments, segmentSize, getIOManager()));
        BlockingBackChannelBroker.instance().handIn(brokerKey(), backChannel);
        return backChannel;
    }

    /**
     * Creates the hash table for the solution set from the serializers, comparators and memory
     * budget found in the task configuration.
     *
     * <p>If allocation or construction fails, a partially created table is closed and already
     * allocated memory is released on a best-effort basis (failures there are only logged) before
     * the original error propagates.
     *
     * @param <BT> type of the first serializer/comparator pair passed to the table
     * @param <PT> type of the second (prober) serializer/comparator pair passed to the table
     * @return the created hash table
     * @throws Exception if memory allocation or table construction fails
     */
    // The generic signatures of the TaskConfig getters are not visible here, so the factories are
    // kept raw and converted unchecked when building the table.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private <BT, PT> MutableHashTable<BT, PT> initHashTable() throws Exception {
        long solutionSetMemory = this.config.getSolutionSetMemory();

        TypeSerializerFactory buildSerializerFactory = this.config.getSolutionSetSerializer(this.userCodeClassLoader);
        TypeSerializerFactory probeSerializerFactory = this.config.getSolutionSetProberSerializer(this.userCodeClassLoader);
        TypeComparatorFactory buildComparatorFactory = this.config.getSolutionSetComparator(this.userCodeClassLoader);
        TypeComparatorFactory probeComparatorFactory = this.config.getSolutionSetProberComparator(this.userCodeClassLoader);
        TypePairComparatorFactory pairComparatorFactory = this.config.getSolutionSetPairComparatorFactory(this.userCodeClassLoader);

        TypeSerializer buildSerializer = buildSerializerFactory.getSerializer();
        TypeSerializer probeSerializer = probeSerializerFactory.getSerializer();
        TypeComparator buildComparator = buildComparatorFactory.createComparator();
        TypeComparator probeComparator = probeComparatorFactory.createComparator();
        TypePairComparator pairComparator = pairComparatorFactory.createComparator21(buildComparator, probeComparator);

        MutableHashTable<BT, PT> hashTable = null;
        List<MemorySegment> memSegments = null;
        boolean success = false;
        try {
            memSegments = getMemoryManager().allocatePages(getOwningNepheleTask(), solutionSetMemory);
            hashTable = new MutableHashTable<BT, PT>(buildSerializer, probeSerializer, buildComparator,
                    probeComparator, pairComparator, memSegments, getIOManager());
            success = true;
            return hashTable;
        } finally {
            if (!success) {
                if (hashTable != null) {
                    try {
                        hashTable.close();
                    } catch (Throwable t) {
                        log.error("Error closing the solution set hash table after unsuccessful creation.", t);
                    }
                }
                if (memSegments != null) {
                    try {
                        getMemoryManager().release(memSegments);
                    } catch (Throwable t) {
                        log.error("Error freeing memory after error during solution set hash table creation.", t);
                    }
                }
            }
        }
    }

    /**
     * Opens the hash table with the given records as its first input and an empty iterator as its
     * second input.
     *
     * @param mutableHashTable the solution set hash table to open
     * @param mutableObjectIterator the records of the initial solution set
     * @throws IOException if opening the table fails
     */
    private <T> void readInitialSolutionSet(MutableHashTable<X, T> mutableHashTable, MutableObjectIterator<X> mutableObjectIterator) throws IOException {
        mutableHashTable.open(mutableObjectIterator, EmptyMutableObjectIterator.<T>get());
    }

    /**
     * Creates a {@link SuperstepBarrier} and subscribes it on the sync writer to
     * {@link AllWorkersDoneEvent} and {@link TerminationEvent}.
     */
    private SuperstepBarrier initSuperstepBarrier() {
        SuperstepBarrier barrier = new SuperstepBarrier(this.userCodeClassLoader);
        this.toSync.subscribeToEvent(barrier, AllWorkersDoneEvent.class);
        this.toSync.subscribeToEvent(barrier, TerminationEvent.class);
        return barrier;
    }

    /**
     * Runs the iteration: sets up the shared structures, executes supersteps until the task stops
     * running or termination is requested, and then streams out the final result.
     *
     * <p>All broker registrations made under this task's broker key are removed and the solution
     * set hash table (if any) is closed exactly once when this method exits, whether normally or
     * with an error.
     *
     * @throws Exception if setup, any superstep, or the final output fails
     */
    @Override // eu.stratosphere.pact.runtime.iterative.task.AbstractIterativePactTask, eu.stratosphere.pact.runtime.task.RegularPactTask
    public void run() throws Exception {
        final String brokerKey = brokerKey();
        final int workerIndex = getEnvironment().getIndexInSubtaskGroup();
        final boolean waitForSolutionSetUpdate = this.config.getWaitForSolutionSetUpdate();
        final boolean isWorksetIteration = this.config.getIsWorksetIteration();

        MutableHashTable<X, ?> solutionSet = null;
        try {
            BlockingBackChannel backChannel = initBackChannel();
            SuperstepBarrier barrier = initSuperstepBarrier();
            SolutionSetUpdateBarrier solutionSetUpdateBarrier = null;

            this.feedbackDataInput = this.config.getIterationHeadPartialSolutionOrWorksetInputIndex();
            this.feedbackTypeSerializer = getInputSerializer(this.feedbackDataInput);
            excludeFromReset(this.feedbackDataInput);

            if (isWorksetIteration) {
                this.initialSolutionSetInput = this.config.getIterationHeadSolutionSetInputIndex();
                TypeSerializerFactory<X> solutionSerializerFactory = this.config.getSolutionSetSerializer(this.userCodeClassLoader);
                this.solutionTypeSerializer = solutionSerializerFactory.getSerializer();

                solutionSet = initHashTable();

                @SuppressWarnings("unchecked")
                MutableObjectIterator<X> initialSolutionSet = (MutableObjectIterator<X>) createInputIterator(
                        this.initialSolutionSetInput, this.inputReaders[this.initialSolutionSetInput], this.solutionTypeSerializer);
                readInitialSolutionSet(solutionSet, initialSolutionSet);
                SolutionSetBroker.instance().handIn(brokerKey, solutionSet);

                if (waitForSolutionSetUpdate) {
                    solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
                    SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier);
                }
            } else {
                // Without a solution set, the final result is read from the back channel, so it is
                // deserialized with the feedback serializer.
                this.initialSolutionSetInput = -1;
                @SuppressWarnings("unchecked")
                TypeSerializer<X> feedbackAsSolutionSerializer = (TypeSerializer<X>) this.feedbackTypeSerializer;
                this.solutionTypeSerializer = feedbackAsSolutionSerializer;
            }

            this.aggregatorRegistry = new RuntimeAggregatorRegistry(this.config.getIterationAggregators());
            IterationAggregatorBroker.instance().handIn(brokerKey, this.aggregatorRegistry);

            // Read end of the back channel as obtained at the end of the most recent superstep.
            DataInputView superstepResult = null;

            while (this.running && !terminationRequested()) {
                if (log.isInfoEnabled()) {
                    log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
                }

                barrier.setup();
                if (waitForSolutionSetUpdate) {
                    solutionSetUpdateBarrier.setup();
                }

                if (!inFirstIteration()) {
                    feedBackSuperstepResult(superstepResult);
                }

                super.run();

                sendEndOfSuperstepToAllIterationOutputs();

                if (waitForSolutionSetUpdate) {
                    solutionSetUpdateBarrier.waitForSolutionSetUpdate();
                }

                superstepResult = backChannel.getReadEndAfterSuperstepEnded();

                if (log.isInfoEnabled()) {
                    log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
                }

                sendEventToSync(new WorkerDoneEvent(workerIndex, this.aggregatorRegistry.getAllAggregators()));

                if (log.isInfoEnabled()) {
                    log.info(formatLogString("waiting for other workers in iteration [" + currentIteration() + "]"));
                }

                barrier.waitForOtherWorkers();

                if (barrier.terminationSignaled()) {
                    if (log.isInfoEnabled()) {
                        log.info(formatLogString("head received termination request in iteration [" + currentIteration() + "]"));
                    }
                    requestTermination();
                } else {
                    incrementIterationCounter();
                    this.aggregatorRegistry.updateGlobalAggregatesAndReset(barrier.getAggregatorNames(), barrier.getAggregates());
                }
            }

            if (log.isInfoEnabled()) {
                log.info(formatLogString("streaming out final result after [" + currentIteration() + "] iterations"));
            }

            if (isWorksetIteration) {
                streamSolutionSetToFinalOutput(solutionSet);
            } else {
                streamOutFinalOutputBulk(new InputViewIterator<X>(superstepResult, this.solutionTypeSerializer));
            }
        } finally {
            // Single cleanup path for both normal and exceptional exit.
            IterationAggregatorBroker.instance().remove(brokerKey);
            BlockingBackChannelBroker.instance().remove(brokerKey);
            if (isWorksetIteration) {
                SolutionSetBroker.instance().remove(brokerKey);
                if (waitForSolutionSetUpdate) {
                    SolutionSetUpdateBarrierBroker.instance().remove(brokerKey);
                }
            }
            if (solutionSet != null) {
                solutionSet.close();
            }
        }
    }

    /**
     * Drains the given iterator into the final output collector, reusing a single record instance
     * created by the solution type serializer.
     *
     * @param mutableObjectIterator source of the records to emit
     * @throws IOException if reading a record fails
     */
    private void streamOutFinalOutputBulk(MutableObjectIterator<X> mutableObjectIterator) throws IOException {
        final Collector<X> out = this.finalOutputCollector;
        X reuse = this.solutionTypeSerializer.createInstance();
        while (mutableObjectIterator.next(reuse)) {
            out.collect(reuse);
        }
    }

    /**
     * Emits every record returned by the hash table's partition entry iterator to the final output
     * collector.
     *
     * @param mutableHashTable the solution set hash table
     * @throws IOException if reading a record fails
     * @throws InterruptedException declared for callers; kept from the original signature
     */
    private void streamSolutionSetToFinalOutput(MutableHashTable<X, ?> mutableHashTable) throws IOException, InterruptedException {
        streamOutFinalOutputBulk(mutableHashTable.getPartitionEntryIterator());
    }

    /**
     * Replaces the feedback input with an iterator that deserializes records from the given view.
     *
     * @param dataInputView read end of the back channel holding the previous superstep's result
     */
    private void feedBackSuperstepResult(DataInputView dataInputView) {
        this.inputs[this.feedbackDataInput] = new InputViewIterator<Y>(dataInputView, this.feedbackTypeSerializer);
    }

    /**
     * Calls {@code sendEndOfSuperstep()} on every writer in {@code eventualOutputs}.
     *
     * @throws IOException if a writer fails to send
     * @throws InterruptedException if sending is interrupted
     */
    private void sendEndOfSuperstepToAllIterationOutputs() throws IOException, InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug(formatLogString("Sending end-of-superstep to all iteration outputs."));
        }
        for (int i = 0; i < this.eventualOutputs.size(); i++) {
            this.eventualOutputs.get(i).sendEndOfSuperstep();
        }
    }

    /**
     * Publishes the given event on the writer connected to the sync task.
     *
     * @param workerDoneEvent the event to publish
     * @throws IOException if publishing fails
     * @throws InterruptedException if publishing is interrupted
     */
    private void sendEventToSync(WorkerDoneEvent workerDoneEvent) throws IOException, InterruptedException {
        if (log.isInfoEnabled()) {
            log.info(formatLogString("sending " + WorkerDoneEvent.class.getSimpleName() + " to sync"));
        }
        this.toSync.publishEvent(workerDoneEvent);
    }
}
