package org.apache.paimon.operation;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.Snapshot;
import org.apache.paimon.TestFileStore;
import org.apache.paimon.TestKeyValueGenerator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SnapshotManager;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/operation/FileStoreCommitTest.class */
public class FileStoreCommitTest {
    /** Logger used for the debug output produced by the tests in this class. */
    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitTest.class);
    /** Key-value generator; replaced with a fresh instance in {@code beforeEach()}. */
    private TestKeyValueGenerator gen;
    /** Random UUID string passed to {@code FailingFileIO.reset} in {@code beforeEach()}. */
    private String failingName;

    /** Temporary directory supplied by JUnit through {@code @TempDir}. */
    @TempDir
    Path tempDir;

    /*
     * JADX decompiler notes: access modifiers were changed from package-private. The class was
     * renamed from org.apache.paimon.operation.FileStoreCommitTest$1 because that is not a valid
     * class name in source form (loaded from org/apache/paimon/operation/FileStoreCommitTest$1.class).
     */
    /**
     * Compiler-generated lookup table that maps {@link RowKind} ordinals to dense case labels
     * (1 to 4) so that a {@code switch} can be performed on an {@code int}.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per RowKind constant known at class-initialization time; unmapped slots stay 0.
        static final /* synthetic */ int[] $SwitchMap$org$apache$paimon$types$RowKind = new int[RowKind.values().length];

        static {
            // Each constant is registered in its own try block: if one constant is missing at
            // runtime (NoSuchFieldError) only that slot is left at 0 and the others are still set.
            try {
                // INSERT -> case 1
                $SwitchMap$org$apache$paimon$types$RowKind[RowKind.INSERT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                // UPDATE_AFTER -> case 2
                $SwitchMap$org$apache$paimon$types$RowKind[RowKind.UPDATE_AFTER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                // UPDATE_BEFORE -> case 3
                $SwitchMap$org$apache$paimon$types$RowKind[RowKind.UPDATE_BEFORE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                // DELETE -> case 4
                $SwitchMap$org$apache$paimon$types$RowKind[RowKind.DELETE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /** Creates a fresh generator and resets {@link FailingFileIO} under a new random name. */
    @BeforeEach
    public void beforeEach() {
        gen = new TestKeyValueGenerator();

        String freshName = UUID.randomUUID().toString();
        failingName = freshName;
        FailingFileIO.reset(freshName, 100, 100);
    }

    /**
     * Verifies that no input or output stream whose path contains the temporary directory is
     * still reported as open by {@link FailingFileIO}.
     */
    @AfterEach
    public void afterEach() {
        // The directory string is resolved lazily, each time the predicate is evaluated.
        Predicate underTempDir = candidate -> candidate.toString().contains(this.tempDir.toString());

        Assertions.assertThat(FailingFileIO.openInputStreams(underTempDir)).isEmpty();
        Assertions.assertThat(FailingFileIO.openOutputStreams(underTempDir)).isEmpty();
    }

    /** Runs the no-conflict scenario with a single commit thread for every parameter pair. */
    @ParameterizedTest
    @CsvSource({"false,NONE", "false,INPUT", "false,FULL_COMPACTION", "true,NONE", "true,INPUT", "true,FULL_COMPACTION"})
    public void testSingleCommitUser(boolean z, String str) throws Exception {
        CoreOptions.ChangelogProducer producer = CoreOptions.ChangelogProducer.valueOf(str);
        testRandomConcurrentNoConflict(1, z, producer);
    }

    /** Runs the no-conflict scenario with a random number (2 to 4) of commit threads. */
    @ParameterizedTest
    @CsvSource({"false,NONE", "false,INPUT", "false,FULL_COMPACTION", "true,NONE", "true,INPUT", "true,FULL_COMPACTION"})
    public void testManyCommitUsersNoConflict(boolean z, String str) throws Exception {
        int threadCount = ThreadLocalRandom.current().nextInt(3) + 2;
        CoreOptions.ChangelogProducer producer = CoreOptions.ChangelogProducer.valueOf(str);
        testRandomConcurrentNoConflict(threadCount, z, producer);
    }

    /** Runs the with-conflict scenario with a random number (2 to 4) of commit threads. */
    @ParameterizedTest
    @CsvSource({"false,NONE", "false,INPUT", "false,FULL_COMPACTION", "true,NONE", "true,INPUT", "true,FULL_COMPACTION"})
    public void testManyCommitUsersWithConflict(boolean z, String str) throws Exception {
        int threadCount = ThreadLocalRandom.current().nextInt(3) + 2;
        CoreOptions.ChangelogProducer producer = CoreOptions.ChangelogProducer.valueOf(str);
        testRandomConcurrentWithConflict(threadCount, z, producer);
    }

    /**
     * Checks that the "LATEST" file exists in the snapshot directory after committing, and that
     * the latest snapshot id reported by the snapshot manager is unchanged once that file is
     * deleted.
     */
    @Test
    public void testLatestHint() throws Exception {
        testRandomConcurrentNoConflict(1, false, CoreOptions.ChangelogProducer.NONE);

        SnapshotManager manager = createStore(false, 1).snapshotManager();
        org.apache.paimon.fs.Path latestHint =
                new org.apache.paimon.fs.Path(manager.snapshotDirectory(), "LATEST");
        Assertions.assertThat(new LocalFileIO().exists(latestHint)).isTrue();

        Long idWithHint = manager.latestSnapshotId();
        LocalFileIO.create().delete(latestHint, false);
        Long idWithoutHint = manager.latestSnapshotId();

        Assertions.assertThat(idWithoutHint).isEqualTo(idWithHint);
    }

    /**
     * Deletes the file of snapshot 1 and then calls {@code filterCommitted} with a committable
     * whose identifier is 999; the test passes as long as that call does not throw.
     */
    @Test
    public void testFilterCommittedAfterExpire() throws Exception {
        testRandomConcurrentNoConflict(1, false, CoreOptions.ChangelogProducer.NONE);
        TestFileStore store = createStore(false);

        // Remove the first snapshot from disk (presumably mimicking snapshot expiration).
        LocalFileIO.create().deleteQuietly(store.snapshotManager().snapshotPath(1L));

        String commitUser = UUID.randomUUID().toString();
        List<ManifestCommittable> committables =
                Collections.singletonList(new ManifestCommittable(999L));
        store.newCommit(commitUser).filterCommitted(committables);
    }

    /**
     * Collects the commit identifier of every existing snapshot and checks that
     * {@code filterCommitted}, called for the commit user of the latest snapshot, returns an
     * empty list for the corresponding committables.
     */
    @Test
    public void testFilterAllCommits() throws Exception {
        testRandomConcurrentNoConflict(1, false, CoreOptions.ChangelogProducer.NONE);
        TestFileStore store = createStore(false);
        SnapshotManager snapshotManager = store.snapshotManager();
        long latestSnapshotId = snapshotManager.latestSnapshotId().longValue();

        // Insertion-ordered and de-duplicated: several snapshots may share one identifier.
        LinkedHashSet<Long> commitIdentifiers = new LinkedHashSet<>();
        String commitUser = "";
        for (long id = 1; id <= latestSnapshotId; id++) {
            Snapshot snapshot = snapshotManager.snapshot(id);
            commitIdentifiers.add(snapshot.commitIdentifier());
            // After the loop this holds the user of the latest snapshot.
            commitUser = snapshot.commitUser();
        }

        List<ManifestCommittable> committables =
                commitIdentifiers.stream()
                        .map(identifier -> new ManifestCommittable(identifier))
                        .collect(Collectors.toList());
        Assertions.assertThat(store.newCommit(commitUser).filterCommitted(committables)).isEmpty();
    }

    /**
     * Generates between 1 and 1000 random records, assigns every partition as a whole to one
     * randomly chosen commit thread and runs the concurrent commit check.
     *
     * @param i number of commit threads
     * @param z forwarded to {@code testRandomConcurrent} as its third argument
     * @param changelogProducer changelog producer used for the stores; when it is {@code NONE},
     *     {@code true} is passed as the second argument of {@code testRandomConcurrent}
     */
    protected void testRandomConcurrentNoConflict(int i, boolean z, CoreOptions.ChangelogProducer changelogProducer) throws Exception {
        Map<BinaryRow, List<KeyValue>> data = generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
        logData(
                () -> data.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
                "input");

        List<Map<BinaryRow, List<KeyValue>>> dataPerThread = new ArrayList<>();
        for (int thread = 0; thread < i; thread++) {
            dataPerThread.add(new HashMap<>());
        }
        // A partition never spans two threads, so the threads cannot conflict with each other.
        for (Map.Entry<BinaryRow, List<KeyValue>> entry : data.entrySet()) {
            dataPerThread
                    .get(ThreadLocalRandom.current().nextInt(i))
                    .put(entry.getKey(), entry.getValue());
        }

        testRandomConcurrent(
                dataPerThread,
                changelogProducer == CoreOptions.ChangelogProducer.NONE,
                z,
                changelogProducer);
    }

    /**
     * Generates between 1 and 1000 random records and routes every record to a commit thread
     * selected by the hash of its key, so records of one partition can end up on several
     * threads. Then runs the concurrent commit check.
     *
     * @param i number of commit threads
     * @param z forwarded to {@code testRandomConcurrent} as its third argument
     * @param changelogProducer changelog producer used for the stores
     */
    protected void testRandomConcurrentWithConflict(int i, boolean z, CoreOptions.ChangelogProducer changelogProducer) throws Exception {
        Map<BinaryRow, List<KeyValue>> data = generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
        logData(
                () -> data.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
                "input");

        List<Map<BinaryRow, List<KeyValue>>> dataPerThread = new ArrayList<>();
        for (int thread = 0; thread < i; thread++) {
            dataPerThread.add(new HashMap<>());
        }
        for (Map.Entry<BinaryRow, List<KeyValue>> entry : data.entrySet()) {
            for (KeyValue kv : entry.getValue()) {
                // floorMod is always in [0, i); Math.abs(hash) % i is negative for
                // Integer.MIN_VALUE and would produce an invalid list index.
                int thread = Math.floorMod(kv.key().hashCode(), i);
                dataPerThread
                        .get(thread)
                        .computeIfAbsent(entry.getKey(), partition -> new ArrayList<>())
                        .add(kv);
            }
        }

        testRandomConcurrent(dataPerThread, false, z, changelogProducer);
    }

    /**
     * Starts one {@code TestCommitThread} per entry of {@code list}, waits for all of them and
     * compares the merged thread results with what is read back from the latest snapshot. When
     * a changelog producer other than {@code NONE} is used, the changelog read up to the latest
     * snapshot must yield the same key-value map; for {@code FULL_COMPACTION} the changelog
     * sequence is additionally validated.
     *
     * @param list data to be committed by each thread
     * @param z passed as the third constructor argument of every {@code TestCommitThread}
     * @param z2 passed as the first argument of {@code createStore} for the first store given
     *     to every thread
     * @param changelogProducer changelog producer used for all stores
     */
    private void testRandomConcurrent(List<Map<BinaryRow, List<KeyValue>>> list, boolean z, boolean z2, CoreOptions.ChangelogProducer changelogProducer) throws Exception {
        List<TestCommitThread> threads = new ArrayList<>();
        for (Map<BinaryRow, List<KeyValue>> threadData : list) {
            TestCommitThread thread =
                    new TestCommitThread(
                            TestKeyValueGenerator.KEY_TYPE,
                            TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                            z,
                            threadData,
                            createStore(z2, 1, changelogProducer),
                            createStore(false, 1, changelogProducer));
            thread.start();
            threads.add(thread);
        }

        TestFileStore store = createStore(false, 1, changelogProducer);

        // Expected content: everything the threads report as their result.
        List<KeyValue> threadResults = new ArrayList<>();
        for (TestCommitThread thread : threads) {
            thread.join();
            threadResults.addAll(thread.getResult());
        }
        Map<BinaryRow, BinaryRow> expected = store.toKvMap(threadResults);

        // Actual content: what the latest snapshot contains.
        Long latestSnapshotId = store.snapshotManager().latestSnapshotId();
        Assertions.assertThat(latestSnapshotId).isNotNull();
        List<KeyValue> snapshotKvs = store.readKvsFromSnapshot(latestSnapshotId.longValue());
        this.gen.sort(snapshotKvs);
        logData(() -> snapshotKvs, "raw read results");
        Map<BinaryRow, BinaryRow> actual = store.toKvMap(snapshotKvs);

        logData(() -> kvMapToKvList(expected), "expected");
        logData(() -> kvMapToKvList(actual), "actual");
        Assertions.assertThat(actual).isEqualTo(expected);

        if (changelogProducer == CoreOptions.ChangelogProducer.NONE) {
            return;
        }

        List<KeyValue> changelog = store.readAllChangelogUntilSnapshot(latestSnapshotId.longValue());
        logData(() -> changelog, "raw changelog results");
        Map<BinaryRow, BinaryRow> changelogMap = store.toKvMap(changelog);
        logData(() -> kvMapToKvList(changelogMap), "actual changelog map");
        Assertions.assertThat(changelogMap).isEqualTo(expected);

        if (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
            validateFullChangelog(changelog);
        }
    }

    /**
     * Validates that a changelog is a consistent sequence of row kinds per key.
     *
     * <ul>
     *   <li>{@code INSERT}: the key must not be live; if the key was seen before, its previous
     *       kind must be {@code DELETE}.
     *   <li>{@code UPDATE_AFTER}: the key must not be live and its previous kind must be
     *       {@code UPDATE_BEFORE}.
     *   <li>{@code UPDATE_BEFORE} / {@code DELETE}: the key must be live, the retracted value
     *       must equal the live value and the previous kind must be {@code INSERT} or
     *       {@code UPDATE_AFTER}.
     * </ul>
     *
     * @param list changelog records in the order they were read
     * @throws UnsupportedOperationException if a record has any other row kind
     */
    private void validateFullChangelog(List<KeyValue> list) {
        // Records currently "live" for a key (added and not yet retracted).
        Map<BinaryRow, KeyValue> liveRecords = new HashMap<>();
        // Row kind of the most recent changelog record seen for each key.
        Map<BinaryRow, RowKind> lastKinds = new HashMap<>();

        for (KeyValue kv : list) {
            BinaryRow key = TestKeyValueGenerator.KEY_SERIALIZER.toBinaryRow(kv.key()).copy();
            switch (kv.valueKind()) {
                case INSERT:
                    Assertions.assertThat(liveRecords).doesNotContainKey(key);
                    if (lastKinds.containsKey(key)) {
                        Assertions.assertThat(lastKinds.get(key)).isEqualTo(RowKind.DELETE);
                    }
                    liveRecords.put(key, kv);
                    lastKinds.put(key, RowKind.INSERT);
                    break;
                case UPDATE_AFTER:
                    Assertions.assertThat(liveRecords).doesNotContainKey(key);
                    Assertions.assertThat(lastKinds.get(key)).isEqualTo(RowKind.UPDATE_BEFORE);
                    liveRecords.put(key, kv);
                    lastKinds.put(key, RowKind.UPDATE_AFTER);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    Assertions.assertThat(liveRecords).containsKey(key);
                    Assertions.assertThat(kv.value()).isEqualTo(liveRecords.get(key).value());
                    Assertions.assertThat(lastKinds.get(key)).isIn(RowKind.INSERT, RowKind.UPDATE_AFTER);
                    liveRecords.remove(key);
                    lastKinds.put(key, kv.valueKind());
                    break;
                default:
                    throw new UnsupportedOperationException("Unknown value kind " + kv.valueKind().name());
            }
        }
    }

    /**
     * Commits a first random data set, then overwrites all partitions whose "dt" value equals
     * that of one randomly chosen partition with a second data set. The latest snapshot must
     * contain the untouched partitions of the first data set plus the second data set.
     */
    @Test
    public void testOverwritePartialCommit() throws Exception {
        Map<BinaryRow, List<KeyValue>> data1 = generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
        logData(
                () -> data1.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
                "data1");

        TestFileStore store = createStore(false);
        store.commitData(
                data1.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
                gen::getPartition,
                kv -> 0);

        // Field 0 of the partition row is used as the "dt" value to overwrite.
        String dtToOverwrite =
                new ArrayList<>(data1.keySet())
                        .get(ThreadLocalRandom.current().nextInt(data1.size()))
                        .getString(0)
                        .toString();
        Map<String, String> partitionToOverwrite = new HashMap<>();
        partitionToOverwrite.put("dt", dtToOverwrite);
        if (LOG.isDebugEnabled()) {
            LOG.debug("dtToOverwrite " + dtToOverwrite);
        }

        // Second data set, restricted to partitions with the chosen "dt" value.
        Map<BinaryRow, List<KeyValue>> data2 = generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
        data2.entrySet().removeIf(entry -> !dtToOverwrite.equals(entry.getKey().getString(0).toString()));
        logData(
                () -> data2.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
                "data2");

        Assertions.assertThat(
                        store.overwriteData(
                                        data2.values().stream()
                                                .flatMap(Collection::stream)
                                                .collect(Collectors.toList()),
                                        gen::getPartition,
                                        kv -> 0,
                                        partitionToOverwrite)
                                .get(0)
                                .commitKind())
                .isEqualTo(Snapshot.CommitKind.OVERWRITE);

        // Expected content: data1 without the overwritten "dt", followed by all of data2.
        List<KeyValue> expectedKvs = new ArrayList<>();
        for (Map.Entry<BinaryRow, List<KeyValue>> entry : data1.entrySet()) {
            if (!dtToOverwrite.equals(entry.getKey().getString(0).toString())) {
                expectedKvs.addAll(entry.getValue());
            }
        }
        data2.values().forEach(expectedKvs::addAll);
        gen.sort(expectedKvs);
        Map<BinaryRow, BinaryRow> expected = store.toKvMap(expectedKvs);

        List<KeyValue> actualKvs =
                store.readKvsFromSnapshot(store.snapshotManager().latestSnapshotId().longValue());
        gen.sort(actualKvs);
        Map<BinaryRow, BinaryRow> actual = store.toKvMap(actualKvs);

        logData(() -> kvMapToKvList(expected), "expected");
        logData(() -> kvMapToKvList(actual), "actual");
        Assertions.assertThat(actual).isEqualTo(expected);
    }

    /**
     * Commits twice with log offsets and checks the offsets stored in the resulting snapshots:
     * the first snapshot carries exactly the committed offsets, the second one carries the
     * first offsets with the entry for key 1 replaced by the newly committed value.
     */
    @Test
    public void testSnapshotAddLogOffset() throws Exception {
        TestFileStore store = createStore(false, 2);

        // First commit: offsets {0 -> 1, 1 -> 3}.
        Map<Integer, Long> firstOffsets = new HashMap<>();
        firstOffsets.put(0, 1L);
        firstOffsets.put(1, 3L);
        Snapshot first =
                store.commitData(generateDataList(10), gen::getPartition, kv -> 0, firstOffsets).get(0);
        Assertions.assertThat(first.logOffsets()).isEqualTo(firstOffsets);

        // Second commit only provides an offset for key 1.
        Map<Integer, Long> secondOffsets = new HashMap<>();
        secondOffsets.put(1, 8L);
        Snapshot second =
                store.commitData(generateDataList(10), gen::getPartition, kv -> 0, secondOffsets).get(0);

        // Key 0 keeps its previous offset, key 1 takes the new one.
        Map<Integer, Long> expectedOffsets = new HashMap<>();
        expectedOffsets.put(0, 1L);
        expectedOffsets.put(1, 8L);
        Assertions.assertThat(second.logOffsets()).isEqualTo(expectedOffsets);
    }

    /**
     * Commits three batches (10, 20 and 30 generated records) and checks for each resulting
     * snapshot that the delta record count is non-zero, that the total record count equals the
     * previous total plus the delta, and that the changelog record count is zero.
     */
    @Test
    public void testSnapshotRecordCount() throws Exception {
        TestFileStore store = createStore(false);

        // No snapshot exists before the first commit, so the running total starts at zero.
        long previousTotal = 0L;
        for (int batchSize : new int[] {10, 20, 30}) {
            Snapshot snapshot =
                    store.commitData(
                                    generateDataList(batchSize),
                                    gen::getPartition,
                                    kv -> 0,
                                    Collections.emptyMap())
                            .get(0);

            long delta = snapshot.deltaRecordCount().longValue();
            Assertions.assertThat(delta).isNotEqualTo(0L);
            Assertions.assertThat(snapshot.totalRecordCount()).isEqualTo(previousTotal + delta);
            Assertions.assertThat(snapshot.changelogRecordCount()).isEqualTo(0L);

            previousTotal = snapshot.totalRecordCount().longValue();
        }
    }

    /**
     * Checks that committing no records leaves the latest snapshot id unchanged by default,
     * and creates a new snapshot once {@code ignoreEmptyCommit(false)} is set on the commit.
     */
    @Test
    public void testCommitEmpty() throws Exception {
        TestFileStore store = createStore(false, 2);
        Snapshot initial =
                store.commitData(generateDataList(10), gen::getPartition, kv -> 0, Collections.emptyMap())
                        .get(0);

        // Empty commit with the commit object left as it is: no new snapshot expected.
        List<KeyValue> noRecords = Collections.emptyList();
        store.commitDataImpl(
                noRecords,
                gen::getPartition,
                kv -> 0,
                false,
                null,
                null,
                Collections.emptyList(),
                (commit, committable) -> {
                    commit.commit(committable, Collections.emptyMap());
                });
        Assertions.assertThat(store.snapshotManager().latestSnapshotId()).isEqualTo(initial.id());

        // Same empty commit, but empty commits are no longer ignored: one more snapshot expected.
        List<KeyValue> stillNoRecords = Collections.emptyList();
        store.commitDataImpl(
                stillNoRecords,
                gen::getPartition,
                kv -> 0,
                false,
                null,
                null,
                Collections.emptyList(),
                (commit, committable) -> {
                    commit.ignoreEmptyCommit(false);
                    commit.commit(committable, Collections.emptyMap());
                });
        Assertions.assertThat(store.snapshotManager().latestSnapshotId()).isEqualTo(initial.id() + 1);
    }

    /**
     * Performs three commits while remembering their committables, then tries three times to
     * commit the first committable again through a new commit object; every attempt must fail
     * with a {@link RuntimeException} whose message contains "Give up committing.".
     */
    @Test
    public void testCommitOldSnapshotAgain() throws Exception {
        TestFileStore store = createStore(false, 2);

        List<ManifestCommittable> committables = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            store.commitDataImpl(
                    generateDataList(10),
                    gen::getPartition,
                    kv -> 0,
                    false,
                    Long.valueOf(i),
                    null,
                    Collections.emptyList(),
                    (commit, committable) -> {
                        commit.commit(committable, Collections.emptyMap());
                        committables.add(committable);
                    });
        }

        for (int attempt = 0; attempt < 3; attempt++) {
            Assertions.assertThatThrownBy(
                            () -> store.newCommit().commit(committables.get(0), Collections.emptyMap()))
                    .isInstanceOf(RuntimeException.class)
                    .hasMessageContaining("Give up committing.");
        }
    }

    /** Checks that a commit with watermark 1024 produces a snapshot with watermark 1024. */
    @Test
    public void testCommitWatermarkWithValue() throws Exception {
        TestFileStore store = createStore(false, 2);

        Snapshot snapshot =
                store.commitDataWatermark(generateDataList(10), gen::getPartition, 1024L).get(0);

        Assertions.assertThat(snapshot.watermark()).isEqualTo(1024L);
    }

    /**
     * Commits five times with the watermarks null, 1024, 600, null and 2048 and expects the
     * snapshot watermarks null, 1024, 1024, 1024 and 2048 respectively.
     */
    @Test
    public void testCommitWatermark() throws Exception {
        TestFileStore store = createStore(false, 2);

        // No watermark committed yet: the snapshot has none either.
        Snapshot noWatermark =
                store.commitDataWatermark(generateDataList(10), gen::getPartition, null).get(0);
        Assertions.assertThat(noWatermark.watermark()).isEqualTo((Object) null);

        // First real watermark is taken over.
        Snapshot firstWatermark =
                store.commitDataWatermark(generateDataList(10), gen::getPartition, 1024L).get(0);
        Assertions.assertThat(firstWatermark.watermark()).isEqualTo(1024L);

        // A smaller watermark leaves the snapshot watermark at 1024.
        Snapshot smallerWatermark =
                store.commitDataWatermark(generateDataList(10), gen::getPartition, 600L).get(0);
        Assertions.assertThat(smallerWatermark.watermark()).isEqualTo(1024L);

        // A missing watermark leaves the snapshot watermark at 1024 as well.
        Snapshot missingWatermark =
                store.commitDataWatermark(generateDataList(10), gen::getPartition, null).get(0);
        Assertions.assertThat(missingWatermark.watermark()).isEqualTo(1024L);

        // A larger watermark replaces it.
        Snapshot largerWatermark =
                store.commitDataWatermark(generateDataList(10), gen::getPartition, 2048L).get(0);
        Assertions.assertThat(largerWatermark.watermark()).isEqualTo(2048L);
    }

    /**
     * Commits a random data set, drops a random consecutive run of its partitions (each
     * specified by "dt" and sometimes also "hr") and checks that the latest snapshot contains
     * exactly the records of the partitions not matched by any drop specification.
     */
    @Test
    public void testDropPartitions() throws Exception {
        Map<BinaryRow, List<KeyValue>> data = generateData(ThreadLocalRandom.current().nextInt(50, 1000));
        logData(
                () -> data.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
                "data");

        TestFileStore store = createStore(false);
        store.commitData(
                data.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
                gen::getPartition,
                kv -> 0);

        ThreadLocalRandom random = ThreadLocalRandom.current();
        int dropCount = random.nextInt(data.size()) + 1;
        boolean mayAddHour = random.nextBoolean();
        // The start index is chosen so that start + dropCount never exceeds the partition count.
        int partitionIndex = random.nextInt((data.size() - dropCount) + 1);

        // Snapshot of the key set taken once; the map is not modified while it is indexed.
        List<BinaryRow> allPartitions = new ArrayList<>(data.keySet());
        List<Map<String, String>> partitionsToDrop = new ArrayList<>();
        for (int i = 0; i < dropCount; i++) {
            BinaryRow partition = allPartitions.get(partitionIndex);
            Map<String, String> spec = new HashMap<>();
            spec.put("dt", partition.getString(0).toString());
            if (mayAddHour && random.nextBoolean()) {
                spec.put("hr", String.valueOf(partition.getInt(1)));
            }
            partitionIndex++;
            partitionsToDrop.add(spec);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "partitionsToDrop "
                            + partitionsToDrop.stream()
                                    .map(Objects::toString)
                                    .collect(Collectors.joining(",")));
        }

        Assertions.assertThat(store.dropPartitions(partitionsToDrop).commitKind())
                .isEqualTo(Snapshot.CommitKind.OVERWRITE);

        // A partition counts as dropped when it matches at least one of the specifications.
        RowDataToObjectArrayConverter partitionConverter =
                new RowDataToObjectArrayConverter(TestKeyValueGenerator.DEFAULT_PART_TYPE);
        org.apache.paimon.predicate.Predicate dropped =
                partitionsToDrop.stream()
                        .map(spec -> PredicateBuilder.partition(spec, TestKeyValueGenerator.DEFAULT_PART_TYPE))
                        .reduce(
                                (left, right) ->
                                        PredicateBuilder.or(
                                                new org.apache.paimon.predicate.Predicate[] {left, right}))
                        // dropCount >= 1, so the stream is never empty.
                        .get();

        List<KeyValue> expectedKvs = new ArrayList<>();
        for (Map.Entry<BinaryRow, List<KeyValue>> entry : data.entrySet()) {
            if (!dropped.test(partitionConverter.convert(entry.getKey()))) {
                expectedKvs.addAll(entry.getValue());
            }
        }
        gen.sort(expectedKvs);
        Map<BinaryRow, BinaryRow> expected = store.toKvMap(expectedKvs);

        List<KeyValue> actualKvs =
                store.readKvsFromSnapshot(store.snapshotManager().latestSnapshotId().longValue());
        gen.sort(actualKvs);
        Map<BinaryRow, BinaryRow> actual = store.toKvMap(actualKvs);

        logData(() -> kvMapToKvList(expected), "expected");
        logData(() -> kvMapToKvList(actual), "actual");
        Assertions.assertThat(actual).isEqualTo(expected);
    }

    /**
     * Checks that dropping an empty partition list fails with an exception whose cause chain
     * contains an {@link IllegalArgumentException} with the message
     * "Partitions list cannot be empty.".
     */
    @Test
    public void testDropEmptyPartition() throws Exception {
        TestFileStore store = createStore(false);

        Assertions.assertThatThrownBy(() -> store.dropPartitions(Collections.emptyList()))
                .satisfies(
                        AssertionUtils.anyCauseMatches(
                                IllegalArgumentException.class, "Partitions list cannot be empty."));
    }

    @Test
    public void testIndexFiles() throws Exception {
        TestFileStore createStore = createStore(false, 2);
        IndexFileHandler newIndexFileHandler = createStore.newIndexFileHandler();
        KeyValue next = this.gen.next();
        BinaryRow partition = this.gen.getPartition(next);
        KeyValue keyValue = next;
        BinaryRow binaryRow = partition;
        while (true) {
            BinaryRow binaryRow2 = binaryRow;
            if (!partition.equals(binaryRow2)) {
                TestKeyValueGenerator testKeyValueGenerator = this.gen;
                testKeyValueGenerator.getClass();
                createStore.commitDataIndex(next, testKeyValueGenerator::getPartition, 0, newIndexFileHandler.writeHashIndex(new int[]{1, 2, 5}));
                TestKeyValueGenerator testKeyValueGenerator2 = this.gen;
                testKeyValueGenerator2.getClass();
                createStore.commitDataIndex(next, testKeyValueGenerator2::getPartition, 1, newIndexFileHandler.writeHashIndex(new int[]{6, 8}));
                TestKeyValueGenerator testKeyValueGenerator3 = this.gen;
                testKeyValueGenerator3.getClass();
                createStore.commitDataIndex(keyValue, testKeyValueGenerator3::getPartition, 2, newIndexFileHandler.writeHashIndex(new int[]{3, 5}));
                Snapshot latestSnapshot = createStore.snapshotManager().latestSnapshot();
                List scan = newIndexFileHandler.scan(latestSnapshot.id(), "HASH", partition);
                Assertions.assertThat(scan.size()).isEqualTo(2);
                Assertions.assertThat(((IndexManifestEntry) scan.get(0)).bucket()).isEqualTo(0);
                Assertions.assertThat(newIndexFileHandler.readHashIndexList(((IndexManifestEntry) scan.get(0)).indexFile())).containsExactlyInAnyOrder(new Integer[]{1, 2, 5});
                Assertions.assertThat(((IndexManifestEntry) scan.get(1)).bucket()).isEqualTo(1);
                Assertions.assertThat(newIndexFileHandler.readHashIndexList(((IndexManifestEntry) scan.get(1)).indexFile())).containsExactlyInAnyOrder(new Integer[]{6, 8});
                List scan2 = newIndexFileHandler.scan(latestSnapshot.id(), "HASH", binaryRow2);
                Assertions.assertThat(scan2.size()).isEqualTo(1);
                Assertions.assertThat(((IndexManifestEntry) scan2.get(0)).bucket()).isEqualTo(2);
                Assertions.assertThat(newIndexFileHandler.readHashIndexList(((IndexManifestEntry) scan2.get(0)).indexFile())).containsExactlyInAnyOrder(new Integer[]{3, 5});
                TestKeyValueGenerator testKeyValueGenerator4 = this.gen;
                testKeyValueGenerator4.getClass();
                createStore.commitDataIndex(next, testKeyValueGenerator4::getPartition, 0, newIndexFileHandler.writeHashIndex(new int[]{1, 4}));
                Snapshot latestSnapshot2 = createStore.snapshotManager().latestSnapshot();
                List scan3 = newIndexFileHandler.scan(latestSnapshot2.id(), "HASH", partition);
                Assertions.assertThat(scan3.size()).isEqualTo(2);
                Assertions.assertThat(((IndexManifestEntry) scan3.get(0)).bucket()).isEqualTo(0);
                Assertions.assertThat(newIndexFileHandler.readHashIndexList(((IndexManifestEntry) scan3.get(0)).indexFile())).containsExactlyInAnyOrder(new Integer[]{1, 4});
                Assertions.assertThat(((IndexManifestEntry) scan3.get(1)).bucket()).isEqualTo(1);
                Assertions.assertThat(newIndexFileHandler.readHashIndexList(((IndexManifestEntry) scan3.get(1)).indexFile())).containsExactlyInAnyOrder(new Integer[]{6, 8});
                Optional scan4 = newIndexFileHandler.scan(latestSnapshot2.id(), "HASH", partition, 0);
                Assertions.assertThat(scan4).isPresent();
                Assertions.assertThat(newIndexFileHandler.readHashIndexList((IndexFileMeta) scan4.get())).containsExactlyInAnyOrder(new Integer[]{1, 4});
                createStore.options().toConfiguration().set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE, true);
                List<KeyValue> singletonList = Collections.singletonList(next);
                TestKeyValueGenerator testKeyValueGenerator5 = this.gen;
                testKeyValueGenerator5.getClass();
                createStore.overwriteData(singletonList, testKeyValueGenerator5::getPartition, keyValue2 -> {
                    return 0;
                }, new HashMap());
                Snapshot latestSnapshot3 = createStore.snapshotManager().latestSnapshot();
                Assertions.assertThat(newIndexFileHandler.scan(latestSnapshot3.id(), "HASH", partition, 0)).isEmpty();
                Assertions.assertThat(newIndexFileHandler.scan(latestSnapshot3.id(), "HASH", binaryRow2, 2)).isPresent();
                createStore.options().toConfiguration().set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE, false);
                List<KeyValue> singletonList2 = Collections.singletonList(next);
                TestKeyValueGenerator testKeyValueGenerator6 = this.gen;
                testKeyValueGenerator6.getClass();
                createStore.overwriteData(singletonList2, testKeyValueGenerator6::getPartition, keyValue3 -> {
                    return 0;
                }, new HashMap());
                Assertions.assertThat(newIndexFileHandler.scan(createStore.snapshotManager().latestSnapshot().id(), "HASH", binaryRow2, 2)).isEmpty();
                return;
            }
            keyValue = this.gen.next();
            binaryRow = this.gen.getPartition(keyValue);
        }
    }

    /**
     * Convenience overload that delegates to {@link #createStore(boolean, int)}, supplying
     * {@code 1} for the integer argument.
     *
     * @param z forwarded unchanged to the delegate
     * @return the store produced by the delegate
     * @throws Exception if the delegate throws
     */
    private TestFileStore createStore(boolean z) throws Exception {
        final int defaultIntArg = 1;
        return createStore(z, defaultIntArg);
    }

    /**
     * Convenience overload that delegates to the three-argument {@code createStore}, supplying
     * {@link CoreOptions.ChangelogProducer#NONE} as the changelog producer.
     *
     * @param z forwarded unchanged to the delegate
     * @param i forwarded unchanged to the delegate
     * @return the store produced by the delegate
     * @throws Exception if the delegate throws
     */
    private TestFileStore createStore(boolean z, int i) throws Exception {
        final CoreOptions.ChangelogProducer defaultProducer = CoreOptions.ChangelogProducer.NONE;
        return createStore(z, i, defaultProducer);
    }

    /**
     * Force-commits the default test schema under the temp directory and builds a
     * {@link TestFileStore} on top of it.
     *
     * @param z when {@code true} the store root is obtained from
     *     {@code FailingFileIO.getFailingPath(failingName, tempDir)}; otherwise the root is the
     *     temp directory prefixed with {@code traceable://}
     * @param i passed as the third argument of {@code TestFileStore.Builder}
     *     (presumably the bucket count; confirm against the builder's signature)
     * @param changelogProducer changelog producer applied to the builder
     * @return the built store
     * @throws Exception if committing the schema or building the store fails
     */
    private TestFileStore createStore(boolean z, int i, CoreOptions.ChangelogProducer changelogProducer) throws Exception {
        // Resolve the store root first, exactly as the schema commit below expects it to exist.
        final String root;
        if (z) {
            root = FailingFileIO.getFailingPath(this.failingName, this.tempDir.toString());
        } else {
            root = "traceable://" + this.tempDir.toString();
        }

        // The schema is always committed through a plain LocalFileIO on the temp directory,
        // regardless of which root the store itself uses.
        SchemaManager schemaManager =
                new SchemaManager(
                        new LocalFileIO(),
                        new org.apache.paimon.fs.Path(this.tempDir.toUri()));
        Schema schema =
                new Schema(
                        TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
                        TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
                        TestKeyValueGenerator.getPrimaryKeys(
                                TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
                        Collections.emptyMap(),
                        (String) null);
        SchemaUtils.forceCommit(schemaManager, schema);

        TestFileStore.Builder builder =
                new TestFileStore.Builder(
                        "avro",
                        root,
                        i,
                        TestKeyValueGenerator.DEFAULT_PART_TYPE,
                        TestKeyValueGenerator.KEY_TYPE,
                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                        TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
                        DeduplicateMergeFunction.factory());
        return builder.changelogProducer(changelogProducer).build();
    }

    /**
     * Generates {@code i} records via {@link #generateData(int)} and flattens the per-partition
     * lists into a single list.
     *
     * <p>The partitions are visited in the iteration order of the map returned by
     * {@code generateData}; records within one partition keep their generation order.
     *
     * @param i number of records to generate
     * @return all generated records in one list
     */
    private List<KeyValue> generateDataList(int i) {
        // Fully typed pipeline: no raw List cast, so the element type is checked by the compiler.
        return generateData(i).values().stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    /**
     * Draws {@code i} records from {@code this.gen} and groups them by the partition that
     * {@code this.gen.getPartition} reports for each record.
     *
     * @param i number of records to generate; a non-positive value yields an empty map
     * @return a mutable {@link HashMap} from partition to the records generated for it, each list
     *     in generation order
     */
    private Map<BinaryRow, List<KeyValue>> generateData(int i) {
        // Parameterized map instead of a raw HashMap, so no unchecked casts are needed below.
        Map<BinaryRow, List<KeyValue>> recordsByPartition = new HashMap<>();
        for (int n = 0; n < i; n++) {
            KeyValue record = this.gen.next();
            recordsByPartition
                    .computeIfAbsent(this.gen.getPartition(record), partition -> new ArrayList<>())
                    .add(record);
        }
        return recordsByPartition;
    }

    /**
     * Converts each map entry into a fresh {@link KeyValue} whose key is the entry key and whose
     * value is the entry value, using sequence number {@code -1} and {@link RowKind#INSERT}.
     *
     * @param map key row to value row mapping
     * @return one {@code KeyValue} per entry, in the map's entry iteration order
     */
    private List<KeyValue> kvMapToKvList(Map<BinaryRow, BinaryRow> map) {
        // Typed pipeline: the raw List cast and the per-entry casts are unnecessary because the
        // entry types are already known from the parameter's generic signature.
        return map.entrySet().stream()
                .map(entry ->
                        new KeyValue()
                                .replace(entry.getKey(), -1L, RowKind.INSERT, entry.getValue()))
                .collect(Collectors.toList());
    }

    /**
     * Dumps a list of records to the debug log, framed by begin/end marker lines.
     *
     * <p>Nothing happens, and {@code supplier} is never invoked, unless debug logging is enabled.
     *
     * @param supplier provides the records to log; called at most once
     * @param str label inserted into the begin and end marker lines
     */
    private void logData(Supplier<List<KeyValue>> supplier, String str) {
        if (!LOG.isDebugEnabled()) {
            return;
        }

        LOG.debug("========== Beginning of " + str + " ==========");
        for (KeyValue record : supplier.get()) {
            LOG.debug(
                    record.toString(
                            TestKeyValueGenerator.KEY_TYPE,
                            TestKeyValueGenerator.DEFAULT_ROW_TYPE));
        }
        LOG.debug("========== End of " + str + " ==========");
    }
}
