package org.gridgain.grid.kernal.processors.hadoop.shuffle;

import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.gridgain.grid.GridException;
import org.gridgain.grid.GridFuture;
import org.gridgain.grid.hadoop.GridHadoopJobId;
import org.gridgain.grid.hadoop.GridHadoopMapReducePlan;
import org.gridgain.grid.hadoop.GridHadoopTaskContext;
import org.gridgain.grid.hadoop.GridHadoopTaskInput;
import org.gridgain.grid.hadoop.GridHadoopTaskOutput;
import org.gridgain.grid.kernal.GridTopic;
import org.gridgain.grid.kernal.processors.hadoop.GridHadoopComponent;
import org.gridgain.grid.kernal.processors.hadoop.GridHadoopContext;
import org.gridgain.grid.kernal.processors.hadoop.message.GridHadoopMessage;
import org.gridgain.grid.lang.GridBiPredicate;
import org.gridgain.grid.util.future.GridFinishedFutureEx;
import org.gridgain.grid.util.lang.GridInClosure2X;
import org.gridgain.grid.util.offheap.unsafe.GridUnsafeMemory;
import org.gridgain.grid.util.typedef.F;
import org.gridgain.grid.util.typedef.internal.U;

/* loaded from: input_file:org/gridgain/grid/kernal/processors/hadoop/shuffle/GridHadoopShuffle.class */
/**
 * Hadoop shuffle component.
 * <p>
 * Keeps one {@link GridHadoopShuffleJob} per job ID, dispatches shuffle messages and
 * acknowledgements received on {@link GridTopic#TOPIC_HADOOP} to the matching job, and
 * hands out task inputs/outputs produced by those jobs.
 */
public class GridHadoopShuffle extends GridHadoopComponent {
    /** Shuffle jobs keyed by job ID; entries are created on demand by {@link #job(GridHadoopJobId)}. */
    private final ConcurrentMap<GridHadoopJobId, GridHadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>();

    /** Unsafe memory instance passed to every shuffle job created by this component. */
    protected final GridUnsafeMemory mem = new GridUnsafeMemory(0);

