package edu.iu.dsc.tws.examples.comms.stream;

import edu.iu.dsc.tws.api.comms.Op;
import edu.iu.dsc.tws.api.comms.SingularReceiver;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageTypes;
import edu.iu.dsc.tws.api.comms.structs.Tuple;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.resource.WorkerEnvironment;
import edu.iu.dsc.tws.comms.functions.reduction.ReduceOperationFunction;
import edu.iu.dsc.tws.comms.selectors.SimpleKeyBasedSelector;
import edu.iu.dsc.tws.comms.stream.SKeyedReduce;
import edu.iu.dsc.tws.comms.utils.LogicalPlanBuilder;
import edu.iu.dsc.tws.examples.comms.KeyedBenchWorker;
import edu.iu.dsc.tws.examples.verification.ExperimentVerification;
import edu.iu.dsc.tws.examples.verification.VerificationException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/examples/comms/stream/SKeyedReduceExample.class */
public class SKeyedReduceExample extends KeyedBenchWorker {
    private static final Logger LOG = Logger.getLogger(SKeyedReduceExample.class.getName());
    private SKeyedReduce keyedReduce;
    private boolean reduceDone;

    /* loaded from: input_file:edu/iu/dsc/tws/examples/comms/stream/SKeyedReduceExample$FinalSingularReceiver.class */
    public class FinalSingularReceiver implements SingularReceiver {
        private int count = 0;
        private int expected;

        public FinalSingularReceiver(int i) {
            this.expected = i;
        }

        public void init(Config config, Set<Integer> set) {
        }

        public boolean receive(int i, Object obj) {
            this.count++;
            SKeyedReduceExample.LOG.log(Level.INFO, String.format("Target %d received count %d", Integer.valueOf(i), Integer.valueOf(this.count)));
            SKeyedReduceExample.this.reduceDone = true;
            int[] iArr = (int[]) ((Tuple) obj).getValue();
            SKeyedReduceExample.LOG.log(Level.INFO, String.format("%d Results : %s", Integer.valueOf(SKeyedReduceExample.this.workerId), Arrays.toString(Arrays.copyOfRange(iArr, 0, Math.min(iArr.length, 10)))));
            SKeyedReduceExample.LOG.log(Level.INFO, String.format("%d Received final input", Integer.valueOf(SKeyedReduceExample.this.workerId)));
            SKeyedReduceExample.this.reduceDone = true;
            SKeyedReduceExample.this.experimentData.setOutput(obj);
            try {
                SKeyedReduceExample.this.verify();
                return true;
            } catch (VerificationException e) {
                SKeyedReduceExample.LOG.info("Exception Message : " + e.getMessage());
                return true;
            }
        }
    }

    @Override // edu.iu.dsc.tws.examples.comms.KeyedBenchWorker
    protected void execute(WorkerEnvironment workerEnvironment) {
        LogicalPlanBuilder withFairDistribution = LogicalPlanBuilder.plan(this.jobParameters.getSources(), this.jobParameters.getTargets(), workerEnvironment).withFairDistribution();
        this.keyedReduce = new SKeyedReduce(workerEnvironment.getCommunicator(), withFairDistribution, MessageTypes.INTEGER, MessageTypes.INTEGER_ARRAY, new ReduceOperationFunction(Op.SUM, MessageTypes.INTEGER_ARRAY), new FinalSingularReceiver(this.jobParameters.getIterations()), new SimpleKeyBasedSelector());
        Set sourcesOnThisWorker = withFairDistribution.getSourcesOnThisWorker();
        Iterator it = sourcesOnThisWorker.iterator();
        while (it.hasNext()) {
            this.finishedSources.put(Integer.valueOf(((Integer) it.next()).intValue()), false);
        }
        if (sourcesOnThisWorker.size() == 0) {
            this.sourcesDone = true;
        }
        LOG.log(Level.INFO, String.format("%d Sources %s target %d this %s", Integer.valueOf(this.workerId), withFairDistribution.getSources(), 1, sourcesOnThisWorker));
        Iterator it2 = sourcesOnThisWorker.iterator();
        while (it2.hasNext()) {
            new Thread(new KeyedBenchWorker.MapWorker(((Integer) it2.next()).intValue())).start();
        }
    }

    @Override // edu.iu.dsc.tws.examples.comms.KeyedBenchWorker
    protected boolean progressCommunication() {
        return this.keyedReduce.progress();
    }

    @Override // edu.iu.dsc.tws.examples.comms.KeyedBenchWorker
    protected boolean isDone() {
        return this.reduceDone && this.sourcesDone && this.keyedReduce.isComplete();
    }

    @Override // edu.iu.dsc.tws.examples.comms.KeyedBenchWorker
    protected boolean sendMessages(int i, Object obj, Object obj2, int i2) {
        while (!this.keyedReduce.reduce(i, obj, obj2, i2)) {
            this.keyedReduce.progress();
        }
        return true;
    }

    public void verify() throws VerificationException {
        if (this.jobParameters.isDoVerify()) {
            LOG.info("Verifying results ...");
            if (!new ExperimentVerification(this.experimentData, "keyed_reduce").isVerified()) {
                throw new VerificationException("Results do not match");
            }
            LOG.info("Results generated from the experiment are verified.");
        }
    }
}
