package eu.stratosphere.pact.runtime.resettable;

import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.MemorySegment;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.ListMemorySegmentSource;
import eu.stratosphere.nephele.services.memorymanager.MemoryAllocationException;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.io.SpillingBuffer;
import eu.stratosphere.pact.runtime.util.ResettableMutableObjectIterator;
import eu.stratosphere.util.MutableObjectIterator;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/pact/runtime/resettable/SpillingResettableMutableObjectIterator.class */
/**
 * A resettable iterator that records every element it hands out so that the
 * sequence can be replayed.
 *
 * <p>The iterator works in two phases:
 * <ol>
 *   <li><b>Write phase</b> (before the first {@link #reset()}): each call to
 *       {@link #next(Object)} pulls one element from the wrapped input and
 *       serializes it into a {@link SpillingBuffer}.</li>
 *   <li><b>Read phase</b> (after {@link #reset()}): elements are deserialized
 *       from the buffer; the wrapped input is no longer consulted.</li>
 * </ol>
 *
 * <p>Note that only the elements that were actually pulled during the write
 * phase are recorded; resetting before the input is exhausted replays just
 * that prefix.
 *
 * @param <T> type of the elements being iterated
 */
public class SpillingResettableMutableObjectIterator<T> implements ResettableMutableObjectIterator<T> {
    private static final Log LOG = LogFactory.getLog(SpillingResettableMutableObjectIterator.class);

    /** View used to re-read buffered elements; {@code null} while still in the write phase. */
    protected DataInputView inView;

    /** Serializer used both to write elements to and to read them back from the buffer. */
    protected final TypeSerializer<T> serializer;

    /*
     * Both counters are long: an int counter wraps to a negative value after
     * 2^31-1 records, which would make the "currentElementNum >= elementCount"
     * end-of-data check in next() succeed immediately and silently drop every
     * buffered record on replay.
     */

    /** Number of elements written to the buffer during the write phase. */
    private long elementCount;

    /** Number of elements already returned in the current read pass. */
    private long currentElementNum;

    /** Buffer that holds the serialized elements. */
    protected final SpillingBuffer buffer;

    /** The wrapped input that supplies elements during the write phase. */
    protected final MutableObjectIterator<T> input;

    /** Memory manager that the pages are returned to when this iterator owns them. */
    protected final MemoryManager memoryManager;

    /** The list of memory segments handed to the buffer's segment source. */
    private final List<MemorySegment> memorySegments;

    /** Whether {@link #close()} releases the memory itself instead of returning it to the caller. */
    private final boolean releaseMemoryOnClose;

    /**
     * Creates an iterator that allocates its own memory from the given memory
     * manager and releases it again in {@link #close()}.
     *
     * @param mutableObjectIterator the input to wrap
     * @param typeSerializer serializer for the elements
     * @param memoryManager manager to allocate the pages from
     * @param iOManager I/O manager passed on to the {@link SpillingBuffer}
     * @param j amount of memory to request; forwarded unchanged to
     *        {@code MemoryManager.allocatePages} (see that method for the unit)
     * @param abstractInvokable the task on whose behalf the memory is allocated
     * @throws MemoryAllocationException if the memory manager cannot satisfy the request
     */
    public SpillingResettableMutableObjectIterator(MutableObjectIterator<T> mutableObjectIterator, TypeSerializer<T> typeSerializer, MemoryManager memoryManager, IOManager iOManager, long j, AbstractInvokable abstractInvokable) throws MemoryAllocationException {
        this(mutableObjectIterator, typeSerializer, memoryManager, iOManager, memoryManager.allocatePages(abstractInvokable, j), true);
    }

