package org.apache.paimon.flink.action.cdc.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.class */
public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
    @Timeout(60)
    @Test
    public void testSchemaEvolution() throws Exception {
        runSingleTableSchemaEvolution("schemaevolution");
    }

    @Timeout(60)
    @Test
    public void testSchemaEvolutionWithMissingDdl() throws Exception {
        runSingleTableSchemaEvolution("schemaevolutionmissingddl");
    }

    private void runSingleTableSchemaEvolution(String str) throws Exception {
        createTestTopic("schema_evolution", 1, 1);
        try {
            writeRecordsToKafka("schema_evolution", readLines(String.format("kafka/canal/table/%s/canal-data-1.txt", str)));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "canal-json");
            basicKafkaConfig.put("topic", "schema_evolution");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.singletonList("pt"), Arrays.asList("pt", "_id"), Collections.emptyList(), Collections.emptyMap(), hashMap).build(executionEnvironment);
            waitJobRunning(executionEnvironment.executeAsync());
            testSchemaEvolutionImpl("schema_evolution", str);
        } catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
    }

    private void testSchemaEvolutionImpl(String str, String str2) throws Exception {
        FileStoreTable fileStoreTable = getFileStoreTable(this.tableName);
        RowType of = RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, new String[]{"pt", "_id", "v1"});
        List<String> asList = Arrays.asList("pt", "_id");
        waitForResult(Arrays.asList("+I[1, 1, one]", "+I[1, 2, two]", "+I[2, 4, four]"), fileStoreTable, of, asList);
        try {
            writeRecordsToKafka(str, readLines(String.format("kafka/canal/table/%s/canal-data-2.txt", str2)));
            waitForResult(Arrays.asList("+I[1, 1, one, NULL]", "+I[1, 2, second, NULL]", "+I[2, 3, three, 30]", "+I[2, 4, four, NULL]", "+I[1, 5, five, 50]", "+I[1, 6, six, 60]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT()}, new String[]{"pt", "_id", "v1", "v2"}), asList);
            try {
                writeRecordsToKafka(str, readLines(String.format("kafka/canal/table/%s/canal-data-3.txt", str2)));
                waitForResult(Arrays.asList("+I[1, 1, one, NULL]", "+I[1, 2, second, NULL]", "+I[2, 3, three, 30000000000]", "+I[2, 4, four, NULL]", "+I[1, 6, six, 60]", "+I[2, 7, seven, 70000000000]", "+I[2, 8, eight, 80000000000]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.BIGINT()}, new String[]{"pt", "_id", "v1", "v2"}), asList);
                try {
                    writeRecordsToKafka(str, readLines(String.format("kafka/canal/table/%s/canal-data-4.txt", str2)));
                    waitForResult(Arrays.asList("+I[1, 1, one, NULL, NULL, NULL, NULL]", "+I[1, 2, second, NULL, NULL, NULL, NULL]", "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", "+I[2, 4, four, NULL, NULL, NULL, NULL]", "+I[1, 6, six, 60, NULL, NULL, NULL]", "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR(20), DataTypes.BIGINT(), DataTypes.DECIMAL(8, 3), DataTypes.VARBINARY(10), DataTypes.FLOAT()}, new String[]{"pt", "_id", "v1", "v2", "v3", "v4", "v5"}), asList);
                    try {
                        writeRecordsToKafka(str, readLines(String.format("kafka/canal/table/%s/canal-data-5.txt", str2)));
                        waitForResult(Arrays.asList("+I[1, 1, one, NULL, NULL, NULL, NULL]", "+I[1, 2, second, NULL, NULL, NULL, NULL]", "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", "+I[2, 4, four, NULL, NULL, [102, 111, 117, 114, 46, 98, 105, 110, 46, 108, 111, 110, 103], 4.00000000004]", "+I[1, 6, six, 60, NULL, NULL, NULL]", "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR(20), DataTypes.BIGINT(), DataTypes.DECIMAL(8, 3), DataTypes.VARBINARY(20), DataTypes.DOUBLE()}, new String[]{"pt", "_id", "v1", "v2", "v3", "v4", "v5"}), asList);
                    } catch (Exception e) {
                        throw new Exception("Failed to write canal data to Kafka.", e);
                    }
                } catch (Exception e2) {
                    throw new Exception("Failed to write canal data to Kafka.", e2);
                }
            } catch (Exception e3) {
                throw new Exception("Failed to write canal data to Kafka.", e3);
            }
        } catch (Exception e4) {
            throw new Exception("Failed to write canal data to Kafka.", e4);
        }
    }

    @Timeout(60)
    @Test
    public void testMultipleSchemaEvolutions() throws Exception {
        createTestTopic("schema_evolution_multiple", 1, 1);
        try {
            writeRecordsToKafka("schema_evolution_multiple", readLines("kafka/canal/table/schemaevolutionmultiple/canal-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "canal-json");
            basicKafkaConfig.put("topic", "schema_evolution_multiple");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(1);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.emptyList(), Collections.singletonList("_id"), Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap()).build(executionEnvironment);
            waitJobRunning(executionEnvironment.executeAsync());
            testSchemaEvolutionMultipleImpl("schema_evolution_multiple");
        } catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
    }

    private void testSchemaEvolutionMultipleImpl(String str) throws Exception {
        FileStoreTable fileStoreTable = getFileStoreTable(this.tableName);
        RowType of = RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT(), DataTypes.VARCHAR(10)}, new String[]{"_id", "v1", "v2", "v3"});
        List<String> singletonList = Collections.singletonList("_id");
        waitForResult(Collections.singletonList("+I[1, one, 10, string_1]"), fileStoreTable, of, singletonList);
        try {
            writeRecordsToKafka(str, readLines("kafka/canal/table/schemaevolutionmultiple/canal-data-2.txt"));
            waitForResult(Arrays.asList("+I[1, one, 10, string_1, NULL, NULL, NULL, NULL]", "+I[2, long_string_two, 2000000000000, string_2, 20, 20.5, 20.002, test_2]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(20), DataTypes.BIGINT(), DataTypes.VARCHAR(10), DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.DECIMAL(5, 3), DataTypes.VARCHAR(10)}, new String[]{"_id", "v1", "v2", "v3", "v4", "v5", "v6", "$% ^,& *("}), singletonList);
        } catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
    }

    @Timeout(60)
    @Test
    public void testAllTypes() throws Exception {
        for (int i = 0; i < 2; i++) {
            testAllTypesOnce();
            Thread.sleep(3000L);
        }
    }

    private void testAllTypesOnce() throws Exception {
        String str = "all_type" + UUID.randomUUID();
        createTestTopic(str, 1, 1);
        try {
            writeRecordsToKafka(str, readLines("kafka/canal/table/alltype/canal-data.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "canal-json");
            basicKafkaConfig.put("topic", str);
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(1);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.singletonList("pt"), Arrays.asList("pt", "_id"), Collections.emptyMap(), Collections.emptyMap()).build(executionEnvironment);
            JobClient executeAsync = executionEnvironment.executeAsync();
            waitJobRunning(executeAsync);
            testAllTypesImpl();
            executeAsync.cancel().get();
        } catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
    }

    private void testAllTypesImpl() throws Exception {
        RowType of = RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.DECIMAL(2, 1).notNull(), DataTypes.BOOLEAN(), DataTypes.BOOLEAN(), DataTypes.BOOLEAN(), DataTypes.TINYINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.DECIMAL(20, 0), DataTypes.DECIMAL(20, 0), DataTypes.DECIMAL(20, 0), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DECIMAL(8, 3), DataTypes.DECIMAL(8, 3), DataTypes.DECIMAL(8, 3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DECIMAL(8, 0), DataTypes.DECIMAL(8, 0), DataTypes.DECIMAL(8, 0), DataTypes.DATE(), DataTypes.TIMESTAMP(0), DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(6), DataTypes.TIMESTAMP(0), DataTypes.TIMESTAMP(2), DataTypes.TIMESTAMP(6), DataTypes.TIMESTAMP(0), DataTypes.CHAR(10), DataTypes.VARCHAR(20), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.VARBINARY(10), DataTypes.VARBINARY(20), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT(), DataTypes.TIME(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())}, new String[]{"_id", "pt", "_tinyint1", "_boolean", "_bool", "_tinyint", "_tinyint_unsigned", "_tinyint_unsigned_zerofill", "_smallint", "_smallint_unsigned", "_smallint_unsigned_zerofill", "_mediumint", "_mediumint_unsigned", "_mediumint_unsigned_zerofill", "_int", "_int_unsigned", "_int_unsigned_zerofill", "_bigint", "_bigint_unsigned", "_bigint_unsigned_zerofill", "_serial", "_float", "_float_unsigned", "_float_unsigned_zerofill", "_real", "_real_unsigned", "_real_unsigned_zerofill", "_double", "_double_unsigned", "_double_unsigned_zerofill", "_double_precision", "_double_precision_unsigned", "_double_precision_unsigned_zerofill", "_numeric", "_numeric_unsigned", "_numeric_unsigned_zerofill", "_fixed", "_fixed_unsigned", "_fixed_unsigned_zerofill", "_decimal", "_decimal_unsigned", "_decimal_unsigned_zerofill", "_date", "_datetime", "_datetime3", "_datetime6", "_datetime_p", "_datetime_p2", "_timestamp", "_timestamp0", "_char", "_varchar", "_tinytext", "_text", "_mediumtext", "_longtext", "_bin", "_varbin", "_tinyblob", "_blob", "_mediumblob", "_longblob", "_json", "_enum", "_year", "_time", "_point", "_geometry", "_linestring", "_polygon", "_multipoint", "_multiline", "_multipolygon", "_geometrycollection", "_set"});
        waitForResult(Arrays.asList("+I[1, 1.1, true, true, false, 1, 2, 3, 1000, 2000, 3000, 100000, 200000, 300000, 1000000, 2000000, 3000000, 10000000000, 20000000000, 30000000000, 40000000000, 1.5, 2.5, 3.5, 1.000001, 2.000002, 3.000003, 1.000011, 2.000022, 3.000033, 1.000111, 2.000222, 3.000333, 12345.110, 12345.220, 12345.330, 123456789876543212345678987654321.110, 123456789876543212345678987654321.220, 123456789876543212345678987654321.330, 11111, 22222, 33333, 19439, 2023-03-23T14:30:05, 2023-03-23T14:30:05.123, 2023-03-23T14:30:05.123456, 2023-03-24T14:30, 2023-03-24T14:30:05.120, 2023-03-23T15:00:10.123456, 2023-03-23T00:10, Paimon, Apache Paimon, Apache Paimon MySQL TINYTEXT Test Data, Apache Paimon MySQL Test Data, Apache Paimon MySQL MEDIUMTEXT Test Data, Apache Paimon MySQL Long Test Data, [98, 121, 116, 101, 115], [109, 111, 114, 101, 32, 98, 121, 116, 101, 115], [84, 73, 78, 89, 66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [77, 69, 68, 73, 85, 77, 66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [76, 79, 78, 71, 66, 76, 79, 66, 32, 32, 98, 121, 116, 101, 115, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], {\"a\": \"b\"}, value1, 2023, 36803000, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}, {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}, [a, b]]", "+I[2, 2.2, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 50000000000, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL]"), getFileStoreTable(this.tableName), of, Arrays.asList("pt", "_id"));
    }

    @Timeout(60)
    @Test
    public void testNotSupportFormat() throws Exception {
        createTestTopic("not_support", 1, 1);
        try {
            writeRecordsToKafka("not_support", readLines("kafka/canal/table/schemaevolution/canal-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "togg-json");
            basicKafkaConfig.put("topic", "not_support");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            KafkaSyncTableAction kafkaSyncTableAction = new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.singletonList("pt"), Arrays.asList("pt", "_id"), Collections.emptyList(), Collections.emptyMap(), hashMap);
            Assertions.assertThatThrownBy(() -> {
                kafkaSyncTableAction.build(executionEnvironment);
            }).isInstanceOf(UnsupportedOperationException.class).hasMessage("This format: togg-json is not supported.");
        } catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
    }

    @Timeout(120)
    @Test
    public void testKafkaNoNonDdlData() throws Exception {
        createTestTopic("no_non_ddl_data", 1, 1);
        try {
            writeRecordsToKafka("no_non_ddl_data", readLines("kafka/canal/table/nononddldata/canal-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "canal-json");
            basicKafkaConfig.put("topic", "no_non_ddl_data");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            KafkaSyncTableAction kafkaSyncTableAction = new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.singletonList("pt"), Arrays.asList("pt", "_id"), Collections.emptyList(), Collections.emptyMap(), hashMap);
            Assertions.assertThatThrownBy(() -> {
                kafkaSyncTableAction.build(executionEnvironment);
            }).isInstanceOf(Exception.class).hasMessage("Could not get metadata from server, topic: no_non_ddl_data");
        } catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
    }

    @Timeout(60)
    @Test
    public void testAssertSchemaCompatible() throws Exception {
        createTestTopic("assert_schema_compatible", 1, 1);
        try {
            writeRecordsToKafka("assert_schema_compatible", readLines("kafka/canal/table/schemaevolution/canal-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "canal-json");
            basicKafkaConfig.put("topic", "assert_schema_compatible");
            Catalog catalog = catalog();
            catalog.createDatabase(this.database, true);
            catalog.createTable(Identifier.create(this.database, this.tableName), Schema.newBuilder().column("k", DataTypes.STRING()).column("v1", DataTypes.STRING()).primaryKey(new String[]{"k"}).build(), false);
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            KafkaSyncTableAction kafkaSyncTableAction = new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.singletonList("pt"), Arrays.asList("pt", "_id"), Collections.emptyList(), Collections.emptyMap(), hashMap);
            Assertions.assertThatThrownBy(() -> {
                kafkaSyncTableAction.build(executionEnvironment);
            }).isInstanceOf(IllegalArgumentException.class).hasMessage("Paimon schema and Kafka schema are not compatible.\nPaimon fields are: [`k` STRING NOT NULL, `v1` STRING].\nKafka fields are: [`pt` INT NOT NULL, `_id` INT NOT NULL, `v1` VARCHAR(10)]");
        } catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
    }

    @Timeout(60)
    @Test
    public void testStarUpOptionSpecific() throws Exception {
        createTestTopic("start_up_specific", 1, 1);
        try {
            writeRecordsToKafka("start_up_specific", readLines("kafka/canal/table/startupmode/canal-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "canal-json");
            basicKafkaConfig.put("topic", "start_up_specific");
            basicKafkaConfig.put("scan.startup.mode", "specific-offsets");
            basicKafkaConfig.put("scan.startup.specific-offsets", "partition:0,offset:1");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.singletonList("pt"), Arrays.asList("pt", "_id"), Collections.emptyList(), Collections.emptyMap(), hashMap).build(executionEnvironment);
            waitJobRunning(executionEnvironment.executeAsync());
            waitForResult(Collections.singletonList("+I[1, 2, two]"), getFileStoreTable(this.tableName), RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, new String[]{"pt", "_id", "v1"}), Arrays.asList("pt", "_id"));
        } catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
    }

    @Timeout(60)
    @Test
    public void testStarUpOptionLatest() throws Exception {
        createTestTopic("start_up_latest", 1, 1);
        try {
            writeRecordsToKafka("start_up_latest", readLines("kafka/canal/table/startupmode/canal-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "canal-json");
            basicKafkaConfig.put("topic", "start_up_latest");
            basicKafkaConfig.put("scan.startup.mode", "latest-offset");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.singletonList("pt"), Arrays.asList("pt", "_id"), Collections.emptyList(), Collections.emptyMap(), hashMap).build(executionEnvironment);
            waitJobRunning(executionEnvironment.executeAsync());
            try {
                writeRecordsToKafka("start_up_latest", readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
                waitForResult(Arrays.asList("+I[1, 3, three]", "+I[1, 4, four]"), getFileStoreTable(this.tableName), RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, new String[]{"pt", "_id", "v1"}), Arrays.asList("pt", "_id"));
            } catch (Exception e) {
                throw new Exception("Failed to write canal data to Kafka.", e);
            }
        } catch (Exception e2) {
            throw new Exception("Failed to write canal data to Kafka.", e2);
        }
    }

    @Timeout(60)
    @Test
    public void testStarUpOptionTimestamp() throws Exception {
        createTestTopic("start_up_timestamp", 1, 1);
        try {
            writeRecordsToKafka("start_up_timestamp", readLines("kafka/canal/table/startupmode/canal-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "canal-json");
            basicKafkaConfig.put("topic", "start_up_timestamp");
            basicKafkaConfig.put("scan.startup.mode", "timestamp");
            basicKafkaConfig.put("scan.startup.timestamp-millis", String.valueOf(System.currentTimeMillis()));
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.singletonList("pt"), Arrays.asList("pt", "_id"), Collections.emptyList(), Collections.emptyMap(), hashMap).build(executionEnvironment);
            waitJobRunning(executionEnvironment.executeAsync());
            try {
                writeRecordsToKafka("start_up_timestamp", readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
                waitForResult(Arrays.asList("+I[1, 3, three]", "+I[1, 4, four]"), getFileStoreTable(this.tableName), RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, new String[]{"pt", "_id", "v1"}), Arrays.asList("pt", "_id"));
            } catch (Exception e) {
                throw new Exception("Failed to write canal data to Kafka.", e);
            }
        } catch (Exception e2) {
            throw new Exception("Failed to write canal data to Kafka.", e2);
        }
    }

    @Timeout(60)
    @Test
    public void testStarUpOptionEarliest() throws Exception {
        createTestTopic("start_up_earliest", 1, 1);
        try {
            writeRecordsToKafka("start_up_earliest", readLines("kafka/canal/table/startupmode/canal-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "canal-json");
            basicKafkaConfig.put("topic", "start_up_earliest");
            basicKafkaConfig.put("scan.startup.mode", "earliest-offset");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.singletonList("pt"), Arrays.asList("pt", "_id"), Collections.emptyList(), Collections.emptyMap(), hashMap).build(executionEnvironment);
            waitJobRunning(executionEnvironment.executeAsync());
            try {
                writeRecordsToKafka("start_up_earliest", readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
                waitForResult(Arrays.asList("+I[1, 1, one]", "+I[1, 2, two]", "+I[1, 3, three]", "+I[1, 4, four]"), getFileStoreTable(this.tableName), RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, new String[]{"pt", "_id", "v1"}), Arrays.asList("pt", "_id"));
            } catch (Exception e) {
                throw new Exception("Failed to write canal data to Kafka.", e);
            }
        } catch (Exception e2) {
            throw new Exception("Failed to write canal data to Kafka.", e2);
        }
    }

    @Timeout(60)
    @Test
    public void testStarUpOptionGroup() throws Exception {
        createTestTopic("start_up_group", 1, 1);
        try {
            writeRecordsToKafka("start_up_group", readLines("kafka/canal/table/startupmode/canal-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "canal-json");
            basicKafkaConfig.put("topic", "start_up_group");
            basicKafkaConfig.put("scan.startup.mode", "group-offsets");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.singletonList("pt"), Arrays.asList("pt", "_id"), Collections.emptyList(), Collections.emptyMap(), hashMap).build(executionEnvironment);
            waitJobRunning(executionEnvironment.executeAsync());
            try {
                writeRecordsToKafka("start_up_group", readLines("kafka/canal/table/startupmode/canal-data-2.txt"));
                waitForResult(Arrays.asList("+I[1, 1, one]", "+I[1, 2, two]", "+I[1, 3, three]", "+I[1, 4, four]"), getFileStoreTable(this.tableName), RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, new String[]{"pt", "_id", "v1"}), Arrays.asList("pt", "_id"));
            } catch (Exception e) {
                throw new Exception("Failed to write canal data to Kafka.", e);
            }
        } catch (Exception e2) {
            throw new Exception("Failed to write canal data to Kafka.", e2);
        }
    }

    @Timeout(60)
    @Test
    public void testComputedColumn() throws Exception {
        createTestTopic("computed_column", 1, 1);
        try {
            writeRecordsToKafka("computed_column", readLines("kafka.canal/table/computedcolumn/canal-data-1.txt"));
            Map<String, String> basicKafkaConfig = getBasicKafkaConfig();
            basicKafkaConfig.put("value.format", "canal-json");
            basicKafkaConfig.put("topic", "computed_column");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setParallelism(2);
            executionEnvironment.enableCheckpointing(1000L);
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            HashMap hashMap = new HashMap();
            hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
            hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
            new KafkaSyncTableAction(basicKafkaConfig, this.warehouse, this.database, this.tableName, Collections.singletonList("_year"), Arrays.asList("_id", "_year"), Collections.singletonList("_year=year(_date)"), Collections.emptyMap(), hashMap).build(executionEnvironment);
            waitJobRunning(executionEnvironment.executeAsync());
            waitForResult(Collections.singletonList("+I[1, 19439, 2023]"), getFileStoreTable(this.tableName), RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.DATE(), DataTypes.INT().notNull()}, new String[]{"_id", "_date", "_year"}), Arrays.asList("_id", "_year"));
        } catch (Exception e) {
            throw new Exception("Failed to write canal data to Kafka.", e);
        }
    }
}
