package edu.iu.dsc.tws.examples.batch.kmeans.checkpointing;

import edu.iu.dsc.tws.api.JobConfig;
import edu.iu.dsc.tws.api.Twister2Job;
import edu.iu.dsc.tws.api.checkpointing.Snapshot;
import edu.iu.dsc.tws.api.comms.packing.types.ObjectPacker;
import edu.iu.dsc.tws.api.comms.packing.types.primitive.IntegerPacker;
import edu.iu.dsc.tws.api.compute.executor.ExecutionPlan;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.dataset.DataObject;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IVolatileVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.checkpointing.worker.CheckpointingWorkerEnv;
import edu.iu.dsc.tws.examples.Utils;
import edu.iu.dsc.tws.examples.batch.cdfw.CDFConstants;
import edu.iu.dsc.tws.examples.batch.kmeans.KMeansWorker;
import edu.iu.dsc.tws.examples.batch.kmeans.KMeansWorkerParameters;
import edu.iu.dsc.tws.examples.batch.kmeans.KMeansWorkerUtils;
import edu.iu.dsc.tws.examples.comms.Constants;
import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
import edu.iu.dsc.tws.task.ComputeEnvironment;
import edu.iu.dsc.tws.task.impl.TaskExecutor;
import java.util.Arrays;
import java.util.HashMap;
import java.util.logging.Logger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

/* loaded from: input_file:edu/iu/dsc/tws/examples/batch/kmeans/checkpointing/KMeansCheckpointingWorker.class */
public class KMeansCheckpointingWorker implements IWorker {
    private static final Logger LOG = Logger.getLogger(KMeansCheckpointingWorker.class.getName());
    private static final String I_KEY = "iter";
    private static final String CENT_OBJ = "centDataObj";

