package eu.stratosphere.pact.runtime.task;

import eu.stratosphere.api.common.functions.GenericCrosser;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.nephele.profiling.ProfilingUtils;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.pact.runtime.resettable.BlockResettableMutableObjectIterator;
import eu.stratosphere.pact.runtime.resettable.SpillingResettableMutableObjectIterator;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.MutableObjectIterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/pact/runtime/task/CrossDriver.class */
public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2, OT>, OT> {
    private static final Log LOG = LogFactory.getLog(CrossDriver.class);
    private PactTaskContext<GenericCrosser<T1, T2, OT>, OT> taskContext;
    private MemoryManager memManager;
    private SpillingResettableMutableObjectIterator<?> spillIter;
    private BlockResettableMutableObjectIterator<?> blockIter;
    private int memPagesForBlockSide;
    private int memPagesForSpillingSide;
    private boolean blocked;
    private boolean firstIsOuter;
    private volatile boolean running;

    /* renamed from: eu.stratosphere.pact.runtime.task.CrossDriver$1, reason: invalid class name */
    /* loaded from: input_file:eu/stratosphere/pact/runtime/task/CrossDriver$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$eu$stratosphere$pact$runtime$task$DriverStrategy = new int[DriverStrategy.values().length];

        static {
            try {
                $SwitchMap$eu$stratosphere$pact$runtime$task$DriverStrategy[DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$eu$stratosphere$pact$runtime$task$DriverStrategy[DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$eu$stratosphere$pact$runtime$task$DriverStrategy[DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$eu$stratosphere$pact$runtime$task$DriverStrategy[DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    @Override // eu.stratosphere.pact.runtime.task.PactDriver
    public void setup(PactTaskContext<GenericCrosser<T1, T2, OT>, OT> pactTaskContext) {
        this.taskContext = pactTaskContext;
        this.running = true;
    }

    @Override // eu.stratosphere.pact.runtime.task.PactDriver
    public int getNumberOfInputs() {
        return 2;
    }

    @Override // eu.stratosphere.pact.runtime.task.PactDriver
    public Class<GenericCrosser<T1, T2, OT>> getStubType() {
        return GenericCrosser.class;
    }

    @Override // eu.stratosphere.pact.runtime.task.PactDriver
    public boolean requiresComparatorOnInput() {
        return false;
    }

    @Override // eu.stratosphere.pact.runtime.task.PactDriver
    public void prepare() throws Exception {
        TaskConfig taskConfig = this.taskContext.getTaskConfig();
        DriverStrategy driverStrategy = taskConfig.getDriverStrategy();
        switch (AnonymousClass1.$SwitchMap$eu$stratosphere$pact$runtime$task$DriverStrategy[driverStrategy.ordinal()]) {
            case 1:
                this.blocked = true;
                this.firstIsOuter = true;
                break;
            case ProfilingUtils.DEFAULT_TASKMANAGER_REPORTINTERVAL /* 2 */:
                this.blocked = true;
                this.firstIsOuter = false;
                break;
            case 3:
                this.blocked = false;
                this.firstIsOuter = true;
                break;
            case 4:
                this.blocked = false;
                this.firstIsOuter = false;
                break;
            default:
                throw new RuntimeException("Invalid local strategy for CROSS: " + driverStrategy);
        }
        this.memManager = this.taskContext.getMemoryManager();
        int computeNumberOfPages = this.memManager.computeNumberOfPages(taskConfig.getMemoryDriver());
        if (computeNumberOfPages < 2) {
            throw new RuntimeException("The Cross task was initialized with too little memory. Cross requires at least 2 memory pages.");
        }
        if (driverStrategy == DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST || driverStrategy == DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND) {
            this.memPagesForSpillingSide = computeNumberOfPages;
            this.memPagesForBlockSide = 0;
        } else {
            if (computeNumberOfPages > 32) {
                this.memPagesForSpillingSide = 2;
            } else {
                this.memPagesForSpillingSide = 1;
            }
            this.memPagesForBlockSide = computeNumberOfPages - this.memPagesForSpillingSide;
        }
    }

    @Override // eu.stratosphere.pact.runtime.task.PactDriver
    public void run() throws Exception {
        if (this.blocked) {
            if (this.firstIsOuter) {
                runBlockedOuterFirst();
                return;
            } else {
                runBlockedOuterSecond();
                return;
            }
        }
        if (this.firstIsOuter) {
            runStreamedOuterFirst();
        } else {
            runStreamedOuterSecond();
        }
    }

    @Override // eu.stratosphere.pact.runtime.task.PactDriver
    public void cleanup() throws Exception {
        if (this.spillIter != null) {
            this.spillIter.close();
            this.spillIter = null;
        }
        if (this.blockIter != null) {
            this.blockIter.close();
            this.blockIter = null;
        }
    }

    @Override // eu.stratosphere.pact.runtime.task.PactDriver
    public void cancel() {
        this.running = false;
    }

    private void runBlockedOuterFirst() throws Exception {
        Object next;
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: First input is outer (blocking) side, second input is inner (spilling) side."));
        }
        MutableObjectIterator<X> input = this.taskContext.getInput(0);
        MutableObjectIterator<X> input2 = this.taskContext.getInput(1);
        TypeSerializer serializer = this.taskContext.getInputSerializer(0).getSerializer();
        TypeSerializer serializer2 = this.taskContext.getInputSerializer(1).getSerializer();
        BlockResettableMutableObjectIterator<?> blockResettableMutableObjectIterator = new BlockResettableMutableObjectIterator<>(this.memManager, input, serializer, this.memPagesForBlockSide, this.taskContext.getOwningNepheleTask());
        this.blockIter = blockResettableMutableObjectIterator;
        SpillingResettableMutableObjectIterator<?> spillingResettableMutableObjectIterator = new SpillingResettableMutableObjectIterator<>((MutableObjectIterator<?>) input2, (TypeSerializer<?>) serializer2, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, this.taskContext.getOwningNepheleTask());
        this.spillIter = spillingResettableMutableObjectIterator;
        Object createInstance = serializer.createInstance();
        Object createInstance2 = serializer2.createInstance();
        Object createInstance3 = serializer2.createInstance();
        GenericCrosser<T1, T2, OT> stub = this.taskContext.getStub();
        Collector<OT> outputCollector = this.taskContext.getOutputCollector();
        while (true) {
            if (!this.running || (next = spillingResettableMutableObjectIterator.next(createInstance2)) == null) {
                spillingResettableMutableObjectIterator.reset();
                if (!this.running || !blockResettableMutableObjectIterator.nextBlock()) {
                    return;
                }
            } else {
                while (true) {
                    Object next2 = blockResettableMutableObjectIterator.next(createInstance);
                    if (next2 == null) {
                        break;
                    }
                    createInstance3 = serializer2.copy(next, createInstance3);
                    stub.cross(next2, createInstance3, outputCollector);
                }
                blockResettableMutableObjectIterator.reset();
            }
        }
    }

    private void runBlockedOuterSecond() throws Exception {
        Object next;
        Object next2;
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: First input is inner (spilling) side, second input is outer (blocking) side."));
        }
        MutableObjectIterator<X> input = this.taskContext.getInput(0);
        MutableObjectIterator<X> input2 = this.taskContext.getInput(1);
        TypeSerializer serializer = this.taskContext.getInputSerializer(0).getSerializer();
        TypeSerializer serializer2 = this.taskContext.getInputSerializer(1).getSerializer();
        SpillingResettableMutableObjectIterator<?> spillingResettableMutableObjectIterator = new SpillingResettableMutableObjectIterator<>((MutableObjectIterator<?>) input, (TypeSerializer<?>) serializer, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, this.taskContext.getOwningNepheleTask());
        this.spillIter = spillingResettableMutableObjectIterator;
        BlockResettableMutableObjectIterator<?> blockResettableMutableObjectIterator = new BlockResettableMutableObjectIterator<>(this.memManager, input2, serializer2, this.memPagesForBlockSide, this.taskContext.getOwningNepheleTask());
        this.blockIter = blockResettableMutableObjectIterator;
        Object createInstance = serializer.createInstance();
        Object createInstance2 = serializer.createInstance();
        Object createInstance3 = serializer2.createInstance();
        GenericCrosser<T1, T2, OT> stub = this.taskContext.getStub();
        Collector<OT> outputCollector = this.taskContext.getOutputCollector();
        while (true) {
            if (!this.running || (next = spillingResettableMutableObjectIterator.next(createInstance)) == null) {
                spillingResettableMutableObjectIterator.reset();
                if (!this.running || !blockResettableMutableObjectIterator.nextBlock()) {
                    return;
                }
            } else {
                while (this.running && (next2 = blockResettableMutableObjectIterator.next(createInstance3)) != null) {
                    createInstance2 = serializer.copy(next, createInstance2);
                    stub.cross(createInstance2, next2, outputCollector);
                }
                blockResettableMutableObjectIterator.reset();
            }
        }
    }

    private void runStreamedOuterFirst() throws Exception {
        Object next;
        Object next2;
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: First input is outer side, second input is inner (spilling) side."));
        }
        MutableObjectIterator<X> input = this.taskContext.getInput(0);
        MutableObjectIterator<X> input2 = this.taskContext.getInput(1);
        TypeSerializer serializer = this.taskContext.getInputSerializer(0).getSerializer();
        TypeSerializer serializer2 = this.taskContext.getInputSerializer(1).getSerializer();
        SpillingResettableMutableObjectIterator<?> spillingResettableMutableObjectIterator = new SpillingResettableMutableObjectIterator<>((MutableObjectIterator<?>) input2, (TypeSerializer<?>) serializer2, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, this.taskContext.getOwningNepheleTask());
        this.spillIter = spillingResettableMutableObjectIterator;
        Object createInstance = serializer.createInstance();
        Object createInstance2 = serializer.createInstance();
        Object createInstance3 = serializer2.createInstance();
        GenericCrosser<T1, T2, OT> stub = this.taskContext.getStub();
        Collector<OT> outputCollector = this.taskContext.getOutputCollector();
        while (this.running && (next = input.next(createInstance)) != null) {
            while (this.running && (next2 = spillingResettableMutableObjectIterator.next(createInstance3)) != null) {
                createInstance2 = serializer.copy(next, createInstance2);
                stub.cross(createInstance2, next2, outputCollector);
            }
            spillingResettableMutableObjectIterator.reset();
        }
    }

    private void runStreamedOuterSecond() throws Exception {
        Object next;
        Object next2;
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: First input is inner (spilling) side, second input is outer side."));
        }
        MutableObjectIterator<X> input = this.taskContext.getInput(0);
        MutableObjectIterator<X> input2 = this.taskContext.getInput(1);
        TypeSerializer serializer = this.taskContext.getInputSerializer(0).getSerializer();
        TypeSerializer serializer2 = this.taskContext.getInputSerializer(1).getSerializer();
        SpillingResettableMutableObjectIterator<?> spillingResettableMutableObjectIterator = new SpillingResettableMutableObjectIterator<>((MutableObjectIterator<?>) input, (TypeSerializer<?>) serializer, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, this.taskContext.getOwningNepheleTask());
        this.spillIter = spillingResettableMutableObjectIterator;
        Object createInstance = serializer.createInstance();
        Object createInstance2 = serializer2.createInstance();
        Object createInstance3 = serializer2.createInstance();
        GenericCrosser<T1, T2, OT> stub = this.taskContext.getStub();
        Collector<OT> outputCollector = this.taskContext.getOutputCollector();
        while (this.running && (next = input2.next(createInstance2)) != null) {
            while (this.running && (next2 = spillingResettableMutableObjectIterator.next(createInstance)) != null) {
                createInstance3 = serializer2.copy(next, createInstance3);
                stub.cross(next2, createInstance3, outputCollector);
            }
            spillingResettableMutableObjectIterator.reset();
        }
    }
}
