package eu.stratosphere.pact.runtime.task;

import eu.stratosphere.api.common.functions.GenericJoiner;
import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.api.common.typeutils.TypePairComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.core.memory.MemorySegment;
import eu.stratosphere.pact.runtime.hash.MutableHashTable;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.pact.runtime.util.EmptyMutableObjectIterator;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.MutableObjectIterator;
import java.util.List;

/* loaded from: input_file:eu/stratosphere/pact/runtime/task/AbstractCachedBuildSideMatchDriver.class */
public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends MatchDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericJoiner<IT1, IT2, OT>, OT> {
    protected volatile MutableHashTable<?, ?> hashJoin;

    protected abstract int getBuildSideIndex();

    protected abstract int getProbeSideIndex();

    @Override // eu.stratosphere.pact.runtime.task.ResettablePactDriver
    public boolean isInputResettable(int i) {
        if (i < 0 || i > 1) {
            throw new IndexOutOfBoundsException();
        }
        return i == getBuildSideIndex();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // eu.stratosphere.pact.runtime.task.ResettablePactDriver
    public void initialize() throws Exception {
        TaskConfig taskConfig = this.taskContext.getTaskConfig();
        TypeSerializer<X> inputSerializer = this.taskContext.getInputSerializer(0);
        TypeSerializer<X> inputSerializer2 = this.taskContext.getInputSerializer(1);
        TypeComparator<X> inputComparator = this.taskContext.getInputComparator(0);
        TypeComparator<X> inputComparator2 = this.taskContext.getInputComparator(1);
        MutableObjectIterator<X> input = this.taskContext.getInput(0);
        MutableObjectIterator<X> input2 = this.taskContext.getInput(1);
        TypePairComparatorFactory pairComparatorFactory = this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader());
        List<MemorySegment> allocatePages = this.taskContext.getMemoryManager().allocatePages(this.taskContext.getOwningNepheleTask(), this.taskContext.getMemoryManager().computeNumberOfPages(taskConfig.getMemoryDriver()));
        if (getBuildSideIndex() == 0 && getProbeSideIndex() == 1) {
            MutableHashTable<?, ?> mutableHashTable = new MutableHashTable<>(inputSerializer, inputSerializer2, inputComparator, inputComparator2, pairComparatorFactory.createComparator21(inputComparator, inputComparator2), allocatePages, this.taskContext.getIOManager());
            this.hashJoin = mutableHashTable;
            mutableHashTable.open(input, EmptyMutableObjectIterator.get());
        } else {
            if (getBuildSideIndex() != 1 || getProbeSideIndex() != 0) {
                throw new Exception("Error: Inconcistent setup for repeatable hash join driver.");
            }
            MutableHashTable<?, ?> mutableHashTable2 = new MutableHashTable<>(inputSerializer2, inputSerializer, inputComparator2, inputComparator, pairComparatorFactory.createComparator12(inputComparator, inputComparator2), allocatePages, this.taskContext.getIOManager());
            this.hashJoin = mutableHashTable2;
            mutableHashTable2.open(input2, EmptyMutableObjectIterator.get());
        }
    }

    @Override // eu.stratosphere.pact.runtime.task.MatchDriver, eu.stratosphere.pact.runtime.task.PactDriver
    public void prepare() throws Exception {
    }

    @Override // eu.stratosphere.pact.runtime.task.MatchDriver, eu.stratosphere.pact.runtime.task.PactDriver
    public void run() throws Exception {
        GenericJoiner<IT1, IT2, OT> stub = this.taskContext.getStub();
        Collector<OT> outputCollector = this.taskContext.getOutputCollector();
        if (getBuildSideIndex() == 0) {
            TypeSerializer<X> inputSerializer = this.taskContext.getInputSerializer(0);
            TypeSerializer<X> inputSerializer2 = this.taskContext.getInputSerializer(1);
            Object createInstance = inputSerializer.createInstance();
            Object createInstance2 = inputSerializer.createInstance();
            Object createInstance3 = inputSerializer2.createInstance();
            Object createInstance4 = inputSerializer2.createInstance();
            MutableHashTable<?, ?> mutableHashTable = this.hashJoin;
            MutableObjectIterator<X> input = this.taskContext.getInput(1);
            while (this.running && input.next(createInstance3)) {
                MutableHashTable.HashBucketIterator<?, ?> matchesFor = mutableHashTable.getMatchesFor(createInstance3);
                if (matchesFor.next(createInstance)) {
                    while (matchesFor.next(createInstance2)) {
                        inputSerializer2.copyTo(createInstance3, createInstance4);
                        stub.join(createInstance2, createInstance4, outputCollector);
                    }
                    stub.join(createInstance, createInstance3, outputCollector);
                }
            }
            return;
        }
        if (getBuildSideIndex() != 1) {
            throw new Exception();
        }
        TypeSerializer<X> inputSerializer3 = this.taskContext.getInputSerializer(1);
        TypeSerializer<X> inputSerializer4 = this.taskContext.getInputSerializer(0);
        Object createInstance5 = inputSerializer3.createInstance();
        Object createInstance6 = inputSerializer3.createInstance();
        Object createInstance7 = inputSerializer4.createInstance();
        Object createInstance8 = inputSerializer4.createInstance();
        MutableHashTable<?, ?> mutableHashTable2 = this.hashJoin;
        MutableObjectIterator<X> input2 = this.taskContext.getInput(0);
        while (this.running && input2.next(createInstance7)) {
            MutableHashTable.HashBucketIterator<?, ?> matchesFor2 = mutableHashTable2.getMatchesFor(createInstance7);
            if (matchesFor2.next(createInstance5)) {
                while (matchesFor2.next(createInstance6)) {
                    inputSerializer4.copyTo(createInstance7, createInstance8);
                    stub.join(createInstance8, createInstance6, outputCollector);
                }
                stub.join(createInstance7, createInstance5, outputCollector);
            }
        }
    }

    @Override // eu.stratosphere.pact.runtime.task.MatchDriver, eu.stratosphere.pact.runtime.task.PactDriver
    public void cleanup() throws Exception {
    }

    @Override // eu.stratosphere.pact.runtime.task.ResettablePactDriver
    public void reset() throws Exception {
    }

    @Override // eu.stratosphere.pact.runtime.task.ResettablePactDriver
    public void teardown() {
        this.hashJoin.close();
    }

    @Override // eu.stratosphere.pact.runtime.task.MatchDriver, eu.stratosphere.pact.runtime.task.PactDriver
    public void cancel() {
        this.running = false;
        if (this.hashJoin != null) {
            this.hashJoin.close();
        }
    }
}
