package edu.iu.dsc.tws.examples.internal.hdfs;

import edu.iu.dsc.tws.api.JobConfig;
import edu.iu.dsc.tws.api.Twister2Job;
import edu.iu.dsc.tws.api.comms.Communicator;
import edu.iu.dsc.tws.api.comms.channel.TWSChannel;
import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.compute.graph.OperationMode;
import edu.iu.dsc.tws.api.compute.nodes.BaseSink;
import edu.iu.dsc.tws.api.compute.nodes.BaseSource;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskInstancePlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskSchedulePlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.Worker;
import edu.iu.dsc.tws.api.compute.schedule.elements.WorkerPlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.WorkerSchedulePlan;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.TimeoutException;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IVolatileVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.api.resource.Network;
import edu.iu.dsc.tws.executor.core.ExecutionPlanBuilder;
import edu.iu.dsc.tws.executor.threading.Executor;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
import edu.iu.dsc.tws.task.graph.GraphBuilder;
import edu.iu.dsc.tws.tsched.taskscheduler.TaskScheduler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/examples/internal/hdfs/HDFSTaskExample.class */
public class HDFSTaskExample implements IWorker {
    private static final Logger LOG = Logger.getLogger(HDFSTaskExample.class.getName());

    /* loaded from: input_file:edu/iu/dsc/tws/examples/internal/hdfs/HDFSTaskExample$GeneratorTask.class */
    private static class GeneratorTask extends BaseSource {
        private static final long serialVersionUID = -254264903510284748L;
        private int count;
        private String inputFileName;
        private String outputFileName;

        private GeneratorTask() {
            this.count = 0;
        }

        public void prepare(Config config, TaskContext taskContext) {
            this.context = taskContext;
            this.config = config;
            for (Map.Entry entry : this.context.getConfigurations().entrySet()) {
                if (((String) entry.getKey()).contains("inputdataset")) {
                    List list = (List) entry.getValue();
                    if (list.size() == 1) {
                        this.inputFileName = (String) list.get(0);
                    } else {
                        for (int i = 0; i < list.size(); i++) {
                            this.inputFileName = (String) list.get(i);
                        }
                    }
                }
            }
        }

        public void execute() {
            if (this.count == 0) {
                new HDFSReaderWriter(this.config, this.inputFileName).readInputFromHDFS();
            }
            if (this.context.write("partition-edge", "Hello")) {
                this.count++;
                if (this.count % 100 == 0) {
                    HDFSTaskExample.LOG.info(String.format("%d %d Message Partition sent count : %d", Integer.valueOf(this.context.getWorkerId()), Integer.valueOf(this.context.globalTaskId()), Integer.valueOf(this.count)));
                }
            }
        }
    }

    /* loaded from: input_file:edu/iu/dsc/tws/examples/internal/hdfs/HDFSTaskExample$ReceivingTask.class */
    private static class ReceivingTask extends BaseSink {
        private static final long serialVersionUID = -254264903510284798L;
        private int count;
        private String outputFileName;

        private ReceivingTask() {
            this.count = 0;
        }

        public void prepare(Config config, TaskContext taskContext) {
            this.context = taskContext;
            this.config = config;
            for (Map.Entry entry : this.context.getConfigurations().entrySet()) {
                if (((String) entry.getKey()).contains("outputdataset")) {
                    List list = (List) entry.getValue();
                    if (list.size() == 1) {
                        this.outputFileName = (String) list.get(0);
                    } else {
                        for (int i = 0; i < list.size(); i++) {
                            this.outputFileName = (String) list.get(i);
                        }
                    }
                }
            }
        }

        public boolean execute(IMessage iMessage) {
            if (this.count == 0) {
                new HDFSReaderWriter(this.config, this.outputFileName).writeOutputToHDFS();
            }
            if (iMessage.getContent() instanceof List) {
                this.count += ((List) iMessage.getContent()).size();
            }
            HDFSTaskExample.LOG.info(String.format("%d %d Message Partition Received count: %d", Integer.valueOf(this.context.getWorkerId()), Integer.valueOf(this.context.globalTaskId()), Integer.valueOf(this.count)));
            return true;
        }
    }

