package edu.iu.dsc.tws.examples.task;

import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.executor.ExecutionPlan;
import edu.iu.dsc.tws.api.compute.executor.IExecution;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.compute.graph.OperationMode;
import edu.iu.dsc.tws.api.compute.nodes.BaseSource;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskInstancePlan;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IVolatileVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.examples.comms.DataGenerator;
import edu.iu.dsc.tws.examples.comms.JobParameters;
import edu.iu.dsc.tws.examples.task.streaming.windowing.data.EventTimeData;
import edu.iu.dsc.tws.examples.utils.bench.BenchmarkConstants;
import edu.iu.dsc.tws.examples.utils.bench.BenchmarkResultsRecorder;
import edu.iu.dsc.tws.examples.utils.bench.Timing;
import edu.iu.dsc.tws.examples.utils.bench.TimingUnit;
import edu.iu.dsc.tws.examples.verification.ResultsVerifier;
import edu.iu.dsc.tws.task.ComputeEnvironment;
import edu.iu.dsc.tws.task.impl.ComputeConnection;
import edu.iu.dsc.tws.task.impl.ComputeGraphBuilder;
import edu.iu.dsc.tws.task.window.BaseWindowSource;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/examples/task/BenchTaskWorker.class */
public abstract class BenchTaskWorker implements IWorker {
    protected static final String SOURCE = "source";
    protected static final String SINK = "sink";
    protected ComputeGraph computeGraph;
    protected ComputeGraphBuilder computeGraphBuilder;
    protected ExecutionPlan executionPlan;
    protected ComputeConnection computeConnection;
    protected static JobParameters jobParameters;
    protected static int[] inputDataArray;
    protected static BenchmarkResultsRecorder resultsRecorder;
    private static final Logger LOG = Logger.getLogger(BenchTaskWorker.class.getName());
    protected static AtomicInteger sendersInProgress = new AtomicInteger();
    protected static AtomicInteger receiversInProgress = new AtomicInteger();

    /* loaded from: input_file:edu/iu/dsc/tws/examples/task/BenchTaskWorker$SourceTask.class */
    protected static class SourceTask extends BaseSource {
        private static final long serialVersionUID = -254264903510284748L;
        private int count;
        private String edge;
        private int iterations;
        private boolean timingCondition;
        private boolean keyed;
        private boolean endNotified;
        private boolean markTimingOnlyForLowestTarget;
        private int noOfTargets;

        public SourceTask(String str) {
            this.count = 0;
            this.keyed = false;
            this.endNotified = false;
            this.markTimingOnlyForLowestTarget = false;
            this.noOfTargets = 0;
            this.iterations = BenchTaskWorker.jobParameters.getIterations() + BenchTaskWorker.jobParameters.getWarmupIterations();
            this.edge = str;
        }

        public SourceTask(String str, boolean z) {
            this(str);
            this.keyed = z;
        }

        public void setMarkTimingOnlyForLowestTarget(boolean z) {
            this.markTimingOnlyForLowestTarget = z;
        }

        public void prepare(Config config, TaskContext taskContext) {
            super.prepare(config, taskContext);
            this.timingCondition = BenchTaskWorker.getTimingCondition(BenchTaskWorker.SOURCE, taskContext);
            this.noOfTargets = taskContext.getTasksByName(BenchTaskWorker.SINK).size();
            BenchTaskWorker.sendersInProgress.incrementAndGet();
        }

        protected void notifyEnd() {
            if (this.endNotified) {
                return;
            }
            BenchTaskWorker.sendersInProgress.decrementAndGet();
            this.endNotified = true;
            BenchTaskWorker.LOG.info(String.format("Source : %d done sending.", Integer.valueOf(this.context.taskIndex())));
        }

        public void execute() {
            if (this.count >= this.iterations) {
                this.context.end(this.edge);
                notifyEnd();
                return;
            }
            if (this.count == BenchTaskWorker.jobParameters.getWarmupIterations()) {
                Timing.mark(BenchmarkConstants.TIMING_ALL_SEND, this.timingCondition);
            }
            if ((this.keyed && this.context.write(this.edge, Integer.valueOf(this.context.taskIndex()), BenchTaskWorker.inputDataArray)) || (!this.keyed && this.context.write(this.edge, BenchTaskWorker.inputDataArray))) {
                this.count++;
            }
            if (!BenchTaskWorker.jobParameters.isStream() || this.count < BenchTaskWorker.jobParameters.getWarmupIterations()) {
                return;
            }
            Timing.mark(BenchmarkConstants.TIMING_MESSAGE_SEND, this.timingCondition && (!this.markTimingOnlyForLowestTarget || (this.count % this.noOfTargets == 0)));
        }
    }

