package org.apache.flink.connector.testutils.source.reader;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/connector/testutils/source/reader/TestingSplitEnumeratorContext.class */
public class TestingSplitEnumeratorContext<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT> {
    private final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
    private final HashMap<Integer, SplitAssignmentState<SplitT>> splitAssignments = new HashMap<>();
    private final HashMap<Integer, List<SourceEvent>> events = new HashMap<>();
    private final HashMap<Integer, ReaderInfo> registeredReaders = new HashMap<>();
    private final int parallelism;

    /* loaded from: input_file:org/apache/flink/connector/testutils/source/reader/TestingSplitEnumeratorContext$SplitAssignmentState.class */
    public static final class SplitAssignmentState<SplitT extends SourceSplit> {
        final List<SplitT> splits = new ArrayList();
        boolean noMoreSplits;

        public List<SplitT> getAssignedSplits() {
            return this.splits;
        }

        public boolean hasReceivedNoMoreSplitsSignal() {
            return this.noMoreSplits;
        }
    }

    public TestingSplitEnumeratorContext(int i) {
        this.parallelism = i;
    }

    public void triggerAllActions() {
        this.executor.triggerPeriodicScheduledTasks();
        this.executor.triggerAll();
    }

    public ManuallyTriggeredScheduledExecutorService getExecutorService() {
        return this.executor;
    }

    public Map<Integer, SplitAssignmentState<SplitT>> getSplitAssignments() {
        return this.splitAssignments;
    }

    public Map<Integer, List<SourceEvent>> getSentEvents() {
        return this.events;
    }

    public void registerReader(int i, String str) {
        Preconditions.checkState(!this.registeredReaders.containsKey(Integer.valueOf(i)), "Reader already registered");
        this.registeredReaders.put(Integer.valueOf(i), new ReaderInfo(i, str));
    }

    public SplitEnumeratorMetricGroup metricGroup() {
        return UnregisteredMetricsGroup.createSplitEnumeratorMetricGroup();
    }

    public void sendEventToSourceReader(int i, SourceEvent sourceEvent) {
        this.events.computeIfAbsent(Integer.valueOf(i), num -> {
            return new ArrayList();
        }).add(sourceEvent);
    }

    public void sendEventToSourceReader(int i, int i2, SourceEvent sourceEvent) {
        sendEventToSourceReader(i, sourceEvent);
    }

    public int currentParallelism() {
        return this.parallelism;
    }

    public Map<Integer, ReaderInfo> registeredReaders() {
        return this.registeredReaders;
    }

    public void assignSplits(SplitsAssignment<SplitT> splitsAssignment) {
        for (Map.Entry entry : splitsAssignment.assignment().entrySet()) {
            ((SplitAssignmentState) this.splitAssignments.computeIfAbsent(entry.getKey(), num -> {
                return new SplitAssignmentState();
            })).splits.addAll((Collection) entry.getValue());
        }
    }

    public void signalNoMoreSplits(int i) {
        this.splitAssignments.computeIfAbsent(Integer.valueOf(i), num -> {
            return new SplitAssignmentState();
        }).noMoreSplits = true;
    }

    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> biConsumer) {
        this.executor.execute(callableWithResultHandler(callable, biConsumer));
    }

    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> biConsumer, long j, long j2) {
        this.executor.scheduleWithFixedDelay(callableWithResultHandler(callable, biConsumer), j, j2, TimeUnit.MILLISECONDS);
    }

    public void runInCoordinatorThread(Runnable runnable) {
        this.executor.execute(runnable);
    }

    private static <T> Runnable callableWithResultHandler(Callable<T> callable, BiConsumer<T, Throwable> biConsumer) {
        return () -> {
            try {
                biConsumer.accept(callable.call(), null);
            } catch (Throwable th) {
                biConsumer.accept(null, th);
            }
        };
    }
}
