package org.apache.paimon.flink.source.align;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.flink.source.ContinuousFileSplitEnumeratorTest;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.class */
public class AlignedContinuousFileSplitEnumeratorTest {
    private static final RowType ROW_TYPE = RowType.of(new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()}, new String[]{"pt", "a", "b"});
    private static final String CONSUMER_ID = "consumer";

    @TempDir
    private Path tempDir;
    private FileStoreTable table;

    /* loaded from: input_file:org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest$Builder.class */
    private static class Builder {
        private SplitEnumeratorContext<FileStoreSourceSplit> context;
        private Collection<FileStoreSourceSplit> initialSplits;
        private long discoveryInterval;
        private StreamTableScan scan;
        private BucketMode bucketMode;
        private long timeout;

        private Builder() {
            this.initialSplits = Collections.emptyList();
            this.discoveryInterval = Long.MAX_VALUE;
            this.bucketMode = BucketMode.FIXED;
            this.timeout = 30000L;
        }

        public Builder setSplitEnumeratorContext(SplitEnumeratorContext<FileStoreSourceSplit> splitEnumeratorContext) {
            this.context = splitEnumeratorContext;
            return this;
        }

        public Builder setInitialSplits(Collection<FileStoreSourceSplit> collection) {
            this.initialSplits = collection;
            return this;
        }

        public Builder setDiscoveryInterval(long j) {
            this.discoveryInterval = j;
            return this;
        }

        public Builder setScan(StreamTableScan streamTableScan) {
            this.scan = streamTableScan;
            return this;
        }

        public Builder withBucketMode(BucketMode bucketMode) {
            this.bucketMode = bucketMode;
            return this;
        }

        public Builder setAlignedTimeout(long j) {
            this.timeout = j;
            return this;
        }

        public AlignedContinuousFileSplitEnumerator build() {
            return new AlignedContinuousFileSplitEnumerator(this.context, this.initialSplits, (Long) null, this.discoveryInterval, this.scan, this.bucketMode, this.timeout);
        }
    }

    @BeforeEach
    public void before() throws Exception {
        org.apache.paimon.fs.Path path = new org.apache.paimon.fs.Path("traceable://" + this.tempDir.toString());
        this.table = FileStoreTableFactory.create(LocalFileIO.create(), path, new SchemaManager(FileIOFinder.find(path), path).createTable(new Schema(ROW_TYPE.getFields(), Collections.singletonList("pt"), Arrays.asList("pt", "a"), Collections.singletonMap(CoreOptions.CONSUMER_ID.key(), CONSUMER_ID), "")));
    }

    @Test
    public void testSplitsAssignedBySnapshot() throws Exception {
        SplitEnumeratorContext<FileStoreSourceSplit> testingSplitEnumeratorContext = new TestingSplitEnumeratorContext<>(2);
        testingSplitEnumeratorContext.registerReader(0, "test-host");
        testingSplitEnumeratorContext.registerReader(1, "test-host");
        ArrayList arrayList = new ArrayList();
        for (int i = 1; i <= 2; i++) {
            arrayList.add(ContinuousFileSplitEnumeratorTest.createSnapshotSplit(i, 0, Collections.emptyList()));
        }
        arrayList.add(ContinuousFileSplitEnumeratorTest.createSnapshotSplit(2, 1, Collections.emptyList()));
        ArrayList arrayList2 = new ArrayList(arrayList);
        AlignedContinuousFileSplitEnumerator build = new Builder().setSplitEnumeratorContext(testingSplitEnumeratorContext).setInitialSplits(arrayList).setDiscoveryInterval(3L).build();
        build.handleSplitRequest(0, "test-host");
        build.handleSplitRequest(1, "test-host");
        Map splitAssignments = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments).containsOnlyKeys(new Integer[]{0});
        Assertions.assertThat(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments.get(0)).getAssignedSplits()).containsExactly(new FileStoreSourceSplit[]{(FileStoreSourceSplit) arrayList2.get(0)});
        testingSplitEnumeratorContext.getSplitAssignments().clear();
        build.handleSplitRequest(0, "test-host");
        build.handleSplitRequest(1, "test-host");
        Assertions.assertThat(testingSplitEnumeratorContext.getSplitAssignments()).isEmpty();
        build.snapshotState(1L);
        Assertions.assertThat(testingSplitEnumeratorContext.getSplitAssignments()).isEmpty();
        build.handleSplitRequest(0, "test-host");
        build.handleSplitRequest(1, "test-host");
        Map splitAssignments2 = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments2).containsOnlyKeys(new Integer[]{0, 1});
        Assertions.assertThat(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments2.get(0)).getAssignedSplits()).containsExactly(new FileStoreSourceSplit[]{(FileStoreSourceSplit) arrayList2.get(1)});
        Assertions.assertThat(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments2.get(1)).getAssignedSplits()).containsExactly(new FileStoreSourceSplit[]{(FileStoreSourceSplit) arrayList2.get(2)});
    }

    @Test
    public void testEnumeratorSnapshotState() throws Exception {
        SplitEnumeratorContext<FileStoreSourceSplit> testingSplitEnumeratorContext = new TestingSplitEnumeratorContext<>(1);
        testingSplitEnumeratorContext.registerReader(0, "test-host");
        AlignedContinuousFileSplitEnumerator build = new Builder().setSplitEnumeratorContext(testingSplitEnumeratorContext).setInitialSplits(Collections.emptyList()).setDiscoveryInterval(3L).setAlignedTimeout(10L).build();
        Assertions.assertThatThrownBy(() -> {
            build.snapshotState(1L);
        }).satisfies(new ThrowingConsumer[]{AssertionUtils.anyCauseMatches("Timeout while waiting for snapshot from paimon source.")});
        ArrayList arrayList = new ArrayList();
        for (int i = 1; i <= 2; i++) {
            arrayList.add(ContinuousFileSplitEnumeratorTest.createSnapshotSplit(i, 0, Collections.emptyList()));
        }
        build.addSplits(arrayList);
        build.handleSplitRequest(0, "test-host");
        Map splitAssignments = testingSplitEnumeratorContext.getSplitAssignments();
        Assertions.assertThat(splitAssignments).containsOnlyKeys(new Integer[]{0});
        Assertions.assertThat(((TestingSplitEnumeratorContext.SplitAssignmentState) splitAssignments.get(0)).getAssignedSplits()).containsExactly(new FileStoreSourceSplit[]{(FileStoreSourceSplit) arrayList.get(0)});
        Assertions.assertThat(build.snapshotState(1L).splits()).containsExactly(new FileStoreSourceSplit[]{(FileStoreSourceSplit) arrayList.get(1)});
    }

    @Test
    public void testScanWithConsumerId() throws Exception {
        SplitEnumeratorContext<FileStoreSourceSplit> testingSplitEnumeratorContext = new TestingSplitEnumeratorContext<>(1);
        testingSplitEnumeratorContext.registerReader(0, "test-host");
        AlignedContinuousFileSplitEnumerator build = new Builder().setSplitEnumeratorContext(testingSplitEnumeratorContext).setInitialSplits(Collections.emptyList()).setScan(this.table.newStreamScan()).build();
        ArrayList arrayList = new ArrayList();
        for (int i = 1; i <= 2; i++) {
            arrayList.add(ContinuousFileSplitEnumeratorTest.createSnapshotSplit(i, 0, Collections.emptyList()));
        }
        build.addSplits(arrayList);
        ConsumerManager consumerManager = new ConsumerManager(this.table.fileIO(), this.table.location());
        Assertions.assertThat(consumerManager.consumer(CONSUMER_ID)).isEmpty();
        build.handleSplitRequest(0, "test-host");
        build.snapshotState(1L);
        build.notifyCheckpointComplete(1L);
        Assertions.assertThat(consumerManager.consumer(CONSUMER_ID)).hasValueSatisfying(new Condition(consumer -> {
            return consumer.nextSnapshot() == 2;
        }, "condition", new Object[0]));
        build.handleSplitRequest(0, "test-host");
        build.snapshotState(2L);
        build.notifyCheckpointComplete(2L);
        Assertions.assertThat(consumerManager.consumer(CONSUMER_ID)).hasValueSatisfying(new Condition(consumer2 -> {
            return consumer2.nextSnapshot() == 3;
        }, "condition", new Object[0]));
    }
}
