package eu.stratosphere.test.accumulators;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.stratosphere.api.common.JobExecutionResult;
import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.accumulators.Accumulator;
import eu.stratosphere.api.common.accumulators.AccumulatorHelper;
import eu.stratosphere.api.common.accumulators.DoubleCounter;
import eu.stratosphere.api.common.accumulators.Histogram;
import eu.stratosphere.api.common.accumulators.IntCounter;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.io.TextInputFormat;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.util.SerializableHashSet;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.SimpleStringUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:eu/stratosphere/test/accumulators/AccumulatorITCase.class */
public class AccumulatorITCase extends RecordAPITestBase {
    private static final String INPUT = "one\ntwo two\nthree three three\n";
    private static final String EXPECTED = "one 1\ntwo 2\nthree 3\n";
    private static final int NUM_SUBTASKS = 2;
    protected String dataPath;
    protected String resultPath;

    @ReduceOperator.Combinable
    @FunctionAnnotation.ConstantFields({0})
    /* loaded from: input_file:eu/stratosphere/test/accumulators/AccumulatorITCase$CountWords.class */
    public static class CountWords extends ReduceFunction implements Serializable {
        private static final long serialVersionUID = 1;
        private final IntValue cnt = new IntValue();
        private IntCounter reduceCalls = null;
        private IntCounter combineCalls = null;

        public void open(Configuration configuration) throws Exception {
            this.reduceCalls = getRuntimeContext().getIntCounter("reduce-calls");
            this.combineCalls = getRuntimeContext().getIntCounter("combine-calls");
        }

        public void reduce(Iterator<Record> it, Collector<Record> collector) throws Exception {
            this.reduceCalls.add(1);
            reduceInternal(it, collector);
        }

        public void combine(Iterator<Record> it, Collector<Record> collector) throws Exception {
            this.combineCalls.add(1);
            reduceInternal(it, collector);
        }

        private void reduceInternal(Iterator<Record> it, Collector<Record> collector) {
            Record record = null;
            int i = 0;
            while (true) {
                int i2 = i;
                if (!it.hasNext()) {
                    this.cnt.setValue(i2);
                    record.setField(1, this.cnt);
                    collector.collect(record);
                    return;
                }
                record = it.next();
                i = i2 + record.getField(1, IntValue.class).getValue();
            }
        }
    }

    /* loaded from: input_file:eu/stratosphere/test/accumulators/AccumulatorITCase$SetAccumulator.class */
    public static class SetAccumulator<T extends IOReadableWritable> implements Accumulator<T, Set<T>> {
        private static final long serialVersionUID = 1;
        private SerializableHashSet<T> set = new SerializableHashSet<>();

        public void add(T t) {
            this.set.add(t);
        }

        /* renamed from: getLocalValue, reason: merged with bridge method [inline-methods] */
        public Set<T> m60getLocalValue() {
            return this.set;
        }

        public void resetLocal() {
            this.set.clear();
        }

        public void merge(Accumulator<T, Set<T>> accumulator) {
            this.set.addAll(((SetAccumulator) accumulator).m60getLocalValue());
        }

        public void write(DataOutput dataOutput) throws IOException {
            this.set.write(dataOutput);
        }

        public void read(DataInput dataInput) throws IOException {
            this.set.read(dataInput);
        }
    }

    /* loaded from: input_file:eu/stratosphere/test/accumulators/AccumulatorITCase$TokenizeLine.class */
    public static class TokenizeLine extends MapFunction implements Serializable {
        private static final long serialVersionUID = 1;
        private StringValue word;
        private final Record outputRecord = new Record();
        private final IntValue one = new IntValue(1);
        private final SimpleStringUtils.WhitespaceTokenizer tokenizer = new SimpleStringUtils.WhitespaceTokenizer();
        IntCounter cntNumLines = null;
        Histogram wordsPerLineDistribution = null;
        DoubleCounter openCloseCounter = new DoubleCounter();
        private SetAccumulator<StringRecord> distinctWords = null;

