package eu.stratosphere.test.iterative;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.operators.BulkIteration;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.io.TextInputFormat;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.util.Collector;
import java.io.Serializable;
import java.util.Iterator;
import org.junit.Assert;

/* loaded from: input_file:eu/stratosphere/test/iterative/IterationTerminationWithTerminationTail.class */
/**
 * Integration test for a bulk iteration whose termination criterion is attached
 * to the tail of the step function.
 *
 * <p>The input holds the numbers 1 to 5, one per line. Each superstep runs
 * {@link SumReducer} on the partial solution; {@link TerminationMapper} consumes the
 * reducer output and forwards a record only while the value is still below
 * {@link #TERMINATION_THRESHOLD}.
 *
 * <p>NOTE(review): the reducer is built without a key, so it presumably sees all
 * records in a single group. Under that assumption the partial solution evolves as
 * 20, 21, 22, at which point the mapper emits nothing. Presumably an empty
 * termination criterion stops the loop before the configured maximum number of
 * iterations is reached, which would explain the expected result of 22 - confirm
 * against the iteration documentation.
 */
public class IterationTerminationWithTerminationTail extends RecordAPITestBase {
    private static final String INPUT = "1\n2\n3\n4\n5\n";
    private static final String EXPECTED = "22\n";

    /** Value at which {@link TerminationMapper} stops forwarding records. */
    private static final int TERMINATION_THRESHOLD = 22;

    /** Upper bound on the number of supersteps configured on the iteration. */
    private static final int MAX_ITERATIONS = 5;

    /** Default parallelism the test plan is configured with. */
    private static final int PARALLELISM = 4;

    protected String dataPath;
    protected String resultPath;

    /**
     * Reducer that emits a single record holding the sum of {@code value + 1} over
     * all records of its group, where {@code value} is field 0 parsed as an integer.
     */
    public static final class SumReducer extends ReduceFunction implements Serializable {
        private static final long serialVersionUID = 1L;

        /**
         * Sums {@code value + 1} over all incoming records and emits the total as one record.
         *
         * @param it        records of the group; field 0 must hold a decimal integer string
         * @param collector receives exactly one record containing the total as a string
         * @throws NumberFormatException if field 0 of a record is not a parsable integer
         */
        public void reduce(Iterator<Record> it, Collector<Record> collector) {
            int sum = 0;
            while (it.hasNext()) {
                String text = it.next().getField(0, StringValue.class).getValue();
                // The extra 1 per record keeps the value growing once a single record is left.
                sum += Integer.parseInt(text) + 1;
            }
            collector.collect(new Record(new StringValue(Integer.toString(sum))));
        }
    }

    /**
     * Mapper used as termination criterion: forwards a record only while its value
     * is below {@link #TERMINATION_THRESHOLD}.
     */
    public static class TerminationMapper extends MapFunction implements Serializable {
        private static final long serialVersionUID = 1L;

        /**
         * Forwards {@code record} unchanged if field 0, parsed as an integer, is below the threshold.
         *
         * @param record    record whose field 0 holds a decimal integer string
         * @param collector receives the record while the threshold has not been reached
         * @throws NumberFormatException if field 0 is not a parsable integer
         */
        public void map(Record record, Collector<Record> collector) {
            int value = Integer.parseInt(record.getField(0, StringValue.class).getValue());
            if (value < TERMINATION_THRESHOLD) {
                collector.collect(record);
            }
        }
    }

    /** Writes the input data to a temporary file and reserves a path for the result. */
    protected void preSubmit() throws Exception {
        this.dataPath = createTempFile("datapoints.txt", INPUT);
        this.resultPath = getTempFilePath("result");
    }

    /** Compares the job output found at the result path with the expected lines. */
    protected void postSubmit() throws Exception {
        compareResultsByLinesInMemory(EXPECTED, this.resultPath);
    }

    /** Builds the plan under test using the paths prepared in {@link #preSubmit()}. */
    protected Plan getTestJob() {
        return getTestPlanPlan(PARALLELISM, this.dataPath, this.resultPath);
    }

    /**
     * Assembles the plan: text source, bulk iteration (sum reducer plus termination
     * mapper on the reducer output), CSV sink.
     *
     * @param parallelism default parallelism of the plan; asserted to be greater than 1
     * @param inputPath   path of the text input file
     * @param outputPath  path the result is written to
     * @return the configured plan
     */
    private static Plan getTestPlanPlan(int parallelism, String inputPath, String outputPath) {
        FileDataSource source = new FileDataSource(TextInputFormat.class, inputPath, "input");

        BulkIteration iteration = new BulkIteration("Loop");
        iteration.setInput(source);
        iteration.setMaximumNumberOfIterations(MAX_ITERATIONS);
        Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);

        Operator sumReduce = ReduceOperator.builder(new SumReducer())
                .input(new Operator[]{iteration.getPartialSolution()})
                .name("Compute sum (Reduce)")
                .build();
        iteration.setNextPartialSolution(sumReduce);

        // The criterion hangs off the reducer output, i.e. off the tail of the step function.
        iteration.setTerminationCriterion(MapOperator.builder(new TerminationMapper())
                .input(new Operator[]{sumReduce})
                .name("Compute termination criterion (Map)")
                .build());

        FileDataSink sink = new FileDataSink(CsvOutputFormat.class, outputPath, iteration, "Output");
        ((CsvOutputFormat.ConfigBuilder) ((CsvOutputFormat.ConfigBuilder) CsvOutputFormat.configureRecordFormat(sink).recordDelimiter('\n')).fieldDelimiter(' ')).field(StringValue.class, 0);

        Plan plan = new Plan(sink, "Iteration with AllReducer (keyless Reducer)");
        plan.setDefaultParallelism(parallelism);
        Assert.assertTrue(plan.getDefaultParallelism() > 1);
        return plan;
    }
}
