package eu.stratosphere.pact.runtime.task.chaining;

import eu.stratosphere.api.common.functions.Function;
import eu.stratosphere.api.common.functions.GenericReducer;
import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.sort.AsynchronousPartialSorterCollector;
import eu.stratosphere.pact.runtime.sort.UnilateralSortMerger;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
import eu.stratosphere.pact.runtime.task.RegularPactTask;
import eu.stratosphere.pact.runtime.util.KeyGroupedIterator;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.MutableObjectIterator;

/* loaded from: input_file:eu/stratosphere/pact/runtime/task/chaining/ChainedCombineDriver.class */
public class ChainedCombineDriver<T> extends ChainedDriver<T, T> {
    private UnilateralSortMerger.InputDataCollector<T> inputCollector;
    private volatile Exception exception;
    private GenericReducer<T, ?> combiner;
    private AsynchronousPartialSorterCollector<T> sorter;
    private ChainedCombineDriver<T>.CombinerThread combinerThread;
    private AbstractInvokable parent;
    private volatile boolean canceled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:eu/stratosphere/pact/runtime/task/chaining/ChainedCombineDriver$CombinerThread.class */
    public final class CombinerThread extends Thread {
        private final AsynchronousPartialSorterCollector<T> sorter;
        private final TypeSerializer<T> serializer;
        private final TypeComparator<T> comparator;
        private final GenericReducer<T, ?> stub;
        private final Collector<T> output;
        private volatile boolean running;

        private CombinerThread(AsynchronousPartialSorterCollector<T> asynchronousPartialSorterCollector, TypeSerializer<T> typeSerializer, TypeComparator<T> typeComparator, GenericReducer<T, ?> genericReducer, Collector<T> collector) {
            super("Combiner Thread");
            setDaemon(true);
            this.sorter = asynchronousPartialSorterCollector;
            this.serializer = typeSerializer;
            this.comparator = typeComparator;
            this.stub = genericReducer;
            this.output = collector;
            this.running = true;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            MutableObjectIterator<T> mutableObjectIterator = null;
            while (mutableObjectIterator == null) {
                try {
                    try {
                        mutableObjectIterator = this.sorter.getIterator();
                    } catch (InterruptedException e) {
                        if (!this.running) {
                            return;
                        }
                    }
                } catch (Throwable th) {
                    ChainedCombineDriver.this.exception = new Exception("The combiner failed due to an exception.", th);
                    return;
                }
            }
            KeyGroupedIterator keyGroupedIterator = new KeyGroupedIterator(mutableObjectIterator, this.serializer, this.comparator);
            GenericReducer<T, ?> genericReducer = this.stub;
            Collector<T> collector = this.output;
            while (this.running && keyGroupedIterator.nextKey()) {
                genericReducer.combine(keyGroupedIterator.getValues(), collector);
            }
        }

        public void cancel() {
            this.running = false;
            interrupt();
        }
    }

    @Override // eu.stratosphere.pact.runtime.task.chaining.ChainedDriver
    public void setup(AbstractInvokable abstractInvokable) {
        this.parent = abstractInvokable;
        GenericReducer<T, ?> genericReducer = (GenericReducer) RegularPactTask.instantiateUserCode(this.config, this.userCodeClassLoader, GenericReducer.class);
        genericReducer.setRuntimeContext(getRuntimeContext(abstractInvokable, this.taskName));
        this.combiner = genericReducer;
    }

    @Override // eu.stratosphere.pact.runtime.task.chaining.ChainedDriver
    public void openTask() throws Exception {
        RegularPactTask.openUserCode(this.combiner, this.config.getStubParameters());
        long memoryDriver = this.config.getMemoryDriver();
        DriverStrategy driverStrategy = this.config.getDriverStrategy();
        MemoryManager memoryManager = this.parent.getEnvironment().getMemoryManager();
        TypeSerializerFactory<T> inputSerializer = this.config.getInputSerializer(0, this.userCodeClassLoader);
        TypeComparatorFactory<T> driverComparator = this.config.getDriverComparator(0, this.userCodeClassLoader);
        TypeSerializer serializer = inputSerializer.getSerializer();
        TypeComparator createComparator = driverComparator.createComparator();
        switch (driverStrategy) {
            case PARTIAL_GROUP:
                this.sorter = new AsynchronousPartialSorterCollector<>(memoryManager, this.parent, serializer, createComparator.duplicate(), memoryDriver);
                this.inputCollector = this.sorter.getInputCollector();
                this.combinerThread = new CombinerThread(this.sorter, serializer, createComparator, this.combiner, this.outputCollector);
                this.combinerThread.start();
                if (this.parent != null) {
                    this.parent.userThreadStarted(this.combinerThread);
                    return;
                }
                return;
            default:
                throw new RuntimeException("Invalid local strategy provided for CombineTask.");
        }
    }

    @Override // eu.stratosphere.pact.runtime.task.chaining.ChainedDriver
    public void closeTask() throws Exception {
        while (!this.canceled && this.combinerThread.isAlive()) {
            try {
                this.combinerThread.join();
            } catch (InterruptedException e) {
                cancelTask();
                throw e;
            }
        }
        if (this.parent != null && this.combinerThread != null) {
            this.parent.userThreadFinished(this.combinerThread);
        }
        if (this.exception != null) {
            throw new ExceptionInChainedStubException(this.taskName, this.exception);
        }
        this.sorter.close();
        if (this.canceled) {
            return;
        }
        RegularPactTask.closeUserCode(this.combiner);
    }

    @Override // eu.stratosphere.pact.runtime.task.chaining.ChainedDriver
    public void cancelTask() {
        this.canceled = true;
        this.exception = new Exception("Task has been canceled");
        this.combinerThread.cancel();
        this.inputCollector.close();
        this.sorter.close();
        try {
            this.combinerThread.join();
            if (this.parent == null || this.combinerThread == null) {
                return;
            }
            this.parent.userThreadFinished(this.combinerThread);
        } catch (InterruptedException e) {
            if (this.parent == null || this.combinerThread == null) {
                return;
            }
            this.parent.userThreadFinished(this.combinerThread);
        } catch (Throwable th) {
            if (this.parent != null && this.combinerThread != null) {
                this.parent.userThreadFinished(this.combinerThread);
            }
            throw th;
        }
    }

    @Override // eu.stratosphere.pact.runtime.task.chaining.ChainedDriver
    public Function getStub() {
        return this.combiner;
    }

    @Override // eu.stratosphere.pact.runtime.task.chaining.ChainedDriver
    public String getTaskName() {
        return this.taskName;
    }

    @Override // eu.stratosphere.pact.runtime.task.chaining.ChainedDriver
    public void collect(T t) {
        if (this.exception != null) {
            throw new RuntimeException("The combiner failed due to an exception.", this.exception.getCause() == null ? this.exception : this.exception.getCause());
        }
        this.inputCollector.collect(t);
    }

    public void close() {
        this.inputCollector.close();
        if (this.exception != null) {
            throw new RuntimeException("The combiner failed due to an exception.", this.exception.getCause() == null ? this.exception : this.exception.getCause());
        }
    }
}
