package edu.iu.dsc.tws.examples.compatibility.storm.bench;

import edu.iu.dsc.tws.api.JobConfig;
import edu.iu.dsc.tws.api.Twister2Job;
import edu.iu.dsc.tws.api.comms.Op;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageTypes;
import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.compute.graph.OperationMode;
import edu.iu.dsc.tws.api.compute.nodes.IComputableSink;
import edu.iu.dsc.tws.api.compute.nodes.ISource;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
import edu.iu.dsc.tws.task.impl.ComputeGraphBuilder;
import edu.iu.dsc.tws.task.impl.TaskWorker;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/examples/compatibility/storm/bench/StormBenchmark.class */
public class StormBenchmark extends TaskWorker {
    private static final Logger LOG = Logger.getLogger(StormBenchmark.class.getName());
    private static final String PARAM_SIZE = "size";
    private static final String PARAM_PARALLEL_SOURCES = "parallel-sources";
    private static final String PARAM_MESSAGES_COUNT = "messages-count";
    private static final String PARAM_OPERATION = "operation";

    /* loaded from: input_file:edu/iu/dsc/tws/examples/compatibility/storm/bench/StormBenchmark$DataSink.class */
    public static class DataSink implements IComputableSink {
        private static final long serialVersionUID = -254264903510284798L;
        private List<Long> timeStamps;
        private int messageCount;
        private boolean done;
        private int dataSize;
        private String operation;
        private int parallelSources;

        public boolean execute(IMessage iMessage) {
            if (this.done) {
                StormBenchmark.LOG.info("Data collection done. Kill and rerun....");
                return true;
            }
            this.timeStamps.add(Long.valueOf(System.nanoTime()));
            if (this.timeStamps.size() != this.messageCount) {
                return true;
            }
            this.done = true;
            dumpData();
            return true;
        }

        private void dumpData() {
            try {
                File file = new File(this.parallelSources + "_" + this.operation + "_" + this.dataSize + ".csv");
                BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(file));
                Iterator<Long> it = this.timeStamps.iterator();
                while (it.hasNext()) {
                    bufferedWriter.write(it.next().toString());
                    bufferedWriter.newLine();
                }
                bufferedWriter.close();
                StormBenchmark.LOG.info("File written to " + file.getAbsolutePath());
            } catch (IOException e) {
                StormBenchmark.LOG.log(Level.SEVERE, "Error in dumping results");
            }
        }

        public void prepare(Config config, TaskContext taskContext) {
            this.messageCount = config.getIntegerValue(StormBenchmark.PARAM_MESSAGES_COUNT, 1000).intValue();
            this.dataSize = config.getIntegerValue("size", -1).intValue();
            this.operation = config.getStringValue(StormBenchmark.PARAM_OPERATION);
            this.parallelSources = config.getIntegerValue(StormBenchmark.PARAM_PARALLEL_SOURCES, -1).intValue();
            this.timeStamps = new ArrayList(this.messageCount);
        }
    }

    /* loaded from: input_file:edu/iu/dsc/tws/examples/compatibility/storm/bench/StormBenchmark$Generator.class */
    public static class Generator implements ISource {
        private static final long serialVersionUID = -254264903510284798L;
        private TaskContext taskContext;
        private double[] data;

        public void execute() {
            this.taskContext.write("edge", this.data);
            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void prepare(Config config, TaskContext taskContext) {
            int intValue = config.getIntegerValue("size", 1024).intValue();
            StormBenchmark.LOG.info("Generating data for size " + intValue);
            this.data = new double[intValue / 8];
            Arrays.fill(this.data, 1.0d);
            this.taskContext = taskContext;
        }
    }

    public void execute() {
        Integer integerValue = this.config.getIntegerValue(PARAM_PARALLEL_SOURCES, 256);
        ComputeGraphBuilder newBuilder = ComputeGraphBuilder.newBuilder(Config.newBuilder().build());
        Generator generator = new Generator();
        DataSink dataSink = new DataSink();
        newBuilder.addSource("generator", generator, integerValue.intValue());
        if ("reduce".equals(this.config.get(PARAM_OPERATION))) {
            newBuilder.addSink("sink", dataSink).reduce("generator").viaEdge("edge").withOperation(Op.SUM, MessageTypes.DOUBLE_ARRAY);
        } else {
            newBuilder.addSink("sink", dataSink).gather("generator").viaEdge("edge");
        }
        newBuilder.setMode(OperationMode.STREAMING);
        ComputeGraph build = newBuilder.build();
        this.taskExecutor.execute(build, this.taskExecutor.plan(build));
    }

    public static void main(String[] strArr) {
        Config loadConfig = ResourceAllocator.loadConfig(Collections.emptyMap());
        JobConfig jobConfig = new JobConfig();
        jobConfig.put("size", Integer.valueOf(strArr[0]));
        int intValue = Integer.valueOf(strArr[1]).intValue();
        jobConfig.put(PARAM_PARALLEL_SOURCES, Integer.valueOf(intValue));
        jobConfig.put(PARAM_MESSAGES_COUNT, Integer.valueOf(strArr[2]));
        jobConfig.put(PARAM_OPERATION, strArr.length == 4 ? strArr[3] : "reduce");
        Twister2Job.Twister2JobBuilder newBuilder = Twister2Job.newBuilder();
        newBuilder.setJobName("storm-bench-mark");
        newBuilder.setWorkerClass(StormBenchmark.class.getName());
        newBuilder.setConfig(jobConfig);
        newBuilder.addComputeResource(1.0d, 1024, intValue + 1);
        Twister2Submitter.submitJob(newBuilder.build(), loadConfig);
    }
}
