package org.apache.paimon.flink.source;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.ArrayResultIterator;
import org.apache.flink.connector.file.src.util.SingletonResultIterator;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.table.source.Split;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.class */
public class FlinkRecordsWithSplitIdsTest {
    private RowData[] rows;
    private ArrayResultIterator<RowData> iter;
    private TestingReaderOutput<RowData> output;
    private FileStoreSourceSplitState state;

    @BeforeEach
    public void beforeEach() {
        this.iter = new ArrayResultIterator<>();
        this.rows = new RowData[]{GenericRowData.of(new Object[]{1, 1}), GenericRowData.of(new Object[]{2, 2})};
        this.iter.set(this.rows, this.rows.length, -1L, 0L);
        this.output = new TestingReaderOutput<>();
        this.state = new FileStoreSourceSplitState(new FileStoreSourceSplit("", (Split) null));
    }

    @Test
    public void testEmitRecord() {
        FlinkRecordsWithSplitIds forRecords = FlinkRecordsWithSplitIds.forRecords("", this.iter);
        forRecords.nextSplit();
        BulkFormat.RecordIterator recordIterator = (BulkFormat.RecordIterator) forRecords.nextRecordFromSplit();
        Assertions.assertThat(recordIterator).isNotNull();
        FlinkRecordsWithSplitIds.emitRecord(recordIterator, this.output, this.state);
        Assertions.assertThat(this.output.getEmittedRecords()).containsExactly(this.rows);
        Assertions.assertThat(this.state.recordsToSkip()).isEqualTo(2L);
        Assertions.assertThat(forRecords.nextRecordFromSplit()).isNull();
    }

    @Test
    void testEmptySplits() {
        Assertions.assertThat(FlinkRecordsWithSplitIds.finishedSplit("empty").finishedSplits()).isEqualTo(Collections.singleton("empty"));
    }

    @Test
    void testMoveToFirstSplit() {
        Assertions.assertThat("splitId").isEqualTo(FlinkRecordsWithSplitIds.forRecords("splitId", new SingletonResultIterator()).nextSplit());
    }

    @Test
    void testMoveToSecondSplit() {
        FlinkRecordsWithSplitIds forRecords = FlinkRecordsWithSplitIds.forRecords("splitId", new SingletonResultIterator());
        forRecords.nextSplit();
        Assertions.assertThat(forRecords.nextSplit()).isNull();
    }

    @Test
    void testRecordsFromFirstSplit() {
        SingletonResultIterator singletonResultIterator = new SingletonResultIterator();
        singletonResultIterator.set(GenericRowData.of(new Object[]{"test"}), 18L, 99L);
        FlinkRecordsWithSplitIds forRecords = FlinkRecordsWithSplitIds.forRecords("splitId", singletonResultIterator);
        forRecords.nextSplit();
        Assertions.assertThat(forRecords.nextRecordFromSplit()).isSameAs(singletonResultIterator);
        Assertions.assertThat(forRecords.nextRecordFromSplit()).isNull();
    }

    @Test
    void testRecordsInitiallyIllegal() {
        FlinkRecordsWithSplitIds forRecords = FlinkRecordsWithSplitIds.forRecords("splitId", new SingletonResultIterator());
        forRecords.getClass();
        Assertions.assertThatThrownBy(forRecords::nextRecordFromSplit).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testRecordsOnSecondSplitIllegal() {
        FlinkRecordsWithSplitIds forRecords = FlinkRecordsWithSplitIds.forRecords("splitId", new SingletonResultIterator());
        forRecords.nextSplit();
        forRecords.nextSplit();
        forRecords.getClass();
        Assertions.assertThatThrownBy(forRecords::nextRecordFromSplit).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testRecycleExhaustedBatch() {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        SingletonResultIterator singletonResultIterator = new SingletonResultIterator(() -> {
            atomicBoolean.set(true);
        });
        singletonResultIterator.set(GenericRowData.of(new Object[0]), 1L, 2L);
        FlinkRecordsWithSplitIds forRecords = FlinkRecordsWithSplitIds.forRecords("test split", singletonResultIterator);
        forRecords.nextSplit();
        forRecords.nextRecordFromSplit();
        Assertions.assertThat(forRecords.nextRecordFromSplit()).isNull();
        Assertions.assertThat(forRecords.nextSplit()).isNull();
        forRecords.recycle();
        Assertions.assertThat(atomicBoolean.get()).isTrue();
    }

    @Test
    void testRecycleNonExhaustedBatch() {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        SingletonResultIterator singletonResultIterator = new SingletonResultIterator(() -> {
            atomicBoolean.set(true);
        });
        singletonResultIterator.set(GenericRowData.of(new Object[0]), 1L, 2L);
        FlinkRecordsWithSplitIds forRecords = FlinkRecordsWithSplitIds.forRecords("test split", singletonResultIterator);
        forRecords.nextSplit();
        forRecords.recycle();
        Assertions.assertThat(atomicBoolean.get()).isTrue();
    }
}
