package org.gridgain.grid.kernal.processors.hadoop.shuffle;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.gridgain.grid.GridException;
import org.gridgain.grid.GridFuture;
import org.gridgain.grid.GridInterruptedException;
import org.gridgain.grid.hadoop.GridHadoopInputSplit;
import org.gridgain.grid.hadoop.GridHadoopJob;
import org.gridgain.grid.hadoop.GridHadoopJobProperty;
import org.gridgain.grid.hadoop.GridHadoopPartitioner;
import org.gridgain.grid.hadoop.GridHadoopTaskContext;
import org.gridgain.grid.hadoop.GridHadoopTaskInfo;
import org.gridgain.grid.hadoop.GridHadoopTaskInput;
import org.gridgain.grid.hadoop.GridHadoopTaskOutput;
import org.gridgain.grid.hadoop.GridHadoopTaskType;
import org.gridgain.grid.kernal.processors.hadoop.counter.GridHadoopPerformanceCounter;
import org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleMessage;
import org.gridgain.grid.kernal.processors.hadoop.shuffle.collections.GridHadoopConcurrentHashMultimap;
import org.gridgain.grid.kernal.processors.hadoop.shuffle.collections.GridHadoopMultimap;
import org.gridgain.grid.kernal.processors.hadoop.shuffle.collections.GridHadoopSkipList;
import org.gridgain.grid.lang.GridBiTuple;
import org.gridgain.grid.lang.GridInClosure;
import org.gridgain.grid.logger.GridLogger;
import org.gridgain.grid.thread.GridThread;
import org.gridgain.grid.util.future.GridCompoundFuture;
import org.gridgain.grid.util.future.GridFinishedFutureEx;
import org.gridgain.grid.util.future.GridFutureAdapterEx;
import org.gridgain.grid.util.io.GridUnsafeDataInput;
import org.gridgain.grid.util.lang.GridClosureException;
import org.gridgain.grid.util.lang.GridInClosure2X;
import org.gridgain.grid.util.offheap.unsafe.GridUnsafeMemory;
import org.gridgain.grid.util.typedef.F;
import org.gridgain.grid.util.typedef.internal.U;
import org.gridgain.grid.util.worker.GridWorker;

/* loaded from: input_file:org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffleJob.class */
public class GridHadoopShuffleJob<T> implements AutoCloseable {
    /** Size requested for each newly allocated outgoing shuffle message (131072 = 128 * 1024); see {@code send}, which may request more. */
    private static final int MSG_BUF_SIZE = 131072;
    /** Job this shuffle belongs to; used for the job id, job info and reducer task contexts. */
    private final GridHadoopJob job;
    /** Memory handle passed to every multimap created by {@code getOrCreateMap}. */
    private final GridUnsafeMemory mem;
    /** {@code true} when the constructor was given more than one partition; only then does PartitionedOutput ask for a partitioner. */
    private final boolean needPartitioner;
    /** Destination address per partition index; {@code null} until {@code initializeReduceAddresses} is called. */
    private T[] reduceAddrs;
    /** Local address; partitions whose destination equals it are skipped by {@code collectUpdatesAndSend}. */
    private final T locReduceAddr;
    /** Per-partition message currently being filled; a {@code null} slot means no message is allocated yet. */
    private final GridHadoopShuffleMessage[] msgs;
    /** Per-partition multimaps, created lazily by {@code getOrCreateMap}. */
    private final AtomicReferenceArray<GridHadoopMultimap> maps;
    /** Closure used to deliver a message to an address; assigned in {@code startSending}. */
    private volatile GridInClosure2X<T, GridHadoopShuffleMessage> io;
    /** Background sender worker; created in {@code startSending} unless a flush was already requested. */
    private volatile GridWorker snd;
    /** Set to {@code true} by {@code flush()}; checked by {@code startSending} before starting the sender. */
    private volatile boolean flushed;
    /** Logger obtained for this class in the constructor. */
    private final GridLogger log;
    /** Decompiler-exposed assertion switch: {@code true} when assertions are disabled; assigned in the static initializer at the end of the class. */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Reducer number -> task context, populated once in the constructor. */
    private final Map<Integer, GridHadoopTaskContext> reducersCtx = new HashMap();
    /** Message id -> (message, completion future) for messages handed to {@code io}; an entry is removed when its future completes successfully. */
    protected ConcurrentMap<Long, GridBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>>> sentMsgs = new ConcurrentHashMap();
    /** Counted down at the end of {@code startSending}; {@code flush()} waits on it before sending. */
    private final CountDownLatch ioInitLatch = new CountDownLatch(1);