    /* loaded from: input_file:edu/iu/dsc/tws/examples/task/BenchTaskWorker$SourceWindowTask.class */
    protected static class SourceWindowTask extends BaseWindowSource {
        private static final long serialVersionUID = -254264903510284748L;
        private String edge;
        private boolean timingCondition;
        private int count = 0;
        private boolean keyed = false;
        private boolean endNotified = false;
        private boolean markTimingOnlyForLowestTarget = false;
        private int noOfTargets = 0;
        private int iterations = BenchTaskWorker.jobParameters.getIterations() + BenchTaskWorker.jobParameters.getWarmupIterations();

        public SourceWindowTask(String str) {
            this.edge = str;
        }

        public void prepare(Config config, TaskContext taskContext) {
            super.prepare(config, taskContext);
            this.timingCondition = BenchTaskWorker.getTimingCondition(BenchTaskWorker.SOURCE, taskContext);
            this.noOfTargets = taskContext.getTasksByName(BenchTaskWorker.SINK).size();
            BenchTaskWorker.sendersInProgress.incrementAndGet();
        }

        private void notifyEnd() {
            if (this.endNotified) {
                return;
            }
            BenchTaskWorker.sendersInProgress.decrementAndGet();
            this.endNotified = true;
            BenchTaskWorker.LOG.info(String.format("Source : %d done sending.", Integer.valueOf(this.context.taskIndex())));
        }

        public void execute() {
            if (this.count >= this.iterations) {
                this.context.end(this.edge);
                notifyEnd();
                return;
            }
            if (this.count == BenchTaskWorker.jobParameters.getWarmupIterations()) {
                Timing.mark(BenchmarkConstants.TIMING_ALL_SEND, this.timingCondition);
            }
            if ((this.keyed && this.context.write(this.edge, Integer.valueOf(this.context.taskIndex()), BenchTaskWorker.inputDataArray)) || (!this.keyed && this.context.write(this.edge, BenchTaskWorker.inputDataArray))) {
                this.count++;
            }
            if (!BenchTaskWorker.jobParameters.isStream() || this.count < BenchTaskWorker.jobParameters.getWarmupIterations()) {
                return;
            }
            Timing.mark(BenchmarkConstants.TIMING_MESSAGE_SEND, this.timingCondition && (!this.markTimingOnlyForLowestTarget || (this.count % this.noOfTargets == 0)));
        }
    }

    /* loaded from: input_file:edu/iu/dsc/tws/examples/task/BenchTaskWorker$SourceWindowTimeStampTask.class */
    protected static class SourceWindowTimeStampTask extends BaseWindowSource {
        private static final long serialVersionUID = -254264903510284748L;
        private String edge;
        private boolean timingCondition;
        private int count = 0;
        private boolean keyed = false;
        private long prevTime = System.currentTimeMillis();
        private boolean endNotified = false;
        private boolean markTimingOnlyForLowestTarget = false;
        private int noOfTargets = 0;
        private int iterations = BenchTaskWorker.jobParameters.getIterations() + BenchTaskWorker.jobParameters.getWarmupIterations();

        public SourceWindowTimeStampTask(String str) {
            this.edge = str;
        }

        public void prepare(Config config, TaskContext taskContext) {
            super.prepare(config, taskContext);
            this.timingCondition = BenchTaskWorker.getTimingCondition(BenchTaskWorker.SOURCE, taskContext);
            this.noOfTargets = taskContext.getTasksByName(BenchTaskWorker.SINK).size();
            BenchTaskWorker.sendersInProgress.incrementAndGet();
        }

        private void notifyEnd() {
            if (this.endNotified) {
                return;
            }
            BenchTaskWorker.sendersInProgress.decrementAndGet();
            this.endNotified = true;
            BenchTaskWorker.LOG.info(String.format("Source : %d done sending.", Integer.valueOf(this.context.taskIndex())));
        }