    /**
     * Starts the component and registers a user message listener on {@link GridTopic#TOPIC_HADOOP}
     * that forwards every received message to {@link #onMessageReceived(UUID, GridHadoopMessage)}.
     *
     * @param gridHadoopContext Hadoop context.
     * @throws GridException If the parent component fails to start.
     */
    @Override // org.gridgain.grid.kernal.processors.hadoop.GridHadoopComponent
    public void start(GridHadoopContext gridHadoopContext) throws GridException {
        super.start(gridHadoopContext);

        gridHadoopContext.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP,
            new GridBiPredicate<UUID, Object>() {
                public boolean apply(UUID uuid, Object obj) {
                    // The payload is cast unconditionally: anything arriving on this topic
                    // is treated as a Hadoop message.
                    return GridHadoopShuffle.this.onMessageReceived(uuid, (GridHadoopMessage) obj);
                }
            });
    }

    /**
     * Closes every registered shuffle job and then empties the job map. A failure to close
     * one job is logged and does not prevent the remaining jobs from being closed.
     *
     * @param z Not used by this implementation.
     */
    @Override // org.gridgain.grid.kernal.processors.hadoop.GridHadoopComponent
    public void stop(boolean z) {
        for (GridHadoopShuffleJob<UUID> shuffleJob : jobs.values()) {
            try {
                shuffleJob.close();
            } catch (GridException e) {
                U.error(log, "Failed to close job.", e);
            }
        }

        jobs.clear();
    }

    /**
     * Creates a shuffle job for the given job ID using the map-reduce plan obtained from the
     * job tracker, and initializes its reducer addresses with the node assigned to each reducer.
     *
     * @param gridHadoopJobId Job ID.
     * @return Newly created shuffle job (not yet registered in {@link #jobs}).
     * @throws GridException If the job could not be created.
     */
    private GridHadoopShuffleJob<UUID> newJob(GridHadoopJobId gridHadoopJobId) throws GridException {
        GridHadoopMapReducePlan plan = ctx.jobTracker().plan(gridHadoopJobId);

        GridHadoopShuffleJob<UUID> shuffleJob = new GridHadoopShuffleJob<>(
            ctx.localNodeId(),
            log,
            ctx.jobTracker().job(gridHadoopJobId, null),
            mem,
            plan.reducers(),
            plan.reducers(ctx.localNodeId()));

        // One destination node per reducer index.
        UUID[] reducerNodes = new UUID[plan.reducers()];

        for (int i = 0; i < reducerNodes.length; i++) {
            UUID nodeId = plan.nodeForReducer(i);

            assert nodeId != null : "Plan is missing node for reducer [plan=" + plan + ", rdc=" + i + ']';

            reducerNodes[i] = nodeId;
        }

        // The call must stay outside the assert so it runs even when assertions are disabled.
        boolean initialized = shuffleJob.initializeReduceAddresses(reducerNodes);

        assert initialized;

        return shuffleJob;
    }

    /**
     * Sends a message to the given node on {@link GridTopic#TOPIC_HADOOP}.
     * The node is resolved through discovery by its ID.
     *
     * @param uuid Destination node ID.
     * @param obj Message to send.
     * @throws GridException If sending failed.
     */
    public void send0(UUID uuid, Object obj) throws GridException {
        ctx.kernalContext().io().sendUserMessage(
            F.asList(ctx.kernalContext().discovery().node(uuid)),
            obj,
            GridTopic.TOPIC_HADOOP,
            false,
            0L);
    }

    /**
     * Returns the shuffle job registered for the given job ID, creating and registering one
     * if none exists yet.
     *
     * @param gridHadoopJobId Job ID.
     * @return Shuffle job registered for the ID.
     * @throws GridException If a new job could not be created or a redundant one could not be closed.
     */
    private GridHadoopShuffleJob<UUID> job(GridHadoopJobId gridHadoopJobId) throws GridException {
        GridHadoopShuffleJob<UUID> existing = jobs.get(gridHadoopJobId);

        if (existing != null) {
            return existing;
        }

        GridHadoopShuffleJob<UUID> created = newJob(gridHadoopJobId);
        GridHadoopShuffleJob<UUID> raced = jobs.putIfAbsent(gridHadoopJobId, created);

        if (raced != null) {
            // Another caller registered a job first: discard ours and use the registered one.
            created.close();

            return raced;
        }

        if (created.reducersInitialized()) {
            startSending(created);
        }

        return created;
    }

    /**
     * Starts sending for the given shuffle job; outgoing shuffle messages are delivered
     * through {@link #send0(UUID, Object)}.
     *
     * @param gridHadoopShuffleJob Shuffle job to start sending for.
     */
    private void startSending(GridHadoopShuffleJob<UUID> gridHadoopShuffleJob) {
        gridHadoopShuffleJob.startSending(ctx.kernalContext().gridName(),
            new GridInClosure2X<UUID, GridHadoopShuffleMessage>() {
                public void applyx(UUID uuid, GridHadoopShuffleMessage gridHadoopShuffleMessage)
                    throws GridException {
                    GridHadoopShuffle.this.send0(uuid, gridHadoopShuffleMessage);
                }
            });
    }

    /**
     * Handles a message received on the Hadoop topic.
     * <ul>
     * <li>A {@link GridHadoopShuffleMessage} is passed to its shuffle job and then acknowledged
     * to the sender; the acknowledgement is sent even if handling the message failed.</li>
     * <li>A {@link GridHadoopShuffleAck} is passed to its shuffle job.</li>
     * </ul>
     * {@link GridException}s raised while handling or replying are logged, not rethrown.
     *
     * @param uuid ID of the node the message came from.
     * @param gridHadoopMessage Received message.
     * @return Always {@code true}.
     * @throws IllegalStateException If the message is neither a shuffle message nor a shuffle ack.
     */
    public boolean onMessageReceived(UUID uuid, GridHadoopMessage gridHadoopMessage) {
        if (gridHadoopMessage instanceof GridHadoopShuffleMessage) {
            GridHadoopShuffleMessage shuffleMsg = (GridHadoopShuffleMessage) gridHadoopMessage;

            try {
                job(shuffleMsg.jobId()).onShuffleMessage(shuffleMsg);
            } catch (GridException e) {
                U.error(log, "Message handling failed.", e);
            }

            try {
                // Reply with an ack regardless of the handling outcome above.
                send0(uuid, new GridHadoopShuffleAck(shuffleMsg.id(), shuffleMsg.jobId()));
            } catch (GridException e) {
                U.error(log, "Failed to reply back to shuffle message sender [snd=" + uuid
                    + ", msg=" + gridHadoopMessage + ']', e);
            }
        } else if (gridHadoopMessage instanceof GridHadoopShuffleAck) {
            GridHadoopShuffleAck ack = (GridHadoopShuffleAck) gridHadoopMessage;

            try {
                job(ack.jobId()).onShuffleAck(ack);
            } catch (GridException e) {
                U.error(log, "Message handling failed.", e);
            }
        } else {
            throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + uuid
                + ", msg=" + gridHadoopMessage + ']');
        }

        return true;
    }

    /**
     * Returns the task output provided by the shuffle job of the task's job,
     * creating that shuffle job if necessary.
     *
     * @param gridHadoopTaskContext Task context.
     * @return Task output.
     * @throws GridException If the shuffle job or the output could not be obtained.
     */
    public GridHadoopTaskOutput output(GridHadoopTaskContext gridHadoopTaskContext) throws GridException {
        return job(gridHadoopTaskContext.taskInfo().jobId()).output(gridHadoopTaskContext);
    }

    /**
     * Returns the task input provided by the shuffle job of the task's job,
     * creating that shuffle job if necessary.
     *
     * @param gridHadoopTaskContext Task context.
     * @return Task input.
     * @throws GridException If the shuffle job or the input could not be obtained.
     */
    public GridHadoopTaskInput input(GridHadoopTaskContext gridHadoopTaskContext) throws GridException {
        return job(gridHadoopTaskContext.taskInfo().jobId()).input(gridHadoopTaskContext);
    }

    /**
     * Unregisters and closes the shuffle job of a finished job, if one is registered.
     * A failure to close is logged, not rethrown.
     *
     * @param gridHadoopJobId ID of the finished job.
     */
    public void jobFinished(GridHadoopJobId gridHadoopJobId) {
        GridHadoopShuffleJob<UUID> removed = jobs.remove(gridHadoopJobId);

        if (removed == null) {
            return;
        }

        try {
            removed.close();
        } catch (GridException e) {
            U.error(log, "Failed to close job: " + gridHadoopJobId, e);
        }
    }

    /**
     * Flushes the shuffle job registered for the given job ID.
     *
     * @param gridHadoopJobId Job ID.
     * @return Future returned by the job's flush; an already finished future if no shuffle job is
     *      registered for the ID, or a finished future carrying the error if flush threw.
     */
    public GridFuture<?> flush(GridHadoopJobId gridHadoopJobId) {
        // Plain lookup on purpose: flushing must not create a shuffle job as a side effect.
        GridHadoopShuffleJob<UUID> shuffleJob = jobs.get(gridHadoopJobId);

        if (shuffleJob == null) {
            return new GridFinishedFutureEx();
        }

        try {
            return shuffleJob.flush();
        } catch (GridException e) {
            return new GridFinishedFutureEx(e);
        }
    }

    /**
     * @return Unsafe memory instance used by this component's shuffle jobs.
     */
    public GridUnsafeMemory memory() {
        return mem;
    }
}
