package edu.iu.dsc.tws.examples.batch.wordcount.comms;

import edu.iu.dsc.tws.api.comms.DataFlowOperation;
import edu.iu.dsc.tws.api.comms.LogicalPlan;
import edu.iu.dsc.tws.api.comms.ReduceFunction;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageTypes;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IVolatileVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.api.resource.WorkerEnvironment;
import edu.iu.dsc.tws.comms.batch.BKeyedReduce;
import edu.iu.dsc.tws.comms.selectors.HashingSelector;
import edu.iu.dsc.tws.examples.Utils;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/examples/batch/wordcount/comms/WordCountWorker.class */
/**
 * Worker for the communication-level batch word-count example.
 *
 * <p>On {@link #execute} the worker:
 * <ol>
 *   <li>declares two stages of {@value #NO_OF_TASKS} tasks each and builds a staged
 *       {@link LogicalPlan} for them;</li>
 *   <li>uses task ids {@code 0..NO_OF_TASKS-1} as sources and
 *       {@code NO_OF_TASKS..2*NO_OF_TASKS-1} as destinations;</li>
 *   <li>creates a {@link BKeyedReduce} whose reduce function adds two {@code Integer}
 *       values, with a {@link WordAggregator} as the receiver;</li>
 *   <li>starts one {@link BatchWordSource} thread for every stage-0 task that
 *       {@code Utils.getTasksOfExecutor} reports for this worker;</li>
 *   <li>drives the channel and the operation until everything reports completion, then
 *       closes the worker environment.</li>
 * </ol>
 */
public class WordCountWorker implements IWorker {
    private static final Logger LOG = Logger.getLogger(WordCountWorker.class.getName());

    /** Number of task ids in each of the two stages (source stage and destination stage). */
    private static final int NO_OF_TASKS = 8;

    /** The keyed reduce operation shared by all source threads of this worker. */
    private BKeyedReduce keyGather;

    /** Task ids that feed the operation: {@code 0..NO_OF_TASKS-1}. */
    private Set<Integer> sources;

    /** Task ids that receive the reduced values: {@code NO_OF_TASKS..2*NO_OF_TASKS-1}. */
    private Set<Integer> destinations;

    private LogicalPlan logicalPlan;

    /** Receiver handed to the operation; its {@code isDone()} is part of the exit condition. */
    private WordAggregator wordAggregator;

    private int workerId;
    private WorkerEnvironment workerEnv;

    /** Every source started by {@link #scheduleTasks()}; polled for completion in {@link #progress()}. */
    private final Set<BatchWordSource> batchWordSources = new HashSet<>();

    /** Task count per stage, in stage order; filled in by {@link #execute}. */
    private final List<Integer> taskStages = new ArrayList<>();

    /**
     * Runs the whole example on this worker and returns once the operation, all local
     * sources and the aggregator report that they are finished.
     *
     * <p>The worker environment is closed in a {@code finally} block, so it is released even
     * when setup, scheduling or the progress loop throws.
     *
     * @param config             job configuration, forwarded to {@code WorkerEnvironment.init}
     * @param i                  id of this worker
     * @param iWorkerController  worker controller, forwarded to {@code WorkerEnvironment.init}
     * @param iPersistentVolume  persistent volume, forwarded to {@code WorkerEnvironment.init}
     * @param iVolatileVolume    volatile volume, forwarded to {@code WorkerEnvironment.init}
     */
    public void execute(Config config, int i, IWorkerController iWorkerController, IPersistentVolume iPersistentVolume, IVolatileVolume iVolatileVolume) {
        this.workerId = i;

        // Two stages of equal size: stage 0 holds the sources, stage 1 the destinations.
        this.taskStages.add(NO_OF_TASKS);
        this.taskStages.add(NO_OF_TASKS);

        this.workerEnv = WorkerEnvironment.init(config, i, iWorkerController, iPersistentVolume, iVolatileVolume);
        try {
            this.logicalPlan = Utils.createStageLogicalPlan(this.workerEnv, this.taskStages);
            setupTasks();

            this.wordAggregator = new WordAggregator();
            ReduceFunction sumCounts = new ReduceFunction() {
                public void init(Config cfg, DataFlowOperation operation, Map<Integer, List<Integer>> expectedIds) {
                    // Nothing to initialise: the reduction is stateless.
                }

                public Object reduce(Object left, Object right) {
                    // Both operands are expected to be Integer values; anything else fails
                    // with a ClassCastException, exactly as before.
                    int sum = (Integer) left + (Integer) right;
                    return sum;
                }
            };

            // NOTE(review): OBJECT / INTEGER are presumably the key type and the value type of
            // the keyed reduce -- confirm against the BKeyedReduce constructor.
            this.keyGather = new BKeyedReduce(this.workerEnv.getCommunicator(), this.logicalPlan,
                    this.sources, this.destinations, sumCounts, this.wordAggregator,
                    MessageTypes.OBJECT, MessageTypes.INTEGER, new HashingSelector());

            scheduleTasks();
            progress();
        } finally {
            // Always release the environment, including on failure.
            this.workerEnv.close();
        }
    }

    /**
     * Populates {@link #sources} with ids {@code 0..NO_OF_TASKS-1} and {@link #destinations}
     * with ids {@code NO_OF_TASKS..2*NO_OF_TASKS-1}, then logs both sets at FINE level.
     *
     * <p>Requires {@link #logicalPlan} to be set, because the log line reads the executor id
     * from it.
     */
    private void setupTasks() {
        this.sources = new HashSet<>();
        this.destinations = new HashSet<>();
        for (int task = 0; task < NO_OF_TASKS; task++) {
            this.sources.add(task);
            // Destination ids follow directly after the source id range.
            this.destinations.add(NO_OF_TASKS + task);
        }
        LOG.fine(String.format("%d sources %s destinations %s", this.logicalPlan.getThisExecutor(), this.sources, this.destinations));
    }

    /**
     * Creates one {@link BatchWordSource} per stage-0 task reported for this worker, remembers
     * it in {@link #batchWordSources} and starts it on its own thread.
     */
    private void scheduleTasks() {
        for (int taskId : Utils.getTasksOfExecutor(this.workerId, this.logicalPlan, this.taskStages, 0)) {
            // NOTE(review): 1000 and 10 are passed through to BatchWordSource unchanged; their
            // meaning is defined by that constructor -- confirm there before tuning.
            BatchWordSource source = new BatchWordSource(this.keyGather, 1000, taskId, 10);
            this.batchWordSources.add(source);
            new Thread(source).start();
        }
    }

    /**
     * Spins until the example is finished.
     *
     * <p>Each pass progresses the channel and the keyed reduce, then polls every completion
     * flag. The loop ends on the first pass where {@code keyGather.progress()} returns
     * {@code false}, {@code keyGather.isComplete()} is {@code true}, every source is done and
     * the aggregator is done. All calls are made on every pass, in this order; there is no
     * sleep or back-off between passes.
     */
    private void progress() {
        boolean finished;
        do {
            this.workerEnv.getChannel().progress();

            // Evaluated eagerly on purpose: every call below runs on each pass.
            boolean operationStillBusy = this.keyGather.progress();
            boolean operationComplete = this.keyGather.isComplete();

            boolean allSourcesDone = true;
            for (BatchWordSource source : this.batchWordSources) {
                if (!source.isDone()) {
                    allSourcesDone = false;
                }
            }

            boolean aggregatorDone = this.wordAggregator.isDone();

            finished = !operationStillBusy && operationComplete && allSourcesDone && aggregatorDone;
        } while (!finished);
    }
}
