package eu.stratosphere.pact.runtime.sort;

import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.core.memory.MemorySegment;
import eu.stratosphere.nephele.services.iomanager.BlockChannelAccess;
import eu.stratosphere.nephele.services.iomanager.BlockChannelReader;
import eu.stratosphere.nephele.services.iomanager.BlockChannelWriter;
import eu.stratosphere.nephele.services.iomanager.Channel;
import eu.stratosphere.nephele.services.iomanager.ChannelReaderInputView;
import eu.stratosphere.nephele.services.iomanager.ChannelWriterOutputView;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryAllocationException;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.io.ChannelReaderInputViewIterator;
import eu.stratosphere.pact.runtime.util.EmptyMutableObjectIterator;
import eu.stratosphere.pact.runtime.util.MathUtils;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.MutableObjectIterator;
import java.io.File;
import java.io.IOException;
import java.lang.Thread;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/pact/runtime/sort/UnilateralSortMerger.class */
public class UnilateralSortMerger<E> implements Sorter<E> {
    private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
    protected static final int MIN_NUM_WRITE_BUFFERS = 2;
    protected static final int MAX_NUM_WRITE_BUFFERS = 64;
    protected static final int MIN_NUM_SORT_MEM_SEGMENTS = 32;
    private final ThreadBase<E> readThread;
    private final ThreadBase<E> sortThread;
    private final ThreadBase<E> spillThread;
    protected final ArrayList<MemorySegment> sortReadMemory;
    protected final ArrayList<MemorySegment> writeMemory;
    protected final MemoryManager memoryManager;
    private final HashSet<BlockChannelAccess<?, ?>> openChannels;
    private final HashSet<Channel.ID> channelsToDeleteAtShutdown;
    protected final Object iteratorLock;
    protected volatile MutableObjectIterator<E> iterator;
    protected volatile IOException iteratorException;
    protected volatile boolean closed;
    private static final Log LOG = LogFactory.getLog(UnilateralSortMerger.class);
    private static final CircularElement<Object> EOF_MARKER = new CircularElement<>();
    private static final CircularElement<Object> SPILLING_MARKER = new CircularElement<>();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:eu/stratosphere/pact/runtime/sort/UnilateralSortMerger$ChannelWithBlockCount.class */
    public static final class ChannelWithBlockCount {
        private final Channel.ID channel;
        private final int blockCount;

        public ChannelWithBlockCount(Channel.ID id, int i) {
            this.channel = id;
            this.blockCount = i;
        }

        public Channel.ID getChannel() {
            return this.channel;
        }