        public void execute() {
            if (this.count >= this.iterations) {
                this.context.end(this.edge);
                notifyEnd();
                return;
            }
            if (this.count == BenchTaskWorker.jobParameters.getWarmupIterations()) {
                Timing.mark(BenchmarkConstants.TIMING_ALL_SEND, this.timingCondition);
            }
            long currentTimeMillis = System.currentTimeMillis();
            EventTimeData eventTimeData = new EventTimeData(BenchTaskWorker.inputDataArray, this.count, currentTimeMillis);
            this.prevTime = currentTimeMillis;
            if ((this.keyed && this.context.write(this.edge, Integer.valueOf(this.context.taskIndex()), eventTimeData)) || (!this.keyed && this.context.write(this.edge, eventTimeData))) {
                this.count++;
            }
            if (!BenchTaskWorker.jobParameters.isStream() || this.count < BenchTaskWorker.jobParameters.getWarmupIterations()) {
                return;
            }
            Timing.mark(BenchmarkConstants.TIMING_MESSAGE_SEND, this.timingCondition && (!this.markTimingOnlyForLowestTarget || (this.count % this.noOfTargets == 0)));
        }
    }

    public void execute(Config config, int i, IWorkerController iWorkerController, IPersistentVolume iPersistentVolume, IVolatileVolume iVolatileVolume) {
        ComputeEnvironment init = ComputeEnvironment.init(config, i, iWorkerController, iPersistentVolume, iVolatileVolume);
        if (resultsRecorder == null) {
            resultsRecorder = new BenchmarkResultsRecorder(config, i == 0);
        }
        Timing.setDefaultTimingUnit(TimingUnit.NANO_SECONDS);
        jobParameters = JobParameters.build(config);
        this.computeGraphBuilder = ComputeGraphBuilder.newBuilder(config);
        if (jobParameters.isStream()) {
            this.computeGraphBuilder.setMode(OperationMode.STREAMING);
        } else {
            this.computeGraphBuilder.setMode(OperationMode.BATCH);
        }
        inputDataArray = DataGenerator.generateIntData(jobParameters.getSize());
        buildTaskGraph();
        this.computeGraph = this.computeGraphBuilder.build();
        this.executionPlan = init.getTaskExecutor().plan(this.computeGraph);
        IExecution iExecute = init.getTaskExecutor().iExecute(this.computeGraph, this.executionPlan);
        if (jobParameters.isStream()) {
            while (iExecute.progress() && (sendersInProgress.get() != 0 || receiversInProgress.get() != 0)) {
            }
            long currentTimeMillis = System.currentTimeMillis();
            LOG.info("Streaming Example task will wait 10secs to finish communication...");
            while (System.currentTimeMillis() - currentTimeMillis < 10000) {
                iExecute.progress();
            }
            LOG.info("Stopping execution....");
            iExecute.stop();
            iExecute.close();
        }
        do {
        } while (iExecute.progress());
        LOG.info("Stopping execution....");
        iExecute.stop();
        iExecute.close();
    }

    public abstract ComputeGraphBuilder buildTaskGraph();

    /* JADX INFO: Access modifiers changed from: protected */
    public static boolean verifyResults(ResultsVerifier resultsVerifier, Object obj, Map<String, Object> map, boolean z) {
        boolean z2 = z;
        if (jobParameters.isDoVerify()) {
            z2 = z && resultsVerifier.verify(obj, map);
            resultsRecorder.recordColumn("Verified", Boolean.valueOf(z));
        } else {
            resultsRecorder.recordColumn("Verified", "Not Performed");
        }
        return z2;
    }

    public static boolean getTimingCondition(String str, TaskContext taskContext) {
        Optional min = taskContext.getTasksInThisWorkerByName(str).stream().min(Comparator.comparingInt((v0) -> {
            return v0.getTaskIndex();
        }));
        if (min.isPresent()) {
            return taskContext.getWorkerId() == 0 && taskContext.taskIndex() == ((TaskInstancePlan) min.get()).getTaskIndex();
        }
        LOG.warning("Couldn't find lowest ID task for source");
        return false;
    }
}
