package eu.stratosphere.pact.runtime.sort;

import eu.stratosphere.api.common.functions.GenericJoiner;
import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.api.common.typeutils.TypePairComparator;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.core.memory.MemorySegment;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryAllocationException;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.resettable.BlockResettableIterator;
import eu.stratosphere.pact.runtime.resettable.SpillingResettableIterator;
import eu.stratosphere.pact.runtime.task.util.JoinTaskIterator;
import eu.stratosphere.pact.runtime.util.KeyGroupedIterator;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.MutableObjectIterator;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/pact/runtime/sort/MergeMatchIterator.class */
public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O> {
    private static final Log LOG = LogFactory.getLog(MergeMatchIterator.class);
    private TypePairComparator<T1, T2> comp;
    private KeyGroupedIterator<T1> iterator1;
    private KeyGroupedIterator<T2> iterator2;
    private final TypeSerializer<T1> serializer1;
    private final TypeSerializer<T2> serializer2;
    private T1 copy1;
    private T1 spillHeadCopy;
    private T2 copy2;
    private T2 blockHeadCopy;
    private final BlockResettableIterator<T2> blockIt;
    private final List<MemorySegment> memoryForSpillingIterator;
    private final MemoryManager memoryManager;
    private final IOManager ioManager;

    public MergeMatchIterator(MutableObjectIterator<T1> mutableObjectIterator, MutableObjectIterator<T2> mutableObjectIterator2, TypeSerializer<T1> typeSerializer, TypeComparator<T1> typeComparator, TypeSerializer<T2> typeSerializer2, TypeComparator<T2> typeComparator2, TypePairComparator<T1, T2> typePairComparator, MemoryManager memoryManager, IOManager iOManager, int i, AbstractInvokable abstractInvokable) throws MemoryAllocationException {
        if (i < 2) {
            throw new IllegalArgumentException("Merger needs at least 2 memory pages.");
        }
        this.comp = typePairComparator;
        this.serializer1 = typeSerializer;
        this.serializer2 = typeSerializer2;
        this.copy1 = (T1) typeSerializer.createInstance();
        this.spillHeadCopy = (T1) typeSerializer.createInstance();
        this.copy2 = (T2) typeSerializer2.createInstance();
        this.blockHeadCopy = (T2) typeSerializer2.createInstance();
        this.memoryManager = memoryManager;
        this.ioManager = iOManager;
        this.iterator1 = new KeyGroupedIterator<>(mutableObjectIterator, this.serializer1, typeComparator.duplicate());
        this.iterator2 = new KeyGroupedIterator<>(mutableObjectIterator2, this.serializer2, typeComparator2.duplicate());
        int i2 = i > 20 ? 2 : 1;
        this.blockIt = new BlockResettableIterator<>(this.memoryManager, this.serializer2, i - i2, abstractInvokable);
        this.memoryForSpillingIterator = memoryManager.allocatePages(abstractInvokable, i2);
    }

    @Override // eu.stratosphere.pact.runtime.task.util.JoinTaskIterator
    public void open() throws IOException {
    }

    @Override // eu.stratosphere.pact.runtime.task.util.JoinTaskIterator
    public void close() {
        if (this.blockIt != null) {
            try {
                this.blockIt.close();
            } catch (Throwable th) {
                LOG.error("Error closing block memory iterator: " + th.getMessage(), th);
            }
        }
        this.memoryManager.release(this.memoryForSpillingIterator);
    }

    @Override // eu.stratosphere.pact.runtime.task.util.JoinTaskIterator
    public void abort() {
        close();
    }

    @Override // eu.stratosphere.pact.runtime.task.util.JoinTaskIterator
    public boolean callWithNextKey(GenericJoiner<T1, T2, O> genericJoiner, Collector<O> collector) throws Exception {
        if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
            do {
            } while (this.iterator1.nextKey());
            do {
            } while (this.iterator2.nextKey());
            return false;
        }
        TypePairComparator<T1, T2> typePairComparator = this.comp;
        typePairComparator.setReference(this.iterator1.getCurrent());
        T2 current = this.iterator2.getCurrent();
        while (true) {
            int compareToReference = typePairComparator.compareToReference(current);
            if (compareToReference == 0) {
                Iterator<T1> values = this.iterator1.getValues();
                Iterator<T2> values2 = this.iterator2.getValues();
                T1 next = values.next();
                T2 next2 = values2.next();
                boolean hasNext = values.hasNext();
                boolean hasNext2 = values2.hasNext();
                if (hasNext) {
                    if (hasNext2) {
                        crossMwithNValues(next, values, next2, values2, genericJoiner, collector);
                        return true;
                    }
                    crossSecond1withNValues(next2, next, values, genericJoiner, collector);
                    return true;
                }
                if (hasNext2) {
                    crossFirst1withNValues(next, next2, values2, genericJoiner, collector);
                    return true;
                }
                genericJoiner.join(next, next2, collector);
                return true;
            }
            if (compareToReference < 0) {
                if (!this.iterator2.nextKey()) {
                    return false;
                }
                current = this.iterator2.getCurrent();
            } else {
                if (!this.iterator1.nextKey()) {
                    return false;
                }
                typePairComparator.setReference(this.iterator1.getCurrent());
            }
        }
    }

    private void crossFirst1withNValues(T1 t1, T2 t2, Iterator<T2> it, GenericJoiner<T1, T2, O> genericJoiner, Collector<O> collector) throws Exception {
        this.copy1 = (T1) this.serializer1.copy(t1, this.copy1);
        genericJoiner.join(this.copy1, t2, collector);
        boolean z = true;
        do {
            T2 next = it.next();
            if (it.hasNext()) {
                this.copy1 = (T1) this.serializer1.copy(t1, this.copy1);
                genericJoiner.join(this.copy1, next, collector);
            } else {
                genericJoiner.join(t1, next, collector);
                z = false;
            }
        } while (z);
    }

    private void crossSecond1withNValues(T2 t2, T1 t1, Iterator<T1> it, GenericJoiner<T1, T2, O> genericJoiner, Collector<O> collector) throws Exception {
        this.copy2 = (T2) this.serializer2.copy(t2, this.copy2);
        genericJoiner.join(t1, this.copy2, collector);
        boolean z = true;
        do {
            T1 next = it.next();
            if (it.hasNext()) {
                this.copy2 = (T2) this.serializer2.copy(t2, this.copy2);
                genericJoiner.join(next, this.copy2, collector);
            } else {
                genericJoiner.join(next, t2, collector);
                z = false;
            }
        } while (z);
    }

    /* JADX WARN: Finally extract failed */
    private void crossMwithNValues(T1 t1, Iterator<T1> it, T2 t2, Iterator<T2> it2, GenericJoiner<T1, T2, O> genericJoiner, Collector<O> collector) throws Exception {
        Iterator<T1> it3;
        this.copy1 = (T1) this.serializer1.copy(t1, this.copy1);
        this.blockHeadCopy = (T2) this.serializer2.copy(t2, this.blockHeadCopy);
        genericJoiner.join(this.copy1, t2, collector);
        SpillingResettableIterator spillingResettableIterator = null;
        try {
            this.blockIt.reopen(it2);
            while (this.blockIt.hasNext()) {
                T2 next = this.blockIt.next();
                this.copy1 = (T1) this.serializer1.copy(t1, this.copy1);
                genericJoiner.join(this.copy1, next, collector);
            }
            this.blockIt.reset();
            boolean hasFurtherInput = this.blockIt.hasFurtherInput();
            if (hasFurtherInput) {
                spillingResettableIterator = new SpillingResettableIterator(it, this.serializer1, this.memoryManager, this.ioManager, this.memoryForSpillingIterator);
                it3 = spillingResettableIterator;
                spillingResettableIterator.open();
                this.spillHeadCopy = (T1) this.serializer1.copy(t1, this.spillHeadCopy);
            } else {
                it3 = it;
            }
            while (it3.hasNext()) {
                T1 next2 = it3.next();
                this.copy1 = (T1) this.serializer1.copy(next2, this.copy1);
                this.copy2 = (T2) this.serializer2.copy(this.blockHeadCopy, this.copy2);
                genericJoiner.join(this.copy1, this.copy2, collector);
                while (this.blockIt.hasNext()) {
                    T2 next3 = this.blockIt.next();
                    this.copy1 = (T1) this.serializer1.copy(next2, this.copy1);
                    genericJoiner.join(this.copy1, next3, collector);
                }
                this.blockIt.reset();
            }
            if (!hasFurtherInput) {
                if (spillingResettableIterator != null) {
                    this.memoryForSpillingIterator.addAll(spillingResettableIterator.close());
                    return;
                }
                return;
            }
            while (this.blockIt.nextBlock()) {
                spillingResettableIterator.reset();
                while (this.blockIt.hasNext()) {
                    this.copy1 = (T1) this.serializer1.copy(this.spillHeadCopy, this.copy1);
                    genericJoiner.join(this.copy1, this.blockIt.next(), collector);
                }
                this.blockIt.reset();
                while (spillingResettableIterator.hasNext()) {
                    Object next4 = spillingResettableIterator.next();
                    while (this.blockIt.hasNext()) {
                        T2 next5 = this.blockIt.next();
                        this.copy1 = (T1) this.serializer1.copy(next4, this.copy1);
                        genericJoiner.join(this.copy1, next5, collector);
                    }
                    this.blockIt.reset();
                }
                spillingResettableIterator.reset();
            }
            if (spillingResettableIterator != null) {
                this.memoryForSpillingIterator.addAll(spillingResettableIterator.close());
            }
        } catch (Throwable th) {
            if (spillingResettableIterator != null) {
                this.memoryForSpillingIterator.addAll(spillingResettableIterator.close());
            }
            throw th;
        }
    }
}