        public int getBlockCount() {
            return this.blockCount;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:eu/stratosphere/pact/runtime/sort/UnilateralSortMerger$CircularElement.class */
    public static final class CircularElement<E> {
        final int id;
        final InMemorySorter<E> buffer;

        public CircularElement() {
            this.buffer = null;
            this.id = -1;
        }

        public CircularElement(int i, InMemorySorter<E> inMemorySorter) {
            this.id = i;
            this.buffer = inMemorySorter;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:eu/stratosphere/pact/runtime/sort/UnilateralSortMerger$CircularQueues.class */
    public static final class CircularQueues<E> {
        final BlockingQueue<CircularElement<E>> empty;
        final BlockingQueue<CircularElement<E>> sort;
        final BlockingQueue<CircularElement<E>> spill;

        public CircularQueues() {
            this.empty = new LinkedBlockingQueue();
            this.sort = new LinkedBlockingQueue();
            this.spill = new LinkedBlockingQueue();
        }

        public CircularQueues(int i) {
            this.empty = new ArrayBlockingQueue(i);
            this.sort = new ArrayBlockingQueue(i);
            this.spill = new ArrayBlockingQueue(i);
        }
    }

    /* loaded from: input_file:eu/stratosphere/pact/runtime/sort/UnilateralSortMerger$InputDataCollector.class */
    public static final class InputDataCollector<E> implements Collector<E> {
        private final CircularQueues<E> queues;
        private InMemorySorter<E> currentBuffer;
        private CircularElement<E> currentElement;
        private long bytesUntilSpilling;
        private boolean spillingInThisBuffer;
        private volatile boolean running = true;

        public InputDataCollector(CircularQueues<E> circularQueues, long j) {
            this.queues = circularQueues;
            this.bytesUntilSpilling = j;
            grabBuffer();
        }

        private void grabBuffer() {
            while (this.currentElement == null) {
                try {
                    this.currentElement = this.queues.empty.take();
                } catch (InterruptedException e) {
                    if (!this.running) {
                        return;
                    } else {
                        UnilateralSortMerger.LOG.error("Reading thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                    }
                }
            }
            this.currentBuffer = this.currentElement.buffer;
            if (!this.currentBuffer.isEmpty()) {
                throw new RuntimeException("New sort-buffer is not empty.");
            }
            if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                UnilateralSortMerger.LOG.debug("Retrieved empty read buffer " + this.currentElement.id + ".");
            }
            this.spillingInThisBuffer = this.currentBuffer.getCapacity() <= this.bytesUntilSpilling;
        }

        public void collect(E e) {
            try {
                if (this.spillingInThisBuffer) {
                    if (this.currentBuffer.write(e)) {
                        if (this.bytesUntilSpilling - this.currentBuffer.getOccupancy() <= 0) {
                            this.bytesUntilSpilling = 0L;
                            this.queues.sort.add(UnilateralSortMerger.spillingMarker());
                            return;
                        }
                        return;
                    }
                } else if (this.currentBuffer.write(e)) {
                    return;
                }
                if (this.bytesUntilSpilling > 0) {
                    this.bytesUntilSpilling -= this.currentBuffer.getCapacity();
                    if (this.bytesUntilSpilling <= 0) {
                        this.bytesUntilSpilling = 0L;
                        this.queues.sort.add(UnilateralSortMerger.spillingMarker());
                    }
                }
                if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                    UnilateralSortMerger.LOG.debug("Emitting full buffer from reader thread: " + this.currentElement.id + ".");
                }
                this.queues.sort.add(this.currentElement);
                this.currentElement = null;
                while (this.running && this.currentElement == null) {
                    try {
                        this.currentElement = this.queues.empty.take();
                    } catch (InterruptedException e2) {
                        if (!this.running) {
                            return;
                        } else {
                            UnilateralSortMerger.LOG.error("Reading thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                        }
                    }
                }
                if (this.running) {
                    this.currentBuffer = this.currentElement.buffer;
                    if (!this.currentBuffer.isEmpty()) {
                        throw new RuntimeException("BUG: New sort-buffer is not empty.");
                    }
                    if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                        UnilateralSortMerger.LOG.debug("Retrieved empty read buffer " + this.currentElement.id + ".");
                    }
                    if (!this.currentBuffer.write(e)) {
                        throw new RuntimeException("Record could not be written to empty sort-buffer: Serialized record exceeds buffer capacity.");
                    }
                }
            } catch (IOException e3) {
                throw new RuntimeException("BUG: An error occurred while writing a record to the sort buffer: " + e3.getMessage(), e3);
            }
        }

        public void close() {
            if (this.running) {
                this.running = false;
                if (this.currentBuffer != null && this.currentElement != null) {
                    if (this.currentBuffer.isEmpty()) {
                        this.queues.empty.add(this.currentElement);
                    } else {
                        this.queues.sort.add(this.currentElement);
                        if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                            UnilateralSortMerger.LOG.debug("Emitting last buffer from input collector: " + this.currentElement.id + ".");
                        }
                    }
                }
                this.currentBuffer = null;
                this.currentElement = null;
                this.queues.sort.add(UnilateralSortMerger.endMarker());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:eu/stratosphere/pact/runtime/sort/UnilateralSortMerger$ReadingThread.class */
    public static class ReadingThread<E> extends ThreadBase<E> {
        private final MutableObjectIterator<E> reader;
        private final long startSpillingBytes;
        private final E readTarget;

        public ReadingThread(ExceptionHandler<IOException> exceptionHandler, MutableObjectIterator<E> mutableObjectIterator, CircularQueues<E> circularQueues, E e, AbstractInvokable abstractInvokable, long j) {
            super(exceptionHandler, "SortMerger Reading Thread", circularQueues, abstractInvokable);
            this.reader = mutableObjectIterator;
            this.readTarget = e;
            this.startSpillingBytes = j;
        }

        /* JADX WARN: Code restructure failed: missing block: B:57:0x01f0, code lost:
        
            if (r14 != false) goto L66;
         */
        /* JADX WARN: Code restructure failed: missing block: B:59:0x01f7, code lost:
        
            if (isRunning() == false) goto L116;
         */
        /* JADX WARN: Code restructure failed: missing block: B:61:0x0201, code lost:
        
            if (r0.next(r0) == false) goto L115;
         */
        /* JADX WARN: Code restructure failed: missing block: B:63:0x020c, code lost:
        
            if (r0.write(r0) != false) goto L118;
         */
        /* JADX WARN: Code restructure failed: missing block: B:65:0x020f, code lost:
        
            r8 = r0;
         */
        /* JADX WARN: Code restructure failed: missing block: B:70:0x0215, code lost:
        
            if (r8 == null) goto L78;
         */
        /* JADX WARN: Code restructure failed: missing block: B:72:0x0220, code lost:
        
            if (eu.stratosphere.pact.runtime.sort.UnilateralSortMerger.LOG.isDebugEnabled() == false) goto L81;
         */
        /* JADX WARN: Code restructure failed: missing block: B:73:0x0223, code lost:
        
            eu.stratosphere.pact.runtime.sort.UnilateralSortMerger.LOG.debug("Emitting full buffer from reader thread: " + r9.id + ".");
         */
        /* JADX WARN: Code restructure failed: missing block: B:75:0x0283, code lost:
        
            if (r0.isEmpty() != false) goto L84;
         */
        /* JADX WARN: Code restructure failed: missing block: B:76:0x0286, code lost:
        
            r5.queues.sort.add(r9);
         */
        /* JADX WARN: Code restructure failed: missing block: B:77:0x02a7, code lost:
        
            r9 = null;
         */
        /* JADX WARN: Code restructure failed: missing block: B:80:0x0298, code lost:
        
            r5.queues.empty.add(r9);
         */
        /* JADX WARN: Code restructure failed: missing block: B:81:0x024a, code lost:
        
            r12 = true;
         */
        /* JADX WARN: Code restructure failed: missing block: B:82:0x0255, code lost:
        
            if (eu.stratosphere.pact.runtime.sort.UnilateralSortMerger.LOG.isDebugEnabled() == false) goto L81;
         */
        /* JADX WARN: Code restructure failed: missing block: B:83:0x0258, code lost:
        
            eu.stratosphere.pact.runtime.sort.UnilateralSortMerger.LOG.debug("Emitting final buffer from reader thread: " + r9.id + ".");
         */
        @Override // eu.stratosphere.pact.runtime.sort.UnilateralSortMerger.ThreadBase
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void go() throws java.io.IOException {
            /*
                Method dump skipped, instructions count: 724
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: eu.stratosphere.pact.runtime.sort.UnilateralSortMerger.ReadingThread.go():void");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:eu/stratosphere/pact/runtime/sort/UnilateralSortMerger$SortingThread.class */
    public static class SortingThread<E> extends ThreadBase<E> {
        private final IndexedSorter sorter;

        public SortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> circularQueues, AbstractInvokable abstractInvokable) {
            super(exceptionHandler, "SortMerger sorting thread", circularQueues, abstractInvokable);
            this.sorter = new QuickSort();
        }

        @Override // eu.stratosphere.pact.runtime.sort.UnilateralSortMerger.ThreadBase
        public void go() throws IOException {
            boolean z = true;
            while (isRunning() && z) {
                try {
                    CircularElement<E> take = this.queues.sort.take();
                    if (take != UnilateralSortMerger.EOF_MARKER && take != UnilateralSortMerger.SPILLING_MARKER) {
                        if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                            UnilateralSortMerger.LOG.debug("Sorting buffer " + take.id + ".");
                        }
                        this.sorter.sort(take.buffer);
                        if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                            UnilateralSortMerger.LOG.debug("Sorted buffer " + take.id + ".");
                        }
                    } else if (take == UnilateralSortMerger.EOF_MARKER) {
                        if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                            UnilateralSortMerger.LOG.debug("Sorting thread done.");
                        }
                        z = false;
                    }
                    this.queues.spill.add(take);
                } catch (InterruptedException e) {
                    if (!isRunning()) {
                        return;
                    }
                    if (UnilateralSortMerger.LOG.isErrorEnabled()) {
                        UnilateralSortMerger.LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:eu/stratosphere/pact/runtime/sort/UnilateralSortMerger$SpillingThread.class */
    public class SpillingThread extends ThreadBase<E> {
        protected final MemoryManager memManager;
        protected final IOManager ioManager;
        protected final TypeSerializer<E> serializer;
        protected final TypeComparator<E> comparator;
        protected final List<MemorySegment> writeMemory;
        protected final List<MemorySegment> sortReadMemory;
        protected final int maxNumFileHandles;
        protected final int numWriteBuffersToCluster;

        public SpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> circularQueues, AbstractInvokable abstractInvokable, MemoryManager memoryManager, IOManager iOManager, TypeSerializer<E> typeSerializer, TypeComparator<E> typeComparator, List<MemorySegment> list, List<MemorySegment> list2, int i) {
            super(exceptionHandler, "SortMerger spilling thread", circularQueues, abstractInvokable);
            this.memManager = memoryManager;
            this.ioManager = iOManager;
            this.serializer = typeSerializer;
            this.comparator = typeComparator;
            this.sortReadMemory = list;
            this.writeMemory = list2;
            this.maxNumFileHandles = i;
            this.numWriteBuffersToCluster = list2.size() >= 4 ? list2.size() / 2 : 1;
        }

        @Override // eu.stratosphere.pact.runtime.sort.UnilateralSortMerger.ThreadBase
        public void go() throws IOException {
            CircularElement<E> takeNext;
            CircularElement<E> take;
            ArrayDeque arrayDeque = new ArrayDeque();
            boolean z = false;
            while (true) {
                if (!isRunning()) {
                    break;
                }
                try {
                    take = this.queues.spill.take();
                } catch (InterruptedException e) {
                    if (!isRunning()) {
                        return;
                    } else {
                        UnilateralSortMerger.LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                    }
                }
                if (take == UnilateralSortMerger.SPILLING_MARKER) {
                    break;
                }
                if (take == UnilateralSortMerger.EOF_MARKER) {
                    z = true;
                    break;
                }
                arrayDeque.add(take);
            }
            if (isRunning()) {
                if (z) {
                    if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                        UnilateralSortMerger.LOG.debug("Initiating in memory merge.");
                    }
                    ArrayList arrayList = new ArrayList(arrayDeque.size());
                    Iterator<E> it = arrayDeque.iterator();
                    while (it.hasNext()) {
                        arrayList.add(((CircularElement) it.next()).buffer.getIterator());
                    }
                    if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                        UnilateralSortMerger.LOG.debug("Releasing unused sort-buffer memory.");
                    }
                    disposeSortBuffers(true);
                    UnilateralSortMerger.this.setResultIterator(arrayList.isEmpty() ? EmptyMutableObjectIterator.get() : arrayList.size() == 1 ? (MutableObjectIterator) arrayList.get(0) : new MergeIterator<>(arrayList, this.serializer, this.comparator));
                    return;
                }
                Channel.Enumerator createChannelEnumerator = this.ioManager.createChannelEnumerator();
                List<ChannelWithBlockCount> arrayList2 = new ArrayList<>();
                while (isRunning()) {
                    try {
                        takeNext = takeNext(this.queues.spill, arrayDeque);
                    } catch (InterruptedException e2) {
                        if (!isRunning()) {
                            return;
                        } else {
                            UnilateralSortMerger.LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                        }
                    }
                    if (!isRunning()) {
                        return;
                    }
                    if (takeNext == UnilateralSortMerger.EOF_MARKER) {
                        break;
                    }
                    Channel.ID next = createChannelEnumerator.next();
                    registerChannelToBeRemovedAtShudown(next);
                    BlockChannelWriter createBlockChannelWriter = this.ioManager.createBlockChannelWriter(next, this.numWriteBuffersToCluster);
                    registerOpenChannelToBeRemovedAtShudown(createBlockChannelWriter);
                    ChannelWriterOutputView channelWriterOutputView = new ChannelWriterOutputView(createBlockChannelWriter, this.writeMemory, this.memManager.getPageSize());
                    if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                        UnilateralSortMerger.LOG.debug("Spilling buffer " + takeNext.id + ".");
                    }
                    takeNext.buffer.writeToOutput(channelWriterOutputView);
                    if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                        UnilateralSortMerger.LOG.debug("Spilled buffer " + takeNext.id + ".");
                    }
                    channelWriterOutputView.close();
                    unregisterOpenChannelToBeRemovedAtShudown(createBlockChannelWriter);
                    arrayList2.add(new ChannelWithBlockCount(next, channelWriterOutputView.getBlockCount()));
                    takeNext.buffer.reset();
                    this.queues.empty.add(takeNext);
                }
                if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                    UnilateralSortMerger.LOG.debug("Spilling done.");
                    UnilateralSortMerger.LOG.debug("Releasing sort-buffer memory.");
                }
                disposeSortBuffers(false);
                while (isRunning() && arrayList2.size() > this.maxNumFileHandles) {
                    arrayList2 = mergeChannelList(arrayList2, this.sortReadMemory, this.writeMemory);
                }
                this.memManager.release(this.writeMemory);
                this.writeMemory.clear();
                if (arrayList2.isEmpty()) {
                    UnilateralSortMerger.this.setResultIterator(EmptyMutableObjectIterator.get());
                } else {
                    if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                        UnilateralSortMerger.LOG.debug("Beginning final merge.");
                    }
                    List<List<MemorySegment>> arrayList3 = new ArrayList<>(arrayList2.size());
                    getSegmentsForReaders(arrayList3, this.sortReadMemory, arrayList2.size());
                    UnilateralSortMerger.this.setResultIterator(getMergingIterator(arrayList2, arrayList3, new ArrayList<>(arrayList2.size())));
                }
                if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                    UnilateralSortMerger.LOG.debug("Spilling and merging thread done.");
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public final void disposeSortBuffers(boolean z) {
            while (!this.queues.empty.isEmpty()) {
                try {
                    List<MemorySegment> dispose = this.queues.empty.take().buffer.dispose();
                    if (z) {
                        this.memManager.release(dispose);
                    }
                } catch (InterruptedException e) {
                    if (!isRunning()) {
                        return;
                    } else {
                        UnilateralSortMerger.LOG.error("Spilling thread was interrupted (without being shut down) while collecting empty buffers to release them. Retrying to collect buffers...");
                    }
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public final CircularElement<E> takeNext(BlockingQueue<CircularElement<E>> blockingQueue, Queue<CircularElement<E>> queue) throws InterruptedException {
            return queue.isEmpty() ? blockingQueue.take() : queue.poll();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public final MergeIterator<E> getMergingIterator(List<ChannelWithBlockCount> list, List<List<MemorySegment>> list2, List<BlockChannelAccess<?, ?>> list3) throws IOException {
            if (UnilateralSortMerger.LOG.isDebugEnabled()) {
                UnilateralSortMerger.LOG.debug("Performing merge of " + list.size() + " sorted streams.");
            }
            ArrayList arrayList = new ArrayList(list.size());
            for (int i = 0; i < list.size(); i++) {
                ChannelWithBlockCount channelWithBlockCount = list.get(i);
                List<MemorySegment> list4 = list2.get(i);
                BlockChannelReader createBlockChannelReader = list4.size() >= 4 ? this.ioManager.createBlockChannelReader(channelWithBlockCount.getChannel(), list4.size() / 2) : this.ioManager.createBlockChannelReader(channelWithBlockCount.getChannel());
                list3.add(createBlockChannelReader);
                registerOpenChannelToBeRemovedAtShudown(createBlockChannelReader);
                unregisterChannelToBeRemovedAtShudown(channelWithBlockCount.getChannel());
                arrayList.add(new ChannelReaderInputViewIterator(new ChannelReaderInputView(createBlockChannelReader, list4, channelWithBlockCount.getBlockCount(), false), null, this.serializer));
            }
            return new MergeIterator<>(arrayList, this.serializer, this.comparator);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public final List<ChannelWithBlockCount> mergeChannelList(List<ChannelWithBlockCount> list, List<MemorySegment> list2, List<MemorySegment> list3) throws IOException {
            double ceil = Math.ceil(list.size() / this.maxNumFileHandles);
            int ceil2 = (int) Math.ceil(list.size() / ceil);
            List<List<MemorySegment>> arrayList = new ArrayList<>(ceil2);
            getSegmentsForReaders(arrayList, list2, ceil2);
            ArrayList arrayList2 = new ArrayList((int) (ceil + 1.0d));
            ArrayList arrayList3 = new ArrayList(ceil2);
            int i = 0;
            while (isRunning() && i < list.size()) {
                arrayList3.clear();
                int i2 = 0;
                while (i2 < ceil2 && i < list.size()) {
                    arrayList3.add(list.get(i));
                    i2++;
                    i++;
                }
                if (arrayList3.size() < 2) {
                    arrayList2.addAll(arrayList3);
                } else {
                    arrayList2.add(mergeChannels(arrayList3, arrayList, list3));
                }
            }
            return arrayList2;
        }

        /* JADX WARN: Multi-variable type inference failed */
        protected ChannelWithBlockCount mergeChannels(List<ChannelWithBlockCount> list, List<List<MemorySegment>> list2, List<MemorySegment> list3) throws IOException {
            List<BlockChannelAccess<?, ?>> arrayList = new ArrayList<>(list.size());
            MergeIterator mergingIterator = getMergingIterator(list, list2, arrayList);
            Channel.ID createChannel = this.ioManager.createChannel();
            registerChannelToBeRemovedAtShudown(createChannel);
            BlockChannelWriter createBlockChannelWriter = this.ioManager.createBlockChannelWriter(createChannel, this.numWriteBuffersToCluster);
            registerOpenChannelToBeRemovedAtShudown(createBlockChannelWriter);
            ChannelWriterOutputView channelWriterOutputView = new ChannelWriterOutputView(createBlockChannelWriter, list3, this.memManager.getPageSize());
            TypeSerializer<E> typeSerializer = this.serializer;
            Object createInstance = typeSerializer.createInstance();
            while (mergingIterator.next(createInstance)) {
                typeSerializer.serialize(createInstance, channelWriterOutputView);
            }
            channelWriterOutputView.close();
            int blockCount = channelWriterOutputView.getBlockCount();
            unregisterOpenChannelToBeRemovedAtShudown(createBlockChannelWriter);
            for (int i = 0; i < arrayList.size(); i++) {
                BlockChannelAccess<?, ?> blockChannelAccess = arrayList.get(i);
                blockChannelAccess.closeAndDelete();
                unregisterOpenChannelToBeRemovedAtShudown(blockChannelAccess);
            }
            return new ChannelWithBlockCount(createChannel, blockCount);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public final void getSegmentsForReaders(List<List<MemorySegment>> list, List<MemorySegment> list2, int i) {
            int size = list2.size();
            int i2 = size / i;
            int i3 = size % i;
            Iterator<MemorySegment> it = list2.iterator();
            for (int i4 = 0; i4 < i3; i4++) {
                ArrayList arrayList = new ArrayList(i2 + 1);
                list.add(arrayList);
                for (int i5 = i2; i5 >= 0; i5--) {
                    arrayList.add(it.next());
                }
            }
            for (int i6 = i3; i6 < i; i6++) {
                ArrayList arrayList2 = new ArrayList(i2);
                list.add(arrayList2);
                for (int i7 = i2; i7 > 0; i7--) {
                    arrayList2.add(it.next());
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void registerChannelToBeRemovedAtShudown(Channel.ID id) {
            UnilateralSortMerger.this.channelsToDeleteAtShutdown.add(id);
        }

        protected void unregisterChannelToBeRemovedAtShudown(Channel.ID id) {
            UnilateralSortMerger.this.channelsToDeleteAtShutdown.remove(id);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void registerOpenChannelToBeRemovedAtShudown(BlockChannelAccess<?, ?> blockChannelAccess) {
            UnilateralSortMerger.this.openChannels.add(blockChannelAccess);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void unregisterOpenChannelToBeRemovedAtShudown(BlockChannelAccess<?, ?> blockChannelAccess) {
            UnilateralSortMerger.this.openChannels.remove(blockChannelAccess);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:eu/stratosphere/pact/runtime/sort/UnilateralSortMerger$ThreadBase.class */
    public static abstract class ThreadBase<E> extends Thread implements Thread.UncaughtExceptionHandler {
        protected final CircularQueues<E> queues;
        private final ExceptionHandler<IOException> exceptionHandler;
        private final AbstractInvokable parentTask;
        private volatile boolean alive;

        protected ThreadBase(ExceptionHandler<IOException> exceptionHandler, String str, CircularQueues<E> circularQueues, AbstractInvokable abstractInvokable) {
            super(str);
            setDaemon(true);
            this.exceptionHandler = exceptionHandler;
            setUncaughtExceptionHandler(this);
            this.queues = circularQueues;
            this.parentTask = abstractInvokable;
            this.alive = true;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                try {
                    if (this.parentTask != null) {
                        this.parentTask.userThreadStarted(this);
                    }
                    go();
                    if (this.parentTask != null) {
                        this.parentTask.userThreadFinished(this);
                    }
                } catch (Throwable th) {
                    internalHandleException(new IOException("Thread '" + getName() + "' terminated due to an exception: " + th.getMessage(), th));
                    if (this.parentTask != null) {
                        this.parentTask.userThreadFinished(this);
                    }
                }
            } catch (Throwable th2) {
                if (this.parentTask != null) {
                    this.parentTask.userThreadFinished(this);
                }
                throw th2;
            }
        }

        protected abstract void go() throws IOException;

        public boolean isRunning() {
            return this.alive;
        }

        public void shutdown() {
            this.alive = false;
            interrupt();
        }

        protected final void internalHandleException(IOException iOException) {
            if (isRunning() && this.exceptionHandler != null) {
                try {
                    this.exceptionHandler.handleException(iOException);
                } catch (Throwable th) {
                }
            }
        }

        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            internalHandleException(new IOException("Thread '" + thread.getName() + "' terminated due to an uncaught exception: " + th.getMessage(), th));
        }
    }

    public UnilateralSortMerger(MemoryManager memoryManager, IOManager iOManager, MutableObjectIterator<E> mutableObjectIterator, AbstractInvokable abstractInvokable, TypeSerializer<E> typeSerializer, TypeComparator<E> typeComparator, long j, int i, float f) throws IOException, MemoryAllocationException {
        this(memoryManager, iOManager, mutableObjectIterator, abstractInvokable, typeSerializer, typeComparator, j, -1, i, f);
    }

    public UnilateralSortMerger(MemoryManager memoryManager, IOManager iOManager, MutableObjectIterator<E> mutableObjectIterator, AbstractInvokable abstractInvokable, TypeSerializer<E> typeSerializer, TypeComparator<E> typeComparator, long j, int i, int i2, float f) throws IOException, MemoryAllocationException {
        this(memoryManager, iOManager, mutableObjectIterator, abstractInvokable, typeSerializer, typeComparator, j, i, i2, f, false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public UnilateralSortMerger(MemoryManager memoryManager, IOManager iOManager, MutableObjectIterator<E> mutableObjectIterator, AbstractInvokable abstractInvokable, TypeSerializer<E> typeSerializer, TypeComparator<E> typeComparator, long j, int i, int i2, float f, boolean z) throws IOException, MemoryAllocationException {
        int max;
        this.iteratorLock = new Object();
        if (((memoryManager == null) | (iOManager == null && !z) | (typeSerializer == null)) || (typeComparator == null)) {
            throw new NullPointerException();
        }
        if (abstractInvokable == null) {
            throw new NullPointerException("Parent Task must not be null.");
        }
        if (i2 < 2) {
            throw new IllegalArgumentException("Merger cannot work with less than two file handles.");
        }
        this.memoryManager = memoryManager;
        int checkedDownCast = MathUtils.checkedDownCast(memoryManager.roundDownToPageSizeMultiple(j) / memoryManager.getPageSize());
        if (checkedDownCast < 34) {
            throw new IllegalArgumentException("Too little memory provided to sorter to perform task. Required are at least 34 pages. Current page size is " + memoryManager.getPageSize() + " bytes.");
        }
        if (z) {
            max = 0;
        } else {
            int i3 = 2 + i2;
            if (2 + (2 * i2) > checkedDownCast) {
                max = 2;
                if (i3 > checkedDownCast) {
                    i2 = checkedDownCast - 2;
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("Reducing maximal merge fan-in to " + i2 + " due to limited memory availability during merge");
                    }
                }
            } else {
                max = Math.max(2, Math.min(Math.min(MAX_NUM_WRITE_BUFFERS, checkedDownCast - 32), Math.min(checkedDownCast / (i2 + 1), checkedDownCast / MAX_NUM_WRITE_BUFFERS)));
            }
        }
        int i4 = checkedDownCast - max;
        long pageSize = i4 * memoryManager.getPageSize();
        i = i < 1 ? pageSize > 100663296 ? 3 : i4 >= MAX_NUM_WRITE_BUFFERS ? 2 : 1 : i;
        int i5 = i4 / i;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Instantiating sorter with " + i4 + " pages of sorting memory (=" + pageSize + " bytes total) divided over " + i + " sort buffers (" + i5 + " pages per buffer). Using " + max + " buffers for writing sorted results and merging maximally " + i2 + " streams at once.");
        }
        this.writeMemory = new ArrayList<>(max);
        this.sortReadMemory = new ArrayList<>(i4);
        memoryManager.allocatePages(abstractInvokable, (List<MemorySegment>) this.sortReadMemory, i4);
        if (max > 0) {
            memoryManager.allocatePages(abstractInvokable, (List<MemorySegment>) this.writeMemory, max);
        }
        CircularQueues<E> circularQueues = new CircularQueues<>();
        Iterator<MemorySegment> it = this.sortReadMemory.iterator();
        int i6 = 0;
        while (i6 < i) {
            ArrayList arrayList = new ArrayList(i5);
            for (int i7 = i6 == i - 1 ? Integer.MAX_VALUE : i5; i7 > 0 && it.hasNext(); i7--) {
                arrayList.add(it.next());
            }
            TypeComparator duplicate = typeComparator.duplicate();
            circularQueues.empty.add(new CircularElement<>(i6, (!duplicate.supportsSerializationWithKeyNormalization() || typeSerializer.getLength() <= 0 || typeSerializer.getLength() > 32) ? new NormalizedKeySorter(typeSerializer, duplicate, arrayList) : new FixedLengthRecordSorter(typeSerializer, duplicate, arrayList)));
            i6++;
        }
        ExceptionHandler<IOException> exceptionHandler = new ExceptionHandler<IOException>() { // from class: eu.stratosphere.pact.runtime.sort.UnilateralSortMerger.1
            @Override // eu.stratosphere.pact.runtime.sort.ExceptionHandler
            public void handleException(IOException iOException) {
                if (UnilateralSortMerger.this.closed) {
                    return;
                }
                UnilateralSortMerger.this.setResultIteratorException(iOException);
                UnilateralSortMerger.this.close();
            }
        };
        this.channelsToDeleteAtShutdown = new HashSet<>(MAX_NUM_WRITE_BUFFERS);
        this.openChannels = new HashSet<>(MAX_NUM_WRITE_BUFFERS);
        this.readThread = getReadingThread(exceptionHandler, mutableObjectIterator, circularQueues, abstractInvokable, typeSerializer, f * ((float) pageSize));
        this.sortThread = getSortingThread(exceptionHandler, circularQueues, abstractInvokable);
        this.spillThread = getSpillingThread(exceptionHandler, circularQueues, abstractInvokable, memoryManager, iOManager, typeSerializer, typeComparator, this.sortReadMemory, this.writeMemory, i2);
        startThreads();
    }

    protected void startThreads() {
        if (this.readThread != null) {
            this.readThread.start();
        }
        if (this.sortThread != null) {
            this.sortThread.start();
        }
        if (this.spillThread != null) {
            this.spillThread.start();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        boolean isEmpty;
        boolean isEmpty2;
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            try {
                synchronized (this.iteratorLock) {
                    if (this.iteratorException == null) {
                        this.iteratorException = new IOException("The sorter has been closed.");
                        this.iteratorLock.notifyAll();
                    }
                }
                if (this.readThread != null) {
                    try {
                        this.readThread.shutdown();
                    } catch (Throwable th) {
                        LOG.error("Error shutting down reader thread: " + th.getMessage(), th);
                    }
                }
                if (this.sortThread != null) {
                    try {
                        this.sortThread.shutdown();
                    } catch (Throwable th2) {
                        LOG.error("Error shutting down sorter thread: " + th2.getMessage(), th2);
                    }
                }
                if (this.spillThread != null) {
                    try {
                        this.spillThread.shutdown();
                    } catch (Throwable th3) {
                        LOG.error("Error shutting down spilling thread: " + th3.getMessage(), th3);
                    }
                }
                try {
                    if (this.readThread != null) {
                        this.readThread.join();
                    }
                    if (this.sortThread != null) {
                        this.sortThread.join();
                    }
                    if (this.spillThread != null) {
                        this.spillThread.join();
                    }
                } catch (InterruptedException e) {
                    LOG.debug("Closing of sort/merger was interrupted. The reading/sorting/spilling threads may still be working.", e);
                }
                while (true) {
                    if (isEmpty) {
                        break;
                    }
                }
                while (true) {
                    if (isEmpty2) {
                        return;
                    }
                }
            } finally {
                try {
                    if (!this.writeMemory.isEmpty()) {
                        this.memoryManager.release(this.writeMemory);
                    }
                    this.writeMemory.clear();
                } catch (Throwable th4) {
                }
                try {
                    if (!this.sortReadMemory.isEmpty()) {
                        this.memoryManager.release(this.sortReadMemory);
                    }
                    this.sortReadMemory.clear();
                } catch (Throwable th5) {
                }
                while (!this.openChannels.isEmpty()) {
                    try {
                        Iterator<BlockChannelAccess<?, ?>> it = this.openChannels.iterator();
                        while (it.hasNext()) {
                            BlockChannelAccess<?, ?> next = it.next();
                            it.remove();
                            next.closeAndDelete();
                        }
                    } catch (Throwable th6) {
                    }
                }
                while (!this.channelsToDeleteAtShutdown.isEmpty()) {
                    try {
                        Iterator<Channel.ID> it2 = this.channelsToDeleteAtShutdown.iterator();
                        while (it2.hasNext()) {
                            Channel.ID next2 = it2.next();
                            it2.remove();
                            try {
                                File file = new File(next2.getPath());
                                if (file.exists()) {
                                    file.delete();
                                }
                            } catch (Throwable th7) {
                            }
                        }
                    } catch (Throwable th8) {
                    }
                }
            }
        }
    }

    protected ThreadBase<E> getReadingThread(ExceptionHandler<IOException> exceptionHandler, MutableObjectIterator<E> mutableObjectIterator, CircularQueues<E> circularQueues, AbstractInvokable abstractInvokable, TypeSerializer<E> typeSerializer, long j) {
        return new ReadingThread(exceptionHandler, mutableObjectIterator, circularQueues, typeSerializer.createInstance(), abstractInvokable, j);
    }

    protected ThreadBase<E> getSortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> circularQueues, AbstractInvokable abstractInvokable) {
        return new SortingThread(exceptionHandler, circularQueues, abstractInvokable);
    }

    protected ThreadBase<E> getSpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> circularQueues, AbstractInvokable abstractInvokable, MemoryManager memoryManager, IOManager iOManager, TypeSerializer<E> typeSerializer, TypeComparator<E> typeComparator, List<MemorySegment> list, List<MemorySegment> list2, int i) {
        return new SpillingThread(exceptionHandler, circularQueues, abstractInvokable, memoryManager, iOManager, typeSerializer, typeComparator, list, list2, i);
    }

    @Override // eu.stratosphere.pact.runtime.sort.Sorter, eu.stratosphere.pact.runtime.task.util.CloseableInputProvider
    public MutableObjectIterator<E> getIterator() throws InterruptedException {
        MutableObjectIterator<E> mutableObjectIterator;
        synchronized (this.iteratorLock) {
            while (this.iterator == null && this.iteratorException == null) {
                this.iteratorLock.wait();
            }
            if (this.iteratorException != null) {
                throw new RuntimeException("Error obtaining the sorted input: " + this.iteratorException.getMessage(), this.iteratorException);
            }
            mutableObjectIterator = this.iterator;
        }
        return mutableObjectIterator;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void setResultIterator(MutableObjectIterator<E> mutableObjectIterator) {
        synchronized (this.iteratorLock) {
            if (this.iteratorException == null) {
                this.iterator = mutableObjectIterator;
                this.iteratorLock.notifyAll();
            }
        }
    }

    protected final void setResultIteratorException(IOException iOException) {
        synchronized (this.iteratorLock) {
            if (this.iteratorException == null) {
                this.iteratorException = iOException;
                this.iteratorLock.notifyAll();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static <T> CircularElement<T> endMarker() {
        return (CircularElement<T>) EOF_MARKER;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static <T> CircularElement<T> spillingMarker() {
        return (CircularElement<T>) SPILLING_MARKER;
    }
}
