package org.apache.paimon.flink.sink.cdc;

import java.lang.invoke.SerializedLambda;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

/** Tests for {@link CdcRecordStoreWriteOperator}. */
public class CdcRecordStoreWriteOperatorTest {

    @TempDir
    Path tempDir;
    private org.apache.paimon.fs.Path tablePath;
    private String commitUser;

    /* loaded from: input_file:org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest$Runner.class */
    private static class Runner implements Runnable {
        private final OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness;
        private final BlockingQueue<CdcRecord> toProcess;
        private final BlockingQueue<CdcRecord> processed;
        private final AtomicBoolean running;

        private Runner(OneInputStreamOperatorTestHarness<CdcRecord, Committable> oneInputStreamOperatorTestHarness) {
            this.toProcess = new LinkedBlockingQueue();
            this.processed = new LinkedBlockingQueue();
            this.running = new AtomicBoolean(true);
            this.harness = oneInputStreamOperatorTestHarness;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void offer(CdcRecord cdcRecord) {
            this.toProcess.offer(cdcRecord);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public CdcRecord take() throws Exception {
            return this.processed.take();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public CdcRecord poll(long j) throws Exception {
            return this.processed.poll(j, TimeUnit.SECONDS);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void stop() {
            this.running.set(false);
        }

        /* JADX WARN: Type inference failed for: r0v14, types: [org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness<org.apache.paimon.flink.sink.cdc.CdcRecord, org.apache.paimon.flink.sink.Committable>, org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness] */
        @Override // java.lang.Runnable
        public void run() {
            long j = 0;
            while (this.running.get()) {
                try {
                    if (this.toProcess.isEmpty()) {
                        Thread.sleep(10L);
                    } else {
                        CdcRecord poll = this.toProcess.poll();
                        ?? r0 = this.harness;
                        long j2 = j + 1;
                        j = r0;
                        r0.processElement(poll, j2);
                        this.processed.offer(poll);
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * Prepares per-test state: a table path under the JUnit temp directory using the
     * {@code traceable://} scheme, and a fresh random commit user.
     */
    @BeforeEach
    public void before() {
        String tableLocation = "traceable://" + tempDir.toString();
        tablePath = new org.apache.paimon.fs.Path(tableLocation);

        UUID randomId = UUID.randomUUID();
        commitUser = randomId.toString();
    }

    /**
     * Verifies after each test that {@link TraceableFileIO} reports no open input or output
     * streams for paths whose string form contains this test's temp directory.
     */
    @AfterEach
    public void after() {
        // Restrict the check to streams opened under this test's own temp directory.
        Predicate<org.apache.paimon.fs.Path> underTempDir =
                path -> path.toString().contains(tempDir.toString());
        Assertions.assertThat(TraceableFileIO.openInputStreams(underTempDir)).isEmpty();
        Assertions.assertThat(TraceableFileIO.openOutputStreams(underTempDir)).isEmpty();
    }

    /**
     * Writes records against a table with columns {@code pt}, {@code k}, {@code v} and checks
     * how the operator reacts to a record carrying an extra column {@code v2}: the test expects
     * that record not to finish processing within one second, and to finish once an
     * "add column v2" schema change has been committed.
     */
    @Timeout(30)
    @Test
    public void testAddColumn() throws Exception {
        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING()},
                        new String[] {"pt", "k", "v"});
        FileStoreTable table =
                createFileStoreTable(
                        rowType, Collections.singletonList("pt"), Arrays.asList("pt", "k"));
        OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
                createTestHarness(table);
        harness.open();

        Runner runner = new Runner(harness);
        Thread runnerThread = new Thread(runner);
        runnerThread.start();

        // A record that provides every column of the current schema.
        Map<String, String> fields = new HashMap<>();
        fields.put("pt", "0");
        fields.put("k", "1");
        fields.put("v", "10");
        CdcRecord expected = new CdcRecord(RowKind.INSERT, fields);
        runner.offer(expected);
        Assertions.assertThat(runner.take()).isEqualTo(expected);

        // A record that omits column "v"; it is still expected to be processed.
        fields = new HashMap<>();
        fields.put("pt", "0");
        fields.put("k", "2");
        expected = new CdcRecord(RowKind.INSERT, fields);
        runner.offer(expected);
        Assertions.assertThat(runner.take()).isEqualTo(expected);

        // A record with column "v2", which the table schema does not have yet.
        fields = new HashMap<>();
        fields.put("pt", "0");
        fields.put("k", "3");
        fields.put("v", "30");
        fields.put("v2", "300");
        expected = new CdcRecord(RowKind.INSERT, fields);
        runner.offer(expected);
        // Expected to remain unprocessed until the schema change below is committed.
        Assertions.assertThat(runner.poll(1L)).isNull();

        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
        schemaManager.commitChanges(
                new SchemaChange[] {SchemaChange.addColumn("v2", DataTypes.INT())});
        Assertions.assertThat(runner.take()).isEqualTo(expected);

        runner.stop();
        runnerThread.join();
        harness.close();
    }

    /**
     * Writes records whose values do not fit the current column types and checks that each one
     * is only processed after the matching "update column type" schema change is committed.
     *
     * <p>The table starts with {@code k INT, v1 INT, v2 FLOAT, v3 VARCHAR(5), v4 VARBINARY(5)}.
     * For each of {@code v1} to {@code v4} the test offers a record, expects it to stay
     * unprocessed for one second, commits the type change, and then expects it to be processed.
     */
    @Timeout(30)
    @Test
    public void testUpdateColumnType() throws Exception {
        RowType rowType =
                RowType.of(
                        new DataType[] {
                            DataTypes.INT(),
                            DataTypes.INT(),
                            DataTypes.FLOAT(),
                            DataTypes.VARCHAR(5),
                            DataTypes.VARBINARY(5)
                        },
                        new String[] {"k", "v1", "v2", "v3", "v4"});
        FileStoreTable table =
                createFileStoreTable(
                        rowType, Collections.emptyList(), Collections.singletonList("k"));
        OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
                createTestHarness(table);
        harness.open();

        Runner runner = new Runner(harness);
        Thread runnerThread = new Thread(runner);
        runnerThread.start();

        // Baseline: a record with a value for every column is processed right away.
        Map<String, String> fields = new HashMap<>();
        fields.put("k", "1");
        fields.put("v1", "10");
        fields.put("v2", "0.625");
        fields.put("v3", "one");
        fields.put("v4", "b_one");
        CdcRecord expected = new CdcRecord(RowKind.INSERT, fields);
        runner.offer(expected);
        Assertions.assertThat(runner.take()).isEqualTo(expected);

        // v1: INT -> BIGINT.
        fields = new HashMap<>();
        fields.put("k", "2");
        fields.put("v1", "12345678987654321");
        fields.put("v2", "0.25");
        expected = new CdcRecord(RowKind.INSERT, fields);
        runner.offer(expected);
        Assertions.assertThat(runner.poll(1L)).isNull();

        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
        schemaManager.commitChanges(
                new SchemaChange[] {SchemaChange.updateColumnType("v1", DataTypes.BIGINT())});
        Assertions.assertThat(runner.take()).isEqualTo(expected);

        // v2: FLOAT -> DOUBLE.
        fields = new HashMap<>();
        fields.put("k", "3");
        fields.put("v1", "100");
        fields.put("v2", "1.0000000000009095");
        expected = new CdcRecord(RowKind.INSERT, fields);
        runner.offer(expected);
        Assertions.assertThat(runner.poll(1L)).isNull();

        schemaManager.commitChanges(
                new SchemaChange[] {SchemaChange.updateColumnType("v2", DataTypes.DOUBLE())});
        Assertions.assertThat(runner.take()).isEqualTo(expected);

        // v3: VARCHAR(5) -> VARCHAR(10).
        fields = new HashMap<>();
        fields.put("k", "4");
        fields.put("v1", "40");
        fields.put("v3", "long four");
        expected = new CdcRecord(RowKind.INSERT, fields);
        runner.offer(expected);
        Assertions.assertThat(runner.poll(1L)).isNull();

        schemaManager.commitChanges(
                new SchemaChange[] {SchemaChange.updateColumnType("v3", DataTypes.VARCHAR(10))});
        Assertions.assertThat(runner.take()).isEqualTo(expected);

        // v4: VARBINARY(5) -> VARBINARY(10).
        fields = new HashMap<>();
        fields.put("k", "5");
        fields.put("v1", "50");
        fields.put("v4", "long five~");
        expected = new CdcRecord(RowKind.INSERT, fields);
        runner.offer(expected);
        Assertions.assertThat(runner.poll(1L)).isNull();

        schemaManager.commitChanges(
                new SchemaChange[] {SchemaChange.updateColumnType("v4", DataTypes.VARBINARY(10))});
        Assertions.assertThat(runner.take()).isEqualTo(expected);

        runner.stop();
        runnerThread.join();
        harness.close();
    }

    /**
     * Builds a test harness around a {@link CdcRecordStoreWriteOperator} for the given table.
     *
     * <p>The operator is given this test's {@code commitUser} and a write provider that creates
     * a {@link StoreSinkWriteImpl}. Input records are serialized with {@link JavaSerializer};
     * the output serializer comes from {@link CommittableTypeInfo}. The returned harness has
     * been set up but not opened.
     *
     * @param fileStoreTable table the operator writes to
     * @return a harness on which {@code setup} has been called but not {@code open}
     * @throws Exception if constructing or setting up the harness fails
     */
    private OneInputStreamOperatorTestHarness<CdcRecord, Committable> createTestHarness(FileStoreTable fileStoreTable) throws Exception {
        CdcRecordStoreWriteOperator operator =
                new CdcRecordStoreWriteOperator(
                        fileStoreTable,
                        (table, user, state, ioManager, memoryPool) ->
                                new StoreSinkWriteImpl(
                                        table, user, state, ioManager, false, false, true, memoryPool),
                        this.commitUser);

        TypeSerializer<CdcRecord> inputSerializer = new JavaSerializer<>();
        TypeSerializer<Committable> outputSerializer =
                new CommittableTypeInfo().createSerializer(new ExecutionConfig());

        OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
                new OneInputStreamOperatorTestHarness<>(operator, inputSerializer);
        harness.setup(outputSerializer);
        return harness;
    }

    /**
     * Commits a new schema at {@code tablePath} and opens the resulting table.
     *
     * <p>The schema options set {@link CdcRecordStoreWriteOperator#RETRY_SLEEP_TIME} to 10 ms.
     *
     * @param rowType row type whose fields become the schema's fields
     * @param list passed as the second {@link Schema} argument (the tests in this class pass
     *     their partition column names here)
     * @param list2 passed as the third {@link Schema} argument (the tests in this class pass
     *     their key column names here)
     * @return the table created from the committed schema
     * @throws Exception if committing the schema fails
     */
    private FileStoreTable createFileStoreTable(RowType rowType, List<String> list, List<String> list2) throws Exception {
        Options tableOptions = new Options();
        tableOptions.set(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME, Duration.ofMillis(10L));

        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
        Schema schema = new Schema(rowType.getFields(), list, list2, tableOptions.toMap(), "");

        return FileStoreTableFactory.create(
                LocalFileIO.create(), tablePath, SchemaUtils.forceCommit(schemaManager, schema));
    }

    // NOTE: the decompiled copy of the synthetic $deserializeLambda$(SerializedLambda) method
    // was removed from this source. javac generates that method automatically for the
    // serializable lambda in createTestHarness, and a hand-written copy clashes with the
    // compiler-synthesized one. The decompiled body was also not valid Java (an int assigned
    // to a boolean, a switch on a boolean, and a lambda returned as Object with no target type).
}