    public static void main(String[] strArr) {
        Config loadConfig = ResourceAllocator.loadConfig(new HashMap());
        HashMap hashMap = new HashMap();
        hashMap.put("twister2.exector.worker.threads", 8);
        JobConfig jobConfig = new JobConfig();
        jobConfig.putAll(hashMap);
        Twister2Job.Twister2JobBuilder newBuilder = Twister2Job.newBuilder();
        newBuilder.setJobName("hdfstask-example");
        newBuilder.setWorkerClass(HDFSTaskExample.class.getName());
        newBuilder.addComputeResource(1.0d, 512, 2);
        newBuilder.setConfig(jobConfig);
        Twister2Submitter.submitJob(newBuilder.build(), loadConfig);
    }

    public void execute(Config config, int i, IWorkerController iWorkerController, IPersistentVolume iPersistentVolume, IVolatileVolume iVolatileVolume) {
        GeneratorTask generatorTask = new GeneratorTask();
        ReceivingTask receivingTask = new ReceivingTask();
        GraphBuilder newBuilder = GraphBuilder.newBuilder();
        newBuilder.addSource("source", generatorTask);
        newBuilder.setParallelism("source", 2);
        newBuilder.addSink("sink", receivingTask);
        newBuilder.setParallelism("sink", 2);
        newBuilder.connect("source", "sink", "partition-edge", "partition");
        newBuilder.operationMode(OperationMode.STREAMING);
        ArrayList arrayList = new ArrayList();
        arrayList.add("dataset1.txt");
        newBuilder.addConfiguration("source", "inputdataset", arrayList);
        newBuilder.addConfiguration("sink", "inputdataset", arrayList);
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add("datasetout.txt");
        newBuilder.addConfiguration("source", "outputdataset", arrayList2);
        newBuilder.addConfiguration("sink", "outputdataset", arrayList2);
        try {
            List<JobMasterAPI.WorkerInfo> allWorkers = iWorkerController.getAllWorkers();
            WorkerPlan createWorkerPlan = createWorkerPlan(allWorkers);
            ComputeGraph build = newBuilder.build();
            TaskScheduler taskScheduler = new TaskScheduler();
            taskScheduler.initialize(config);
            TaskSchedulePlan schedule = taskScheduler.schedule(build, createWorkerPlan);
            if (i == 0 && schedule != null) {
                for (Map.Entry entry : schedule.getContainersMap().entrySet()) {
                    Integer num = (Integer) entry.getKey();
                    Set<TaskInstancePlan> taskInstances = ((WorkerSchedulePlan) entry.getValue()).getTaskInstances();
                    LOG.info("Task Details for Container Id:" + num);
                    for (TaskInstancePlan taskInstancePlan : taskInstances) {
                        LOG.info("Task Id:" + taskInstancePlan.getTaskId() + "\tTask Index" + taskInstancePlan.getTaskIndex() + "\tTask Name:" + taskInstancePlan.getTaskName());
                    }
                }
            }
            TWSChannel initializeChannel = Network.initializeChannel(config, iWorkerController);
            new Executor(config, i, initializeChannel).execute(new ExecutionPlanBuilder(i, allWorkers, new Communicator(config, initializeChannel), iWorkerController.getCheckpointingClient()).build(config, build, schedule));
        } catch (TimeoutException e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
        }
    }

    public WorkerPlan createWorkerPlan(List<JobMasterAPI.WorkerInfo> list) {
        ArrayList arrayList = new ArrayList();
        Iterator<JobMasterAPI.WorkerInfo> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(new Worker(it.next().getWorkerID()));
        }
        return new WorkerPlan(arrayList);
    }
}
