package edu.iu.dsc.tws.examples.task.streaming;

import edu.iu.dsc.tws.api.comms.messaging.types.MessageTypes;
import edu.iu.dsc.tws.api.comms.messaging.types.PrimitiveMessageTypes;
import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.nodes.ISink;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.examples.task.BenchTaskWorker;
import edu.iu.dsc.tws.examples.verification.ResultsVerifier;
import edu.iu.dsc.tws.examples.verification.comparators.IntArrayComparator;
import edu.iu.dsc.tws.task.impl.ComputeGraphBuilder;
import edu.iu.dsc.tws.task.typed.streaming.SPartitionCompute;
import java.util.List;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/examples/task/streaming/STPartitionExample.class */
/**
 * Streaming benchmark example that wires a source task to a sink task through a
 * {@code partition} connection carrying integer arrays over the edge named {@code "edge"}.
 */
public class STPartitionExample extends BenchTaskWorker {
    private static final Logger LOG = Logger.getLogger(STPartitionExample.class.getName());

    /**
     * Sink side of the partition example. It counts every message it receives, verifies the
     * payload against the worker's input data array, and decrements the shared
     * {@code receiversInProgress} counter once the expected number of messages has arrived.
     */
    protected static class PartitionSinkTask extends SPartitionCompute<int[]> implements ISink {
        private static final long serialVersionUID = -254264903510284798L;
        private ResultsVerifier<int[], int[]> resultsVerifier;
        // Assigned in prepare(); not read anywhere else within this class.
        private boolean timingCondition;
        private boolean verified = true;
        // Not read or written by any method visible in this class.
        private int count = 0;
        // Number of messages this sink instance has received so far.
        private int countTotal = 0;
        // Share of the warm-up iterations assigned to this sink instance.
        private int expectedWarmups = 0;
        // Share of the total iterations assigned to this sink instance (per source).
        private int expectedTotal = 0;
        // expectedTotal multiplied by the number of "source" task instances.
        private int expectedTotalFromAll = 0;

        protected PartitionSinkTask() {
        }

        /**
         * Initialises the verifier and works out how many messages this sink instance
         * should expect.
         *
         * <p>Warm-up and total iteration counts are split evenly across all {@code "sink"}
         * instances; when a count does not divide evenly, the instances with the lowest
         * task indices each take one extra message.
         *
         * @param config      job configuration, forwarded to the superclass
         * @param taskContext context of this task instance, used for its index and for the
         *                    sizes of the {@code "sink"} and {@code "source"} task groups
         */
        public void prepare(Config config, TaskContext taskContext) {
            super.prepare(config, taskContext);
            this.timingCondition = BenchTaskWorker.getTimingCondition("sink", this.context);
            // The expected result of a partition is the input itself, so the generator is identity.
            this.resultsVerifier = new ResultsVerifier<>(
                    STPartitionExample.inputDataArray,
                    (data, args) -> data,
                    IntArrayComparator.getInstance());
            STPartitionExample.receiversInProgress.incrementAndGet();

            int sinkCount = taskContext.getTasksByName("sink").size();
            int taskIndex = taskContext.taskIndex();
            this.expectedWarmups =
                    shareOf(STPartitionExample.jobParameters.getWarmupIterations(), sinkCount, taskIndex);
            // Fixed: the remainder test previously used the warm-up count instead of the
            // total count, which mis-sized the quota when the two remainders differed.
            this.expectedTotal =
                    shareOf(STPartitionExample.jobParameters.getTotalIterations(), sinkCount, taskIndex);

            int sourceCount = taskContext.getTasksByName("source").size();
            this.expectedTotalFromAll = this.expectedTotal * sourceCount;
            STPartitionExample.LOG.info(String.format("%d expecting %d warmups and %d total",
                    taskIndex, this.expectedWarmups, this.expectedTotal));
        }

        /**
         * Returns the number of iterations that fall to the sink at {@code taskIndex} when
         * {@code iterations} are spread over {@code sinkCount} sinks: the integer quotient,
         * plus one for each sink whose index is below the remainder.
         */
        private static int shareOf(int iterations, int sinkCount, int taskIndex) {
            int share = iterations / sinkCount;
            if (iterations % sinkCount > taskIndex) {
                share++;
            }
            return share;
        }

        /**
         * Handles one received message: updates the received count, decrements
         * {@code receiversInProgress} when the count reaches the expected total from all
         * sources, and verifies the payload.
         *
         * @param iArr the received integer array
         * @return always {@code true}
         */
        public boolean partition(int[] iArr) {
            this.countTotal++;
            if (this.countTotal == this.expectedTotalFromAll) {
                STPartitionExample.receiversInProgress.decrementAndGet();
            }
            this.verified = STPartitionExample.verifyResults(this.resultsVerifier, iArr, null, this.verified);
            return true;
        }
    }

    /**
     * Builds the task graph: a {@code "source"} task and a {@code "sink"} task joined by a
     * partition connection over the edge {@code "edge"} with integer-array data.
     *
     * <p>The first entry of the job's task stages is used as the source parallelism and the
     * second as the sink parallelism.
     *
     * @return the compute graph builder holding the configured graph
     */
    @Override // edu.iu.dsc.tws.examples.task.BenchTaskWorker
    public ComputeGraphBuilder buildTaskGraph() {
        List<Integer> taskStages = jobParameters.getTaskStages();
        int sourceParallelism = taskStages.get(0);
        int sinkParallelism = taskStages.get(1);
        PrimitiveMessageTypes dataType = MessageTypes.INTEGER_ARRAY;

        BenchTaskWorker.SourceTask source = new BenchTaskWorker.SourceTask("edge");
        source.setMarkTimingOnlyForLowestTarget(true);
        PartitionSinkTask sink = new PartitionSinkTask();

        this.computeGraphBuilder.addSource("source", source, sourceParallelism);
        this.computeConnection = this.computeGraphBuilder.addSink("sink", sink, sinkParallelism);
        this.computeConnection.partition("source").viaEdge("edge").withDataType(dataType);
        return this.computeGraphBuilder;
    }
}