    /**
     * Creates an iterator that works on caller-provided memory. The memory is
     * not released by {@link #close()} but handed back as its return value.
     *
     * @param mutableObjectIterator the input to wrap
     * @param typeSerializer serializer for the elements
     * @param memoryManager manager whose page size is used for the buffer
     * @param iOManager I/O manager passed on to the {@link SpillingBuffer}
     * @param list the memory segments the buffer may use
     */
    public SpillingResettableMutableObjectIterator(MutableObjectIterator<T> mutableObjectIterator, TypeSerializer<T> typeSerializer, MemoryManager memoryManager, IOManager iOManager, List<MemorySegment> list) {
        this(mutableObjectIterator, typeSerializer, memoryManager, iOManager, list, false);
    }

    /**
     * Shared constructor.
     *
     * @param z {@code true} if {@link #close()} should release the memory to
     *        the memory manager, {@code false} if it should return it to the caller
     */
    private SpillingResettableMutableObjectIterator(MutableObjectIterator<T> mutableObjectIterator, TypeSerializer<T> typeSerializer, MemoryManager memoryManager, IOManager iOManager, List<MemorySegment> list, boolean z) {
        this.memoryManager = memoryManager;
        this.input = mutableObjectIterator;
        this.serializer = typeSerializer;
        this.memorySegments = list;
        this.releaseMemoryOnClose = z;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating spilling resettable iterator with " + list.size() + " pages of memory.");
        }
        this.buffer = new SpillingBuffer(iOManager, new ListMemorySegmentSource(list), memoryManager.getPageSize());
    }

    /**
     * Does nothing; all set-up happens in the constructor.
     *
     * @throws IOException never thrown by this implementation
     */
    public void open() throws IOException {
    }

    /**
     * Switches to (or restarts) the read phase: flips the buffer to obtain a
     * fresh input view and rewinds the read position to the first element.
     *
     * @throws IOException if flipping the buffer fails
     */
    @Override // eu.stratosphere.pact.runtime.util.ResettableMutableObjectIterator
    public void reset() throws IOException {
        this.inView = this.buffer.flip();
        this.currentElementNum = 0;
    }

    /**
     * Closes the buffer and disposes of the memory.
     *
     * @return the memory segments used by this iterator if the memory was
     *         supplied by the caller; an empty list if this iterator allocated
     *         the memory itself (it is then released to the memory manager)
     * @throws IOException if closing the buffer fails
     */
    public List<MemorySegment> close() throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Spilling Resettable Iterator closing. Stored " + this.elementCount + " records.");
        }
        this.inView = null;

        // Collect the segments handed back by the buffer together with those
        // still sitting in our own list, so that all of them are disposed of together.
        List<MemorySegment> memory = this.buffer.close();
        memory.addAll(this.memorySegments);
        this.memorySegments.clear();

        if (!this.releaseMemoryOnClose) {
            return memory;
        }
        this.memoryManager.release(memory);
        return Collections.emptyList();
    }

    /**
     * Fetches the next element into {@code t}.
     *
     * <p>In the write phase the element comes from the wrapped input and is
     * also serialized into the buffer. In the read phase it is deserialized
     * from the buffer.
     *
     * @param t the object to fill with the next element
     * @return {@code true} if {@code t} was filled, {@code false} if there are
     *         no more elements
     * @throws IOException if the wrapped input fails
     * @throws RuntimeException if reading from or writing to the buffer fails
     *         (wraps the underlying {@link IOException})
     */
    public boolean next(T t) throws IOException {
        if (this.inView != null) {
            // Read phase: replay from the buffer until all recorded elements were returned.
            if (this.currentElementNum >= this.elementCount) {
                return false;
            }
            try {
                this.serializer.deserialize(t, this.inView);
                this.currentElementNum++;
                return true;
            } catch (IOException e) {
                throw new RuntimeException("SpillingIterator: Error reading element from buffer.", e);
            }
        }

        // Write phase: pull from the input and record the element for later replay.
        if (!this.input.next(t)) {
            return false;
        }
        try {
            this.serializer.serialize(t, this.buffer);
            this.elementCount++;
            return true;
        } catch (IOException e2) {
            throw new RuntimeException("SpillingIterator: Error writing element to buffer.", e2);
        }
    }
}
