package org.apache.paimon.flink.sink.cdc;

import java.lang.invoke.SerializedLambda;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.shade.org.apache.parquet.hadoop.TestParquetWriterAppendBlocks;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FailingFileIO;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.class */
public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase {
    private static final String DATABASE_NAME = "test";
    private static final String TABLE_NAME = "test_tbl";

    @TempDir
    Path tempDir;

    @Timeout(120)
    @Test
    public void testRandomCdcEvents() throws Exception {
        innerTestRandomCdcEvents(ThreadLocalRandom.current().nextInt(5) + 1, false);
    }

    @Timeout(120)
    @Test
    public void testRandomCdcEventsDynamicBucket() throws Exception {
        innerTestRandomCdcEvents(-1, false);
    }

    @Timeout(120)
    @Test
    public void testRandomCdcEventsGlobalDynamicBucket() throws Exception {
        innerTestRandomCdcEvents(-1, true);
    }

    private void innerTestRandomCdcEvents(int i, boolean z) throws Exception {
        org.apache.paimon.fs.Path path;
        FailingFileIO create;
        ThreadLocalRandom current = ThreadLocalRandom.current();
        int nextInt = current.nextInt(1500) + 1;
        int min = Math.min(nextInt / 2, current.nextInt(10) + 1);
        int nextInt2 = current.nextInt(3) + 1;
        int nextInt3 = current.nextInt(150) + 1;
        boolean nextBoolean = current.nextBoolean();
        TestTable testTable = new TestTable(TABLE_NAME, nextInt, min, nextInt2, nextInt3);
        String uuid = UUID.randomUUID().toString();
        if (nextBoolean) {
            path = new org.apache.paimon.fs.Path(FailingFileIO.getFailingPath(uuid, CatalogUtils.stringifyPath(this.tempDir.toString(), DATABASE_NAME, TABLE_NAME)));
            create = new FailingFileIO();
        } else {
            path = new org.apache.paimon.fs.Path("traceable://" + CatalogUtils.stringifyPath(this.tempDir.toString(), DATABASE_NAME, TABLE_NAME));
            create = LocalFileIO.create();
        }
        FailingFileIO.reset(uuid, 0, 1);
        FileStoreTable createFileStoreTable = createFileStoreTable(path, create, testTable.initialRowType(), Collections.singletonList("pt"), z ? Collections.singletonList("k") : Arrays.asList("pt", "k"), i);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getCheckpointConfig().setCheckpointInterval(100L);
        if (!nextBoolean) {
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        }
        DataStreamSource addSource = executionEnvironment.addSource(new TestCdcSourceFunction(testTable.events()));
        addSource.setParallelism(2);
        Options options = new Options();
        options.set("warehouse", this.tempDir.toString());
        new CdcSinkBuilder().withInput(addSource).withParserFactory(TestCdcEventParser::new).withTable(createFileStoreTable).withParallelism(3).withIdentifier(Identifier.create(DATABASE_NAME, TABLE_NAME)).withCatalogLoader(() -> {
            return FlinkCatalogFactory.createPaimonCatalog(options);
        }).build();
        FailingFileIO.reset(uuid, 10, TestParquetWriterAppendBlocks.FILE_SIZE);
        executionEnvironment.execute();
        FailingFileIO.reset(uuid, 0, 1);
        FileStoreTable copyWithLatestSchema = createFileStoreTable.copyWithLatestSchema();
        TableSchema tableSchema = (TableSchema) new SchemaManager(copyWithLatestSchema.fileIO(), copyWithLatestSchema.location()).latest().get();
        ReadBuilder newReadBuilder = copyWithLatestSchema.newReadBuilder();
        RecordReaderIterator recordReaderIterator = new RecordReaderIterator(newReadBuilder.newRead().createReader(newReadBuilder.newScan().plan()));
        Throwable th = null;
        try {
            try {
                testTable.assertResult(tableSchema, recordReaderIterator);
                if (recordReaderIterator != null) {
                    if (0 == 0) {
                        recordReaderIterator.close();
                        return;
                    }
                    try {
                        recordReaderIterator.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (recordReaderIterator != null) {
                if (th != null) {
                    try {
                        recordReaderIterator.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    recordReaderIterator.close();
                }
            }
            throw th4;
        }
    }

    private FileStoreTable createFileStoreTable(org.apache.paimon.fs.Path path, FileIO fileIO, RowType rowType, List<String> list, List<String> list2, int i) throws Exception {
        Options options = new Options();
        options.set(CoreOptions.BUCKET, Integer.valueOf(i));
        options.set(CoreOptions.DYNAMIC_BUCKET_TARGET_ROW_NUM, 100L);
        options.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(12288L));
        options.set(CoreOptions.PAGE_SIZE, new MemorySize(4096L));
        return FileStoreTableFactory.create(fileIO, path, SchemaUtils.forceCommit(new SchemaManager(fileIO, path), new Schema(rowType.getFields(), list, list2, options.toMap(), "")));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1109521201:
                if (implMethodName.equals("lambda$innerTestRandomCdcEvents$511b7700$1")) {
                    z = false;
                    break;
                }
                break;
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/catalog/Catalog$Loader") && serializedLambda.getFunctionalInterfaceMethodName().equals("load") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lorg/apache/paimon/catalog/Catalog;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/paimon/options/Options;)Lorg/apache/paimon/catalog/Catalog;")) {
                    Options options = (Options) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return FlinkCatalogFactory.createPaimonCatalog(options);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/cdc/EventParser$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lorg/apache/paimon/flink/sink/cdc/EventParser;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/cdc/TestCdcEventParser") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return TestCdcEventParser::new;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
