package eu.stratosphere.test.cancelling;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.GenericDataSource;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.test.recordJobs.util.DiscardingOutputFormat;
import eu.stratosphere.test.recordJobs.util.InfiniteIntegerInputFormat;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.Collector;

/* loaded from: input_file:eu/stratosphere/test/cancelling/MapCancelingITCase.class */
public class MapCancelingITCase extends CancellingTestBase {

    /* loaded from: input_file:eu/stratosphere/test/cancelling/MapCancelingITCase$DelayingIdentityMapper.class */
    public static final class DelayingIdentityMapper extends MapFunction {
        private static final long serialVersionUID = 1;
        private static final int WAIT_TIME_PER_RECORD = 10000;

        public void map(Record record, Collector<Record> collector) throws Exception {
            Thread.sleep(10000L);
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Collector collector) throws Exception {
            map((Record) obj, (Collector<Record>) collector);
        }
    }

    /* loaded from: input_file:eu/stratosphere/test/cancelling/MapCancelingITCase$IdentityMapper.class */
    public static final class IdentityMapper extends MapFunction {
        private static final long serialVersionUID = 1;

        public void map(Record record, Collector<Record> collector) throws Exception {
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Collector collector) throws Exception {
            map((Record) obj, (Collector<Record>) collector);
        }
    }

    /* loaded from: input_file:eu/stratosphere/test/cancelling/MapCancelingITCase$LongCancelTimeIdentityMapper.class */
    public static final class LongCancelTimeIdentityMapper extends MapFunction {
        private static final long serialVersionUID = 1;
        private static final int WAIT_TIME_PER_RECORD = 5000;

        public void map(Record record, Collector<Record> collector) throws Exception {
            long currentTimeMillis;
            long currentTimeMillis2 = System.currentTimeMillis();
            long j = 5000;
            do {
                try {
                    Thread.sleep(j);
                } catch (InterruptedException e) {
                }
                currentTimeMillis = (5000 - System.currentTimeMillis()) + currentTimeMillis2;
                j = currentTimeMillis;
            } while (currentTimeMillis > 0);
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Collector collector) throws Exception {
            map((Record) obj, (Collector<Record>) collector);
        }
    }

    /* loaded from: input_file:eu/stratosphere/test/cancelling/MapCancelingITCase$StuckInOpenIdentityMapper.class */
    public static final class StuckInOpenIdentityMapper extends MapFunction {
        private static final long serialVersionUID = 1;

        public void open(Configuration configuration) throws Exception {
            synchronized (this) {
                wait();
            }
        }

        public void map(Record record, Collector<Record> collector) throws Exception {
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Collector collector) throws Exception {
            map((Record) obj, (Collector<Record>) collector);
        }
    }

    /**
     * Runs and cancels a job made of an {@link InfiniteIntegerInputFormat} source, an
     * {@link IdentityMapper} and a {@link DiscardingOutputFormat} sink, at default parallelism 4.
     *
     * <p>NOTE(review): the two numeric arguments to {@code runAndCancelJob} are presumably
     * millisecond timings defined by {@code CancellingTestBase} — confirm there.
     *
     * @throws Exception whatever {@code runAndCancelJob} or the plan construction throws
     */
    public void testMapCancelling() throws Exception {
        GenericDataSource source = new GenericDataSource(new InfiniteIntegerInputFormat(), "Source");
        GenericDataSink sink = new GenericDataSink(
                new DiscardingOutputFormat(),
                MapOperator.builder(IdentityMapper.class)
                        .input(new Operator[]{source})
                        .name("Identity Mapper")
                        .build(),
                "Sink");

        Plan job = new Plan(sink);
        job.setDefaultParallelism(4);
        runAndCancelJob(job, 5000, 10000);
    }

    /**
     * Runs and cancels a job made of an {@link InfiniteIntegerInputFormat} source, a
     * {@link DelayingIdentityMapper} and a {@link DiscardingOutputFormat} sink, at default
     * parallelism 4.
     *
     * <p>NOTE(review): the two numeric arguments to {@code runAndCancelJob} are presumably
     * millisecond timings defined by {@code CancellingTestBase} — confirm there.
     *
     * @throws Exception whatever {@code runAndCancelJob} or the plan construction throws
     */
    public void testSlowMapCancelling() throws Exception {
        GenericDataSource source = new GenericDataSource(new InfiniteIntegerInputFormat(), "Source");
        GenericDataSink sink = new GenericDataSink(
                new DiscardingOutputFormat(),
                MapOperator.builder(DelayingIdentityMapper.class)
                        .input(new Operator[]{source})
                        .name("Delay Mapper")
                        .build(),
                "Sink");

        Plan job = new Plan(sink);
        job.setDefaultParallelism(4);
        runAndCancelJob(job, 5000, 10000);
    }

    /**
     * Runs and cancels a job made of an {@link InfiniteIntegerInputFormat} source, a
     * {@link LongCancelTimeIdentityMapper} and a {@link DiscardingOutputFormat} sink, at
     * default parallelism 4.
     *
     * <p>NOTE(review): the two numeric arguments to {@code runAndCancelJob} are presumably
     * millisecond timings defined by {@code CancellingTestBase} — confirm there.
     *
     * @throws Exception whatever {@code runAndCancelJob} or the plan construction throws
     */
    public void testMapWithLongCancellingResponse() throws Exception {
        GenericDataSource source = new GenericDataSource(new InfiniteIntegerInputFormat(), "Source");
        GenericDataSink sink = new GenericDataSink(
                new DiscardingOutputFormat(),
                MapOperator.builder(LongCancelTimeIdentityMapper.class)
                        .input(new Operator[]{source})
                        .name("Long Cancelling Time Mapper")
                        .build(),
                "Sink");

        Plan job = new Plan(sink);
        job.setDefaultParallelism(4);
        runAndCancelJob(job, 10000, 10000);
    }

    /**
     * Runs and cancels a job made of an {@link InfiniteIntegerInputFormat} source, a
     * {@link StuckInOpenIdentityMapper} and a {@link DiscardingOutputFormat} sink, at default
     * parallelism 4.
     *
     * <p>NOTE(review): the two numeric arguments to {@code runAndCancelJob} are presumably
     * millisecond timings defined by {@code CancellingTestBase} — confirm there.
     *
     * @throws Exception whatever {@code runAndCancelJob} or the plan construction throws
     */
    public void testMapPriorToFirstRecordReading() throws Exception {
        GenericDataSource source = new GenericDataSource(new InfiniteIntegerInputFormat(), "Source");
        GenericDataSink sink = new GenericDataSink(
                new DiscardingOutputFormat(),
                MapOperator.builder(StuckInOpenIdentityMapper.class)
                        .input(new Operator[]{source})
                        .name("Stuck-In-Open Mapper")
                        .build(),
                "Sink");

        Plan job = new Plan(sink);
        job.setDefaultParallelism(4);
        runAndCancelJob(job, 10000, 10000);
    }
}