    public void execute(Config config, int i, IWorkerController iWorkerController, IPersistentVolume iPersistentVolume, IVolatileVolume iVolatileVolume) {
        DataObject dataObject;
        ComputeEnvironment init = ComputeEnvironment.init(config, i, iWorkerController, iPersistentVolume, iVolatileVolume);
        CheckpointingWorkerEnv build = CheckpointingWorkerEnv.newBuilder(config, i, iWorkerController).registerVariable(I_KEY, IntegerPacker.getInstance()).registerVariable(CENT_OBJ, ObjectPacker.getInstance()).build();
        Snapshot snapshot = build.getSnapshot();
        TaskExecutor taskExecutor = init.getTaskExecutor();
        LOG.info("Task worker starting: " + i + " Current snapshot ver: " + snapshot.getVersion());
        KMeansWorkerParameters build2 = KMeansWorkerParameters.build(config);
        KMeansWorkerUtils kMeansWorkerUtils = new KMeansWorkerUtils(config);
        int parallelismValue = build2.getParallelismValue();
        int dimension = build2.getDimension();
        int numFiles = build2.getNumFiles();
        int dsize = build2.getDsize();
        int csize = build2.getCsize();
        int iterations = build2.getIterations();
        String str = build2.getDatapointDirectory() + i;
        String str2 = build2.getCentroidDirectory() + i;
        kMeansWorkerUtils.generateDatapoints(dimension, numFiles, dsize, csize, str, str2);
        long currentTimeMillis = System.currentTimeMillis();
        ComputeGraph buildDataPointsTG = KMeansWorker.buildDataPointsTG(str, dsize, parallelismValue, dimension, config);
        ExecutionPlan plan = taskExecutor.plan(buildDataPointsTG);
        taskExecutor.execute(buildDataPointsTG, plan);
        DataObject output = taskExecutor.getOutput(buildDataPointsTG, plan, "datapointsink");
        if (snapshot.checkpointAvailable(CENT_OBJ)) {
            dataObject = (DataObject) snapshot.get(CENT_OBJ);
        } else {
            ComputeGraph buildCentroidsTG = KMeansWorker.buildCentroidsTG(str2, csize, parallelismValue, dimension, config);
            ExecutionPlan plan2 = taskExecutor.plan(buildCentroidsTG);
            taskExecutor.execute(buildCentroidsTG, plan2);
            dataObject = taskExecutor.getOutput(buildCentroidsTG, plan2, "centroidsink");
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        ComputeGraph buildKMeansTG = KMeansWorker.buildKMeansTG(parallelismValue, config);
        ExecutionPlan plan3 = taskExecutor.plan(buildKMeansTG);
        for (int intValue = ((Integer) snapshot.getOrDefault(I_KEY, 0)).intValue(); intValue < iterations; intValue++) {
            taskExecutor.addInput(buildKMeansTG, plan3, "kmeanssource", "points", output);
            taskExecutor.addInput(buildKMeansTG, plan3, "kmeanssource", "centroids", dataObject);
            taskExecutor.itrExecute(buildKMeansTG, plan3);
            dataObject = taskExecutor.getOutput(buildKMeansTG, plan3, "kmeanssink");
            build.commitSnapshot();
        }
        taskExecutor.waitFor(buildKMeansTG, plan3);
        double[][] dArr = (double[][]) dataObject.getPartition(i).getConsumer().next();
        long currentTimeMillis3 = System.currentTimeMillis();
        if (i == 0) {
            LOG.info("Data Load time : " + (currentTimeMillis2 - currentTimeMillis) + "\nTotal Time : " + (currentTimeMillis3 - currentTimeMillis) + "Compute Time : " + (currentTimeMillis3 - currentTimeMillis2));
        }
        LOG.info("Final Centroids After\t" + iterations + "\titerations\t" + Arrays.deepToString(dArr));
        init.close();
    }

    public static void main(String[] strArr) throws ParseException {
        LOG.info("KMeans Clustering Job with fault tolerance");
        HashMap hashMap = new HashMap();
        hashMap.put("twister2.job.id", "KMeans-faultolerance-job");
        Config loadConfig = ResourceAllocator.loadConfig(hashMap);
        Options options = new Options();
        options.addOption("workers", true, "Workers");
        options.addOption("csize", true, "Size of the dapoints file");
        options.addOption("dsize", true, "Size of the centroids file");
        options.addOption(Constants.ARGS_NUMBER_OF_FILES, true, "Number of files");
        options.addOption(Constants.ARGS_SHARED_FILE_SYSTEM, false, "Shared file system");
        options.addOption(CDFConstants.ARGS_NUMBER_OF_DIMENSIONS, true, CDFConstants.ARGS_NUMBER_OF_DIMENSIONS);
        options.addOption(CDFConstants.ARGS_PARALLELISM_VALUE, true, CDFConstants.ARGS_PARALLELISM_VALUE);
        options.addOption(I_KEY, true, I_KEY);
        options.addOption(Utils.createOption("dinput", true, "Data points Input directory", true));
        options.addOption(Utils.createOption("cinput", true, "Centroids Input directory", true));
        options.addOption(Utils.createOption("output", true, "Output directory", true));
        options.addOption(Utils.createOption("filesys", true, "file system", true));
        CommandLine parse = new DefaultParser().parse(options, strArr);
        int parseInt = Integer.parseInt(parse.getOptionValue("workers"));
        int parseInt2 = Integer.parseInt(parse.getOptionValue("dsize"));
        int parseInt3 = Integer.parseInt(parse.getOptionValue("csize"));
        int parseInt4 = Integer.parseInt(parse.getOptionValue(Constants.ARGS_NUMBER_OF_FILES));
        int parseInt5 = Integer.parseInt(parse.getOptionValue(CDFConstants.ARGS_NUMBER_OF_DIMENSIONS));
        int parseInt6 = Integer.parseInt(parse.getOptionValue(CDFConstants.ARGS_PARALLELISM_VALUE));
        int parseInt7 = Integer.parseInt(parse.getOptionValue(I_KEY));
        String optionValue = parse.getOptionValue("dinput");
        String optionValue2 = parse.getOptionValue("cinput");
        String optionValue3 = parse.getOptionValue("output");
        String optionValue4 = parse.getOptionValue("filesys");
        boolean parseBoolean = Boolean.parseBoolean(parse.getOptionValue(Constants.ARGS_SHARED_FILE_SYSTEM));
        JobConfig jobConfig = new JobConfig();
        jobConfig.put("dinput", optionValue);
        jobConfig.put("cinput", optionValue2);
        jobConfig.put("output", optionValue3);
        jobConfig.put("filesys", optionValue4);
        jobConfig.put("dsize", Integer.toString(parseInt2));
        jobConfig.put("csize", Integer.toString(parseInt3));
        jobConfig.put("workers", Integer.toString(parseInt));
        jobConfig.put(Constants.ARGS_NUMBER_OF_FILES, Integer.toString(parseInt4));
        jobConfig.put(CDFConstants.ARGS_NUMBER_OF_DIMENSIONS, Integer.toString(parseInt5));
        jobConfig.put(CDFConstants.ARGS_PARALLELISM_VALUE, Integer.toString(parseInt6));
        jobConfig.put(Constants.ARGS_SHARED_FILE_SYSTEM, Boolean.valueOf(parseBoolean));
        jobConfig.put(I_KEY, Integer.toString(parseInt7));
        Twister2Job.Twister2JobBuilder newBuilder = Twister2Job.newBuilder();
        newBuilder.setJobName("KMeans-faultolerance-job");
        newBuilder.setWorkerClass(KMeansCheckpointingWorker.class.getName());
        newBuilder.addComputeResource(2.0d, 512, 1.0d, parseInt);
        newBuilder.setConfig(jobConfig);
        Twister2Submitter.submitJob(newBuilder.build(), loadConfig);
    }
}