        public void open(Configuration configuration) throws Exception {
            this.cntNumLines = getRuntimeContext().getIntCounter("num-lines");
            this.wordsPerLineDistribution = getRuntimeContext().getHistogram("words-per-line");
            getRuntimeContext().addAccumulator("open-close-counter", this.openCloseCounter);
            this.distinctWords = new SetAccumulator<>();
            getRuntimeContext().addAccumulator("distinct-words", this.distinctWords);
            IntCounter intCounter = getRuntimeContext().getIntCounter("simple-counter");
            intCounter.add(1);
            Assert.assertEquals(intCounter.getLocalValue().intValue(), serialVersionUID);
            Assert.assertEquals(intCounter.getLocalValue(), getRuntimeContext().getIntCounter("simple-counter").getLocalValue());
            try {
                getRuntimeContext().getDoubleCounter("simple-counter");
                Assert.fail("Should not be able to obtain previously created counter with different type");
            } catch (UnsupportedOperationException e) {
            }
            this.openCloseCounter.add(Double.valueOf(0.5d));
        }

        public void map(Record record, Collector<Record> collector) {
            this.cntNumLines.add(1);
            StringValue field = record.getField(0, StringValue.class);
            SimpleStringUtils.replaceNonWordChars(field, ' ');
            SimpleStringUtils.toLowerCase(field);
            this.tokenizer.setStringToTokenize(field);
            int i = 0;
            this.word = new StringValue();
            while (this.tokenizer.next(this.word)) {
                this.distinctWords.add((SetAccumulator<StringRecord>) new StringRecord(this.word.getValue()));
                this.outputRecord.setField(0, this.word);
                this.outputRecord.setField(1, this.one);
                collector.collect(this.outputRecord);
                i++;
            }
            this.wordsPerLineDistribution.add(Integer.valueOf(i));
        }

        public void close() throws Exception {
            this.openCloseCounter.add(Double.valueOf(0.5d));
            Assert.assertEquals(serialVersionUID, this.openCloseCounter.getLocalValue().intValue());
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Collector collector) throws Exception {
            map((Record) obj, (Collector<Record>) collector);
        }
    }

    public AccumulatorITCase(Configuration configuration) {
        super(configuration);
    }

    protected void preSubmit() throws Exception {
        this.dataPath = createTempFile("datapoints.txt", INPUT);
        this.resultPath = getTempFilePath("result");
    }

    protected void postSubmit() throws Exception {
        compareResultsByLinesInMemory(EXPECTED, this.resultPath);
        System.out.println("Accumulator results:");
        JobExecutionResult jobExecutionResult = getJobExecutionResult();
        System.out.println(AccumulatorHelper.getResultsFormated(jobExecutionResult.getAllAccumulatorResults()));
        Assert.assertEquals(new Integer(3), (Integer) jobExecutionResult.getAccumulatorResult("num-lines"));
        Assert.assertEquals(new Double(2.0d), (Double) jobExecutionResult.getAccumulatorResult("open-close-counter"));
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put(1, 1);
        newHashMap.put(Integer.valueOf(NUM_SUBTASKS), Integer.valueOf(NUM_SUBTASKS));
        newHashMap.put(3, 3);
        Assert.assertEquals(newHashMap, jobExecutionResult.getAccumulatorResult("words-per-line"));
        HashSet newHashSet = Sets.newHashSet();
        newHashSet.add(new StringRecord("one"));
        newHashSet.add(new StringRecord("two"));
        newHashSet.add(new StringRecord("three"));
        Assert.assertEquals(newHashSet, jobExecutionResult.getAccumulatorResult("distinct-words"));
    }

    protected Plan getTestJob() {
        return getTestPlanPlan(this.config.getInteger("IterationAllReducer#NoSubtasks", 1), this.dataPath, this.resultPath);
    }

    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() {
        Configuration configuration = new Configuration();
        configuration.setInteger("IterationAllReducer#NoSubtasks", NUM_SUBTASKS);
        return toParameterList(new Configuration[]{configuration});
    }

    static Plan getTestPlanPlan(int i, String str, String str2) {
        Operator fileDataSource = new FileDataSource(new TextInputFormat(), str, "Input Lines");
        fileDataSource.setParameter("textformat.charset", "ASCII");
        Plan plan = new Plan(new FileDataSink(new CsvOutputFormat("\n", " ", new Class[]{StringValue.class, IntValue.class}), str2, ReduceOperator.builder(CountWords.class, StringValue.class, 0).input(new Operator[]{MapOperator.builder(new TokenizeLine()).input(new Operator[]{fileDataSource}).name("Tokenize Lines").build()}).name("Count Words").build(), "Word Counts"), "WordCount Example");
        plan.setDefaultParallelism(i);
        return plan;
    }
}
