package edu.iu.dsc.tws.examples.task.streaming;

import edu.iu.dsc.tws.api.JobConfig;
import edu.iu.dsc.tws.api.Twister2Job;
import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.graph.GraphConstants;
import edu.iu.dsc.tws.api.compute.graph.OperationMode;
import edu.iu.dsc.tws.api.compute.nodes.BaseSink;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IVolatileVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.connectors.TwsKafkaConsumer;
import edu.iu.dsc.tws.examples.internal.task.TaskUtils;
import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
import edu.iu.dsc.tws.task.graph.GraphBuilder;
import java.util.ArrayList;
import java.util.HashMap;

/* loaded from: input_file:edu/iu/dsc/tws/examples/task/streaming/StreamingTaskExampleKafka.class */
public class StreamingTaskExampleKafka implements IWorker {

    /* loaded from: input_file:edu/iu/dsc/tws/examples/task/streaming/StreamingTaskExampleKafka$RecevingTask.class */
    private static class RecevingTask extends BaseSink {
        private static final long serialVersionUID = -254264903510284798L;
        private int count;

        private RecevingTask() {
            this.count = 0;
        }

        public boolean execute(IMessage iMessage) {
            if (this.count % 1000000 == 0) {
                System.out.println(iMessage.getContent());
            }
            this.count++;
            return true;
        }
    }

    public void execute(Config config, int i, IWorkerController iWorkerController, IPersistentVolume iPersistentVolume, IVolatileVolume iVolatileVolume) {
        ArrayList arrayList = new ArrayList();
        arrayList.add("sample_topic1");
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add("localhost:9092");
        TwsKafkaConsumer twsKafkaConsumer = new TwsKafkaConsumer(arrayList, arrayList2, "test", "partition-edge");
        RecevingTask recevingTask = new RecevingTask();
        GraphBuilder newBuilder = GraphBuilder.newBuilder();
        newBuilder.addSource("source", twsKafkaConsumer);
        newBuilder.setParallelism("source", 4);
        newBuilder.addSink("sink", recevingTask);
        newBuilder.setParallelism("sink", 4);
        newBuilder.connect("source", "sink", "partition-edge", "partition");
        newBuilder.operationMode(OperationMode.STREAMING);
        newBuilder.addConfiguration("source", "Ram", Integer.valueOf(GraphConstants.taskInstanceRam(config)));
        newBuilder.addConfiguration("source", "Disk", Integer.valueOf(GraphConstants.taskInstanceDisk(config)));
        newBuilder.addConfiguration("source", "Cpu", Integer.valueOf(GraphConstants.taskInstanceCpu(config)));
        ArrayList arrayList3 = new ArrayList();
        arrayList3.add("dataset1.txt");
        arrayList3.add("dataset2.txt");
        newBuilder.addConfiguration("source", "inputdataset", arrayList3);
        TaskUtils.execute(config, i, newBuilder.build(), iWorkerController);
    }

    public static void main(String[] strArr) {
        Config loadConfig = ResourceAllocator.loadConfig(new HashMap());
        HashMap hashMap = new HashMap();
        hashMap.put("twister2.exector.worker.threads", 8);
        JobConfig jobConfig = new JobConfig();
        jobConfig.putAll(hashMap);
        Twister2Job.Twister2JobBuilder newBuilder = Twister2Job.newBuilder();
        newBuilder.setJobName("task-example");
        newBuilder.setWorkerClass(StreamingTaskExampleKafka.class.getName());
        newBuilder.addComputeResource(1.0d, 512, 4);
        newBuilder.setConfig(jobConfig);
        Twister2Submitter.submitJob(newBuilder.build(), loadConfig);
    }
}