    /* renamed from: org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleJob$6, reason: invalid class name */
    /* loaded from: input_file:org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffleJob$6.class */
    /**
     * Decompiled form of the compiler-generated lookup table backing the {@code switch} statements over
     * {@link GridHadoopTaskType} in {@code output()} and {@code input()}.
     * <p>
     * The table is indexed by enum ordinal and holds the case label used by those switches:
     * MAP = 1, COMBINE = 2, REDUCE = 3. Entries that are never assigned keep the array default of 0,
     * which matches no case label and therefore falls into the {@code default} branch.
     */
    static /* synthetic */ class AnonymousClass6 {
        // One slot per enum constant currently defined by GridHadoopTaskType.
        static final /* synthetic */ int[] $SwitchMap$org$gridgain$grid$hadoop$GridHadoopTaskType = new int[GridHadoopTaskType.values().length];

        static {
            // Each constant is wrapped separately so that one missing constant does not prevent
            // the remaining entries from being filled in.
            try {
                $SwitchMap$org$gridgain$grid$hadoop$GridHadoopTaskType[GridHadoopTaskType.MAP.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot stays 0.
            }
            try {
                $SwitchMap$org$gridgain$grid$hadoop$GridHadoopTaskType[GridHadoopTaskType.COMBINE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the slot stays 0.
            }
            try {
                $SwitchMap$org$gridgain$grid$hadoop$GridHadoopTaskType[GridHadoopTaskType.REDUCE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: the slot stays 0.
            }
        }
    }

    /* loaded from: input_file:org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffleJob$PartitionedOutput.class */
    /**
     * Task output that routes every written pair to the multimap of the partition chosen by the
     * job partitioner (or to partition 0 when no partitioner is needed).
     */
    private class PartitionedOutput implements GridHadoopTaskOutput {
        /** Per-partition adders; a slot stays {@code null} until the first write to that partition. */
        private final GridHadoopTaskOutput[] adders;

        /** Partitioner taken from the task context; {@code null} when the job does not need one. */
        private GridHadoopPartitioner partitioner;

        /** Context of the task producing the output. */
        private final GridHadoopTaskContext taskCtx;

        /**
         * @param gridHadoopTaskContext Context of the task that will write through this output.
         * @throws GridException If the partitioner cannot be obtained from the context.
         */
        private PartitionedOutput(GridHadoopTaskContext gridHadoopTaskContext) throws GridException {
            final int partCnt = GridHadoopShuffleJob.this.maps.length();

            this.adders = new GridHadoopTaskOutput[partCnt];
            this.taskCtx = gridHadoopTaskContext;

            if (GridHadoopShuffleJob.this.needPartitioner) {
                this.partitioner = gridHadoopTaskContext.partitioner();
            }
        }

        /**
         * Writes the pair into the adder of its partition, creating that adder on first use.
         *
         * @param obj Key.
         * @param obj2 Value.
         * @throws GridException If the partitioner returns an out-of-range partition or the write fails.
         */
        public void write(Object obj, Object obj2) throws GridException {
            final int part;

            if (this.partitioner == null) {
                part = 0;
            } else {
                part = this.partitioner.partition(obj, obj2, this.adders.length);

                if (part < 0 || part >= this.adders.length) {
                    throw new GridException("Invalid partition: " + part);
                }
            }

            GridHadoopTaskOutput target = this.adders[part];

            if (target == null) {
                GridHadoopMultimap partMap = GridHadoopShuffleJob.this.getOrCreateMap(GridHadoopShuffleJob.this.maps, part);
                GridHadoopMultimap.Adder created = partMap.startAdding(this.taskCtx);

                target = created;
                this.adders[part] = created;
            }

            target.write(obj, obj2);
        }

        /**
         * Closes every adder that was created by {@code write}.
         *
         * @throws GridException If closing an adder fails.
         */
        public void close() throws GridException {
            for (int idx = 0; idx < this.adders.length; idx++) {
                GridHadoopTaskOutput opened = this.adders[idx];

                if (opened == null) {
                    continue;
                }

                opened.close();
            }
        }
    }

    /* loaded from: input_file:org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffleJob$UnsafeValue.class */
    /**
     * Reusable {@link GridHadoopMultimap.Value} view over a window of a byte array.
     * <p>
     * The enclosing class repositions the window by assigning {@code off} and {@code size}
     * before handing the instance to a multimap key.
     */
    private static class UnsafeValue implements GridHadoopMultimap.Value {
        /** Backing array the window points into. */
        private final byte[] buf;

        /** Start of the current window within {@code buf}. */
        private int off;

        /** Length of the current window in bytes. */
        private int size;

        /**
         * @param bArr Backing array; must not be {@code null}.
         */
        private UnsafeValue(byte[] bArr) {
            // A real assert replaces the decompiler-exposed $assertionsDisabled flag and manual throw.
            assert bArr != null;

            this.buf = bArr;
        }

        /** @return Length of the current window. */
        @Override // org.gridgain.grid.kernal.processors.hadoop.shuffle.collections.GridHadoopMultimap.Value
        public int size() {
            return this.size;
        }

        /**
         * Copies the current window of the backing array to the given raw address.
         *
         * @param j Destination address.
         */
        @Override // org.gridgain.grid.kernal.processors.hadoop.shuffle.collections.GridHadoopMultimap.Value
        public void copyTo(long j) {
            GridUnsafeMemory.UNSAFE.copyMemory(this.buf, GridUnsafeMemory.BYTE_ARR_OFF + this.off, (Object) null, j, this.size);
        }
    }

    public GridHadoopShuffleJob(T t, GridLogger gridLogger, GridHadoopJob gridHadoopJob, GridUnsafeMemory gridUnsafeMemory, int i, int[] iArr) throws GridException {
        this.locReduceAddr = t;
        this.job = gridHadoopJob;
        this.mem = gridUnsafeMemory;
        this.log = gridLogger.getLogger(GridHadoopShuffleJob.class);
        if (!F.isEmpty(iArr)) {
            for (int i2 : iArr) {
                this.reducersCtx.put(Integer.valueOf(i2), gridHadoopJob.getTaskContext(new GridHadoopTaskInfo(GridHadoopTaskType.REDUCE, gridHadoopJob.id(), i2, 0, (GridHadoopInputSplit) null)));
            }
        }
        this.needPartitioner = i > 1;
        this.maps = new AtomicReferenceArray<>(i);
        this.msgs = new GridHadoopShuffleMessage[i];
    }

    /**
     * Stores the per-partition destination addresses, but only on the first call.
     *
     * @param tArr Destination addresses indexed by partition.
     * @return {@code true} if the addresses were stored, {@code false} if they had been set before.
     */
    public boolean initializeReduceAddresses(T[] tArr) {
        final boolean firstTime = this.reduceAddrs == null;

        if (firstTime) {
            this.reduceAddrs = tArr;
        }

        return firstTime;
    }

    /**
     * @return {@code true} once destination addresses have been stored.
     */
    public boolean reducersInitialized() {
        final T[] addrs = this.reduceAddrs;

        return addrs != null;
    }

    /**
     * Publishes the I/O closure and, unless a flush has already been requested, starts the
     * background worker that repeatedly sends collected updates. Finally releases the latch
     * that {@code flush()} waits on.
     *
     * @param str First argument passed to the {@link GridWorker} constructor.
     * @param gridInClosure2X Closure that delivers a message to a destination address; must not be {@code null}.
     */
    public void startSending(String str, GridInClosure2X<T, GridHadoopShuffleMessage> gridInClosure2X) {
        if (!$assertionsDisabled && this.snd != null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && gridInClosure2X == null) {
            throw new AssertionError();
        }

        this.io = gridInClosure2X;

        if (!this.flushed) {
            final String workerName = "hadoop-shuffle-" + this.job.id();

            this.snd = new GridWorker(str, workerName, this.log) { // from class: org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleJob.1
                protected void body() throws InterruptedException {
                    try {
                        // Pause briefly before every pass; stop as soon as the worker is cancelled.
                        while (!isCancelled()) {
                            Thread.sleep(5L);

                            GridHadoopShuffleJob.this.collectUpdatesAndSend(false);
                        }
                    } catch (GridException sendErr) {
                        throw new IllegalStateException((Throwable) sendErr);
                    }
                }
            };

            new GridThread(this.snd).start();
        }

        this.ioInitLatch.countDown();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the multimap stored at the given index, creating and publishing one if the slot is empty.
     * <p>
     * A hash multimap is created when the job's {@code SHUFFLE_REDUCER_NO_SORTING} property is set,
     * otherwise a skip list. If another thread publishes a map first, the one created here is closed
     * and the published one is returned.
     *
     * @param atomicReferenceArray Array holding the maps.
     * @param i Slot index.
     * @return Multimap stored in the slot.
     */
    public GridHadoopMultimap getOrCreateMap(AtomicReferenceArray<GridHadoopMultimap> atomicReferenceArray, int i) {
        final GridHadoopMultimap present = atomicReferenceArray.get(i);

        if (present != null) {
            return present;
        }

        final GridHadoopMultimap fresh;

        if (GridHadoopJobProperty.get(this.job.info(), GridHadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING, false)) {
            fresh = new GridHadoopConcurrentHashMultimap(this.job.info(), this.mem, GridHadoopJobProperty.get(this.job.info(), GridHadoopJobProperty.PARTITION_HASHMAP_SIZE, 8192));
        } else {
            fresh = new GridHadoopSkipList(this.job.info(), this.mem);
        }

        if (atomicReferenceArray.compareAndSet(i, null, fresh)) {
            return fresh;
        }

        // Lost the race: discard our instance and use the one already published.
        fresh.close();

        return atomicReferenceArray.get(i);
    }

    /**
     * Handles an incoming shuffle message: records it on the reducer's performance counter and
     * adds every key/value pair it carries to the multimap of the target reducer.
     * <p>
     * The adder is managed by try-with-resources, so it is closed exactly once on every path and a
     * failure of {@code close()} is attached as a suppressed exception when the body already failed.
     *
     * @param gridHadoopShuffleMessage Message to process; its buffer must be non-null and its offset positive.
     * @throws GridException If visiting the message or adding a key fails.
     */
    public void onShuffleMessage(GridHadoopShuffleMessage gridHadoopShuffleMessage) throws GridException {
        if (!$assertionsDisabled && gridHadoopShuffleMessage.buffer() == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && gridHadoopShuffleMessage.offset() <= 0) {
            throw new AssertionError();
        }

        GridHadoopTaskContext taskCtx = this.reducersCtx.get(Integer.valueOf(gridHadoopShuffleMessage.reducer()));

        GridHadoopPerformanceCounter.getCounter(taskCtx.counters(), null).onShuffleMessage(gridHadoopShuffleMessage.reducer(), U.currentTimeMillis());

        GridHadoopMultimap map = getOrCreateMap(this.maps, gridHadoopShuffleMessage.reducer());

        try (final GridHadoopMultimap.Adder adder = map.startAdding(taskCtx)) {
            final GridUnsafeDataInput dataInput = new GridUnsafeDataInput();
            // Single reusable value view; repositioned for every value in the message.
            final UnsafeValue val = new UnsafeValue(gridHadoopShuffleMessage.buffer());

            gridHadoopShuffleMessage.visit(new GridHadoopShuffleMessage.Visitor() { // from class: org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleJob.2
                /** Key most recently added; the values that follow are attached to it. */
                private GridHadoopMultimap.Key key;

                @Override // org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleMessage.Visitor
                public void onKey(byte[] bArr, int i, int i2) throws GridException {
                    dataInput.bytes(bArr, i, i + i2);
                    this.key = adder.addKey(dataInput, this.key);
                }

                @Override // org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleMessage.Visitor
                public void onValue(byte[] bArr, int i, int i2) {
                    val.off = i;
                    val.size = i2;
                    this.key.add(val);
                }
            });
        }
    }

    /**
     * Completes the future of the sent message that the acknowledgement refers to.
     * Logs a warning when no message is registered under the acknowledged id.
     *
     * @param gridHadoopShuffleAck Received acknowledgement.
     */
    public void onShuffleAck(GridHadoopShuffleAck gridHadoopShuffleAck) {
        final Long ackedId = Long.valueOf(gridHadoopShuffleAck.id());
        final GridBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>> pending = this.sentMsgs.get(ackedId);

        if (pending == null) {
            this.log.warning("Received shuffle ack for not registered shuffle id: " + gridHadoopShuffleAck);

            return;
        }

        ((GridFutureAdapterEx) pending.get2()).onDone();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Walks every partition whose destination is not the local address, packs the entries reported
     * by the partition's multimap visitor into the partition's pending message, and sends the
     * message whenever the next entry does not fit.
     *
     * @param z When {@code true}, a partially filled message (offset != 0) is also sent after the visit.
     * @throws GridException If visiting a multimap fails.
     */
    public void collectUpdatesAndSend(boolean z) throws GridException {
        for (int part = 0; part < this.maps.length(); part++) {
            final GridHadoopMultimap partMap = this.maps.get(part);

            // Nothing collected for this partition, or it is consumed locally.
            if (partMap == null || this.locReduceAddr.equals(this.reduceAddrs[part])) {
                continue;
            }

            if (this.msgs[part] == null) {
                this.msgs[part] = new GridHadoopShuffleMessage(this.job.id(), part, MSG_BUF_SIZE);
            }

            final int reducer = part;

            partMap.visit(false, new GridHadoopMultimap.Visitor() { // from class: org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleJob.3
                /** Address of the key currently being visited. */
                private long keyPtr;

                /** Size of the key currently being visited. */
                private int keySize;

                /** Whether the current key has already been written into the current message. */
                private boolean keyAdded;

                @Override // org.gridgain.grid.kernal.processors.hadoop.shuffle.collections.GridHadoopMultimap.Visitor
                public void onKey(long j, int i3) {
                    this.keyPtr = j;
                    this.keySize = i3;
                    this.keyAdded = false;
                }

                /**
                 * Attempts to append the value (preceded by the key if it is not yet in the message).
                 *
                 * @return {@code false} if the current message has no room for it.
                 */
                private boolean tryAdd(long j, int i3) {
                    final GridHadoopShuffleMessage current = GridHadoopShuffleJob.this.msgs[reducer];
                    final boolean keyMissing = !this.keyAdded;
                    final int required = keyMissing ? this.keySize + i3 : i3;

                    if (!current.available(required, !keyMissing)) {
                        return false;
                    }

                    if (keyMissing) {
                        current.addKey(this.keyPtr, this.keySize);
                    }

                    current.addValue(j, i3);
                    this.keyAdded = true;

                    return true;
                }

                @Override // org.gridgain.grid.kernal.processors.hadoop.shuffle.collections.GridHadoopMultimap.Visitor
                public void onValue(long j, int i3) {
                    if (!tryAdd(j, i3)) {
                        // Message is full: ship it and retry on the replacement, which is sized to fit key + value.
                        GridHadoopShuffleJob.this.send(reducer, this.keySize + i3);

                        // The replacement message does not contain the key yet.
                        this.keyAdded = false;

                        if (!tryAdd(j, i3)) {
                            throw new IllegalStateException();
                        }
                    }
                }
            });

            if (z && this.msgs[part].offset() != 0) {
                send(part, 0);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends the pending message of the given partition to its destination address and registers it,
     * together with a completion future, under the message id. The registration is removed once the
     * future completes successfully; a failure is logged.
     *
     * @param i Partition index whose pending message is sent.
     * @param i2 Minimum size of the replacement message; {@code 0} means no replacement is allocated
     *      and the slot is cleared.
     */
    public void send(int i, int i2) {
        final GridFutureAdapterEx ackFut = new GridFutureAdapterEx();
        final GridHadoopShuffleMessage outgoing = this.msgs[i];
        final long msgId = outgoing.id();

        GridBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>> previous = this.sentMsgs.putIfAbsent(Long.valueOf(msgId), new GridBiTuple<>(outgoing, ackFut));

        if (!$assertionsDisabled && previous != null) {
            throw new AssertionError();
        }

        try {
            this.io.apply(this.reduceAddrs[i], outgoing);
        } catch (GridClosureException sendErr) {
            // Delivery failed synchronously: complete the future with the unwrapped cause.
            ackFut.onDone(U.unwrap(sendErr));
        }

        ackFut.listenAsync(new GridInClosure<GridFuture<?>>() { // from class: org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleJob.4
            public void apply(GridFuture<?> gridFuture) {
                try {
                    gridFuture.get();

                    GridHadoopShuffleJob.this.sentMsgs.remove(Long.valueOf(msgId));
                } catch (GridException failure) {
                    GridHadoopShuffleJob.this.log.error("Failed to send message.", failure);
                }
            }
        });

        if (i2 == 0) {
            this.msgs[i] = null;
        } else {
            this.msgs[i] = new GridHadoopShuffleMessage(this.job.id(), i, Math.max(MSG_BUF_SIZE, i2));
        }
    }

    @Override // java.lang.AutoCloseable
    /**
     * Cancels the sender worker (if one was started), waits for it to finish and then closes all
     * multimaps.
     *
     * @throws GridException If the wait for the sender worker is interrupted.
     */
    public void close() throws GridException {
        final GridWorker sender = this.snd;

        if (sender != null) {
            sender.cancel();

            try {
                sender.join();
            } catch (InterruptedException interrupted) {
                throw new GridInterruptedException(interrupted);
            }
        }

        close(this.maps);
    }

    /**
     * Closes every multimap present in the array; empty slots are skipped.
     *
     * @param atomicReferenceArray Array of multimaps to close.
     */
    private void close(AtomicReferenceArray<GridHadoopMultimap> atomicReferenceArray) {
        final int slots = atomicReferenceArray.length();

        for (int slot = 0; slot < slots; slot++) {
            final GridHadoopMultimap stored = atomicReferenceArray.get(slot);

            if (stored == null) {
                continue;
            }

            stored.close();
        }
    }

    /**
     * Marks the job as flushed, stops the background sender, sends everything that is still
     * collected (including partially filled messages) and returns a future combining the
     * completion futures of all messages currently registered as sent.
     *
     * @return Finished future when there are no partitions, otherwise a compound future over the
     *      registered sent messages.
     * @throws GridException If waiting is interrupted or sending the collected updates fails.
     */
    public GridFuture<?> flush() throws GridException {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Flushing job " + this.job.id() + " on address " + this.locReduceAddr);
        }

        this.flushed = true;

        if (this.maps.length() == 0) {
            return new GridFinishedFutureEx();
        }

        // The I/O closure must be published before anything can be sent.
        U.await(this.ioInitLatch);

        final GridWorker sender = this.snd;

        if (sender != null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Cancelling sender thread.");
            }

            sender.cancel();

            try {
                sender.join();
            } catch (InterruptedException interrupted) {
                throw new GridInterruptedException(interrupted);
            }

            if (this.log.isDebugEnabled()) {
                this.log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + this.job.id());
            }
        }

        collectUpdatesAndSend(true);

        if (this.log.isDebugEnabled()) {
            this.log.debug("Finished sending collected updates to remote reducers: " + this.job.id());
        }

        GridCompoundFuture combined = new GridCompoundFuture();

        for (GridBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>> sent : this.sentMsgs.values()) {
            combined.add((GridFuture) sent.get2());
        }

        combined.markInitialized();

        if (this.log.isDebugEnabled()) {
            this.log.debug("Collected futures to compound futures for flush: " + this.sentMsgs.size());
        }

        return combined;
    }

    /**
     * Creates the partitioned output for a MAP or COMBINE task.
     * <p>
     * Switches on the task type directly instead of going through the decompiler's
     * ordinal-indexed lookup table.
     *
     * @param gridHadoopTaskContext Context of the task requesting an output.
     * @return Output that routes written pairs to the per-partition multimaps.
     * @throws GridException If the output cannot be created.
     * @throws IllegalStateException If the task type is neither MAP nor COMBINE.
     */
    public GridHadoopTaskOutput output(GridHadoopTaskContext gridHadoopTaskContext) throws GridException {
        final GridHadoopTaskType type = gridHadoopTaskContext.taskInfo().type();

        switch (type) {
            case MAP:
                // With assertions enabled, a MAP task may only request an output when the job has no combiner.
                if (!$assertionsDisabled && this.job.info().hasCombiner()) {
                    throw new AssertionError("The output creation is allowed if combiner has not been defined.");
                }
                break;

            case COMBINE:
                break;

            default:
                throw new IllegalStateException("Illegal type: " + type);
        }

        return new PartitionedOutput(gridHadoopTaskContext);
    }

    /**
     * Creates the input for a REDUCE task from the multimap stored under the task number.
     * When no multimap exists for that task number, an input with no entries is returned.
     * <p>
     * Switches on the task type directly instead of going through the decompiler's
     * ordinal-indexed lookup table.
     *
     * @param gridHadoopTaskContext Context of the task requesting an input.
     * @return Input over the collected data, or an input with no entries.
     * @throws GridException If the multimap fails to create the input.
     * @throws IllegalStateException If the task type is not REDUCE.
     */
    public GridHadoopTaskInput input(GridHadoopTaskContext gridHadoopTaskContext) throws GridException {
        final GridHadoopTaskType type = gridHadoopTaskContext.taskInfo().type();

        switch (type) {
            case REDUCE:
                GridHadoopMultimap collected = this.maps.get(gridHadoopTaskContext.taskInfo().taskNumber());

                if (collected != null) {
                    return collected.input(gridHadoopTaskContext);
                }

                // Nothing was collected for this reducer: expose an input that never yields an entry.
                return new GridHadoopTaskInput() { // from class: org.gridgain.grid.kernal.processors.hadoop.shuffle.GridHadoopShuffleJob.5
                    public boolean next() {
                        return false;
                    }

                    public Object key() {
                        throw new IllegalStateException();
                    }

                    public Iterator<?> values() {
                        throw new IllegalStateException();
                    }

                    public void close() {
                    }
                };

            default:
                throw new IllegalStateException("Illegal type: " + type);
        }
    }

    // Initializes the decompiler-exposed assertion switch: true when assertions are NOT enabled
    // for this class, so every "!$assertionsDisabled && ..." check above is skipped in that case.
    static {
        $assertionsDisabled = !GridHadoopShuffleJob.class.desiredAssertionStatus();
    }
}
