package org.apache.paimon.flink.action.cdc.mysql;

import java.nio.file.Path;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

/** Integration tests for {@link MySqlSyncDatabaseAction}, run against a MySQL container. */
public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {

    // Temporary directory injected by JUnit; its URI is used as the savepoint target directory.
    @TempDir
    Path tempDir;

    /**
     * Runnable that creates the MySQL table {@code t<ith>} and inserts all given records into it
     * with a single multi-row {@code INSERT}.
     *
     * <p>Any {@link SQLException} is rethrown wrapped in a {@link RuntimeException}, because
     * {@link Runnable#run()} cannot throw checked exceptions.
     */
    private class SyncNewTableJob implements Runnable {
        private final int ith;
        private final Statement statement;
        private final List<Tuple2<Integer, String>> records;

        SyncNewTableJob(int i, Statement statement, List<Tuple2<Integer, String>> list) {
            this.ith = i;
            this.statement = statement;
            this.records = list;
        }

        @Override
        public void run() {
            final String tableName = "t" + ith;
            try {
                createNewTable(statement, tableName);
                // Render every record as "(key, 'value')" and join them into one VALUES clause.
                final String valuesClause =
                        records.stream()
                                .map(row -> String.format("(%d, '%s')", row.f0, row.f1))
                                .collect(Collectors.joining(", "));
                statement.executeUpdate(
                        String.format("INSERT INTO %s VALUES %s", tableName, valuesClause));
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Registers the setup script {@code mysql/sync_database_setup.sql} on the MySQL container and
     * then calls {@code start()}; runs once before all tests of this class.
     */
    @BeforeAll
    public static void startContainers() {
        // The setup SQL is registered before start() is invoked.
        MYSQL_CONTAINER.withSetupSQL("mysql/sync_database_setup.sql");
        start();
    }

    /**
     * Starts a database sync for the MySQL database {@code paimon_sync_database} and runs the
     * schema-evolution scenario implemented in {@link #testSchemaEvolutionImpl(Statement)}.
     *
     * @throws Exception if the action, a SQL statement or one of the wait helpers fails
     */
    @Timeout(60)
    @Test
    public void testSchemaEvolution() throws Exception {
        Map<String, String> mySqlConfig = getBasicMySqlConfig();
        mySqlConfig.put("database-name", "paimon_sync_database");
        runActionWithDefaultEnv(
                new MySqlSyncDatabaseAction(this.warehouse, this.database, mySqlConfig)
                        .withTableConfig(getBasicTableConfig()));

        // try-with-resources closes the statement on every path and keeps a failing close()
        // as a suppressed exception instead of hiding the primary failure.
        try (Statement statement = getStatement()) {
            testSchemaEvolutionImpl(statement);
        }
    }

    /**
     * Runs the schema-evolution scenario on MySQL tables {@code t1} and {@code t2} of
     * {@code paimon_sync_database} and checks the Paimon tables of the same names after each of
     * three steps: initial inserts, {@code ADD COLUMN}, and {@code MODIFY COLUMN}.
     *
     * @param statement JDBC statement used to issue the MySQL statements
     * @throws Exception if a SQL statement or one of the wait helpers fails
     */
    private void testSchemaEvolutionImpl(Statement statement) throws Exception {
        FileStoreTable table1 = getFileStoreTable("t1");
        FileStoreTable table2 = getFileStoreTable("t2");
        List<String> table1Keys = Collections.singletonList("k");
        List<String> table2Keys = Arrays.asList("k1", "k2");

        // Step 1: plain inserts (t3 also receives a row, but it is not checked in this method).
        statement.executeUpdate("USE paimon_sync_database");
        statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two', 20, 200)");
        statement.executeUpdate("INSERT INTO t1 VALUES (3, 'three')");
        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four', 40, 400)");
        statement.executeUpdate("INSERT INTO t3 VALUES (-1)");

        RowType table1InitialType =
                RowType.of(
                        new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
                        new String[] {"k", "v1"});
        waitForResult(
                Arrays.asList("+I[1, one]", "+I[3, three]"),
                table1,
                table1InitialType,
                table1Keys);

        RowType table2InitialType =
                RowType.of(
                        new DataType[] {
                            DataTypes.INT().notNull(),
                            DataTypes.VARCHAR(10).notNull(),
                            DataTypes.INT(),
                            DataTypes.BIGINT()
                        },
                        new String[] {"k1", "k2", "v1", "v2"});
        waitForResult(
                Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]"),
                table2,
                table2InitialType,
                table2Keys);

        // Step 2: add one column to each table and insert rows that use it.
        statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v2 INT");
        statement.executeUpdate("INSERT INTO t1 VALUES (5, 'five', 50)");
        statement.executeUpdate("ALTER TABLE t2 ADD COLUMN v3 VARCHAR(10)");
        statement.executeUpdate("INSERT INTO t2 VALUES (6, 'six', 60, 600, 'string_6')");
        statement.executeUpdate("INSERT INTO t1 VALUES (7, 'seven', 70)");
        statement.executeUpdate("INSERT INTO t2 VALUES (8, 'eight', 80, 800, 'string_8')");

        RowType table1AddedColumnType =
                RowType.of(
                        new DataType[] {
                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT()
                        },
                        new String[] {"k", "v1", "v2"});
        waitForResult(
                Arrays.asList(
                        "+I[1, one, NULL]",
                        "+I[3, three, NULL]",
                        "+I[5, five, 50]",
                        "+I[7, seven, 70]"),
                table1,
                table1AddedColumnType,
                table1Keys);

        RowType table2AddedColumnType =
                RowType.of(
                        new DataType[] {
                            DataTypes.INT().notNull(),
                            DataTypes.VARCHAR(10).notNull(),
                            DataTypes.INT(),
                            DataTypes.BIGINT(),
                            DataTypes.VARCHAR(10)
                        },
                        new String[] {"k1", "k2", "v1", "v2", "v3"});
        waitForResult(
                Arrays.asList(
                        "+I[2, two, 20, 200, NULL]",
                        "+I[4, four, 40, 400, NULL]",
                        "+I[6, six, 60, 600, string_6]",
                        "+I[8, eight, 80, 800, string_8]"),
                table2,
                table2AddedColumnType,
                table2Keys);

        // Step 3: widen the added columns (INT -> BIGINT, VARCHAR(10) -> VARCHAR(20)).
        statement.executeUpdate("ALTER TABLE t1 MODIFY COLUMN v2 BIGINT");
        statement.executeUpdate("INSERT INTO t1 VALUES (9, 'nine', 9000000000000)");
        statement.executeUpdate("ALTER TABLE t2 MODIFY COLUMN v3 VARCHAR(20)");
        statement.executeUpdate(
                "INSERT INTO t2 VALUES (10, 'ten', 100, 1000, 'long_long_string_10')");

        RowType table1WidenedType =
                RowType.of(
                        new DataType[] {
                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.BIGINT()
                        },
                        new String[] {"k", "v1", "v2"});
        waitForResult(
                Arrays.asList(
                        "+I[1, one, NULL]",
                        "+I[3, three, NULL]",
                        "+I[5, five, 50]",
                        "+I[7, seven, 70]",
                        "+I[9, nine, 9000000000000]"),
                table1,
                table1WidenedType,
                table1Keys);

        RowType table2WidenedType =
                RowType.of(
                        new DataType[] {
                            DataTypes.INT().notNull(),
                            DataTypes.VARCHAR(10).notNull(),
                            DataTypes.INT(),
                            DataTypes.BIGINT(),
                            DataTypes.VARCHAR(20)
                        },
                        new String[] {"k1", "k2", "v1", "v2", "v3"});
        waitForResult(
                Arrays.asList(
                        "+I[2, two, 20, 200, NULL]",
                        "+I[4, four, 40, 400, NULL]",
                        "+I[6, six, 60, 600, string_6]",
                        "+I[8, eight, 80, 800, string_8]",
                        "+I[10, ten, 100, 1000, long_long_string_10]"),
                table2,
                table2WidenedType,
                table2Keys);
    }

    /**
     * Verifies that building a database sync action with {@code table-name} set in the MySQL
     * configuration fails with an {@link IllegalArgumentException} and the expected message.
     */
    @Test
    public void testSpecifiedMySqlTable() {
        Map<String, String> mySqlConfig = getBasicMySqlConfig();
        mySqlConfig.put("database-name", "paimon_sync_database");
        mySqlConfig.put("table-name", "my_table");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        MySqlSyncDatabaseAction action =
                new MySqlSyncDatabaseAction(this.warehouse, this.database, mySqlConfig);

        Assertions.assertThatThrownBy(() -> action.build(env))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage(
                        "table-name cannot be set for mysql-sync-database. If you want to sync several MySQL tables into one Paimon table, use mysql-sync-table instead.");
    }

    /**
     * Verifies that building a database sync action for the MySQL database name {@code invalid}
     * fails with an {@link IllegalArgumentException} and the expected message.
     */
    @Test
    public void testInvalidDatabase() {
        Map<String, String> mySqlConfig = getBasicMySqlConfig();
        mySqlConfig.put("database-name", "invalid");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        MySqlSyncDatabaseAction action =
                new MySqlSyncDatabaseAction(this.warehouse, this.database, mySqlConfig);

        Assertions.assertThatThrownBy(() -> action.build(env))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage(
                        "No tables found in MySQL database invalid, or MySQL database does not exist.");
    }

    /**
     * Pre-creates a Paimon table named {@code incompatible}, starts a sync of the MySQL database
     * {@code paimon_sync_database_ignore_incompatible} with {@code ignoreIncompatible(true)}, and
     * checks that rows inserted into the MySQL table {@code compatible} reach the Paimon table of
     * the same name.
     *
     * @throws Exception if catalog setup, the action, a SQL statement or the wait helper fails
     */
    @Timeout(60)
    @Test
    public void testIgnoreIncompatibleTables() throws Exception {
        // Paimon table that already exists before the sync starts.
        // NOTE(review): presumably its schema conflicts with the MySQL table of the same name;
        // the MySQL setup script is not visible from this file.
        Catalog catalog = catalog();
        catalog.createDatabase(this.database, true);
        catalog.createTable(
                Identifier.create(this.database, "incompatible"),
                Schema.newBuilder()
                        .column("k", DataTypes.STRING())
                        .column("v1", DataTypes.STRING())
                        .primaryKey(new String[] {"k"})
                        .build(),
                false);

        Map<String, String> mySqlConfig = getBasicMySqlConfig();
        mySqlConfig.put("database-name", "paimon_sync_database_ignore_incompatible");
        runActionWithDefaultEnv(
                new MySqlSyncDatabaseAction(this.warehouse, this.database, mySqlConfig)
                        .withTableConfig(getBasicTableConfig())
                        .ignoreIncompatible(true));

        // try-with-resources closes the statement on every path; a failure in close() is
        // attached as a suppressed exception rather than replacing the test failure.
        try (Statement statement = getStatement()) {
            FileStoreTable compatibleTable = getFileStoreTable("compatible");
            statement.executeUpdate("USE paimon_sync_database_ignore_incompatible");
            statement.executeUpdate("INSERT INTO compatible VALUES (2, 'two', 20, 200)");
            statement.executeUpdate("INSERT INTO compatible VALUES (4, 'four', 40, 400)");

            RowType rowType =
                    RowType.of(
                            new DataType[] {
                                DataTypes.INT().notNull(),
                                DataTypes.VARCHAR(10).notNull(),
                                DataTypes.INT(),
                                DataTypes.BIGINT()
                            },
                            new String[] {"k1", "k2", "v1", "v2"});
            waitForResult(
                    Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]"),
                    compatibleTable,
                    rowType,
                    Arrays.asList("k1", "k2"));
        }
    }

    /**
     * Pre-creates the Paimon table {@code test_prefix_t1_test_suffix}, starts a sync of the MySQL
     * database {@code paimon_sync_database_affix} with table prefix {@code test_prefix_} and
     * suffix {@code _test_suffix}, and runs the scenario in
     * {@link #testTableAffixImpl(Statement)}.
     *
     * @throws Exception if catalog setup, the action, a SQL statement or a wait helper fails
     */
    @Timeout(60)
    @Test
    public void testTableAffix() throws Exception {
        Catalog catalog = catalog();
        catalog.createDatabase(this.database, true);
        catalog.createTable(
                Identifier.create(this.database, "test_prefix_t1_test_suffix"),
                Schema.newBuilder()
                        .column("k1", DataTypes.INT().notNull())
                        .column("v0", DataTypes.VARCHAR(10))
                        .primaryKey(new String[] {"k1"})
                        .build(),
                false);

        Map<String, String> mySqlConfig = getBasicMySqlConfig();
        mySqlConfig.put("database-name", "paimon_sync_database_affix");
        runActionWithDefaultEnv(
                new MySqlSyncDatabaseAction(this.warehouse, this.database, mySqlConfig)
                        .withTableConfig(getBasicTableConfig())
                        .withTablePrefix("test_prefix_")
                        .withTableSuffix("_test_suffix"));

        // try-with-resources replaces the hand-expanded close/addSuppressed boilerplate.
        try (Statement statement = getStatement()) {
            testTableAffixImpl(statement);
        }
    }

    /**
     * Runs the insert / {@code ADD COLUMN} / {@code MODIFY COLUMN} scenario on MySQL tables
     * {@code t1} and {@code t2} of {@code paimon_sync_database_affix} and checks the Paimon tables
     * {@code test_prefix_t1_test_suffix} and {@code test_prefix_t2_test_suffix} after each step.
     *
     * @param statement JDBC statement used to issue the MySQL statements
     * @throws Exception if a SQL statement or one of the wait helpers fails
     */
    private void testTableAffixImpl(Statement statement) throws Exception {
        FileStoreTable table1 = getFileStoreTable("test_prefix_t1_test_suffix");
        FileStoreTable table2 = getFileStoreTable("test_prefix_t2_test_suffix");
        List<String> table1Keys = Collections.singletonList("k1");
        List<String> table2Keys = Collections.singletonList("k2");

        // Step 1: plain inserts.
        statement.executeUpdate("USE paimon_sync_database_affix");
        statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two')");
        statement.executeUpdate("INSERT INTO t1 VALUES (3, 'three')");
        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four')");

        RowType table1InitialType =
                RowType.of(
                        new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
                        new String[] {"k1", "v0"});
        waitForResult(
                Arrays.asList("+I[1, one]", "+I[3, three]"),
                table1,
                table1InitialType,
                table1Keys);

        RowType table2InitialType =
                RowType.of(
                        new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
                        new String[] {"k2", "v0"});
        waitForResult(
                Arrays.asList("+I[2, two]", "+I[4, four]"),
                table2,
                table2InitialType,
                table2Keys);

        // Step 2: add column v1 to both tables and insert rows that use it.
        statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v1 INT");
        statement.executeUpdate("INSERT INTO t1 VALUES (5, 'five', 50)");
        statement.executeUpdate("ALTER TABLE t2 ADD COLUMN v1 VARCHAR(10)");
        statement.executeUpdate("INSERT INTO t2 VALUES (6, 'six', 's_6')");
        statement.executeUpdate("INSERT INTO t1 VALUES (7, 'seven', 70)");
        statement.executeUpdate("INSERT INTO t2 VALUES (8, 'eight', 's_8')");

        RowType table1AddedColumnType =
                RowType.of(
                        new DataType[] {
                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT()
                        },
                        new String[] {"k1", "v0", "v1"});
        waitForResult(
                Arrays.asList(
                        "+I[1, one, NULL]",
                        "+I[3, three, NULL]",
                        "+I[5, five, 50]",
                        "+I[7, seven, 70]"),
                table1,
                table1AddedColumnType,
                table1Keys);

        RowType table2AddedColumnType =
                RowType.of(
                        new DataType[] {
                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.VARCHAR(10)
                        },
                        new String[] {"k2", "v0", "v1"});
        waitForResult(
                Arrays.asList(
                        "+I[2, two, NULL]",
                        "+I[4, four, NULL]",
                        "+I[6, six, s_6]",
                        "+I[8, eight, s_8]"),
                table2,
                table2AddedColumnType,
                table2Keys);

        // Step 3: widen v1 (INT -> BIGINT, VARCHAR(10) -> VARCHAR(20)).
        statement.executeUpdate("ALTER TABLE t1 MODIFY COLUMN v1 BIGINT");
        statement.executeUpdate("INSERT INTO t1 VALUES (9, 'nine', 9000000000000)");
        statement.executeUpdate("ALTER TABLE t2 MODIFY COLUMN v1 VARCHAR(20)");
        statement.executeUpdate("INSERT INTO t2 VALUES (10, 'ten', 'long_s_10')");

        RowType table1WidenedType =
                RowType.of(
                        new DataType[] {
                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.BIGINT()
                        },
                        new String[] {"k1", "v0", "v1"});
        waitForResult(
                Arrays.asList(
                        "+I[1, one, NULL]",
                        "+I[3, three, NULL]",
                        "+I[5, five, 50]",
                        "+I[7, seven, 70]",
                        "+I[9, nine, 9000000000000]"),
                table1,
                table1WidenedType,
                table1Keys);

        RowType table2WidenedType =
                RowType.of(
                        new DataType[] {
                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.VARCHAR(20)
                        },
                        new String[] {"k2", "v0", "v1"});
        waitForResult(
                Arrays.asList(
                        "+I[2, two, NULL]",
                        "+I[4, four, NULL]",
                        "+I[6, six, s_6]",
                        "+I[8, eight, s_8]",
                        "+I[10, ten, long_s_10]"),
                table2,
                table2WidenedType,
                table2Keys);
    }

    /**
     * Syncs {@code paimon_sync_database_including} with only an including pattern and checks
     * which Paimon tables exist afterwards.
     */
    @Timeout(60)
    @Test
    public void testIncludingTables() throws Exception {
        String mySqlDatabase = "paimon_sync_database_including";
        String includingPattern = "flink|paimon.+";
        List<String> expectedExisting = Arrays.asList("flink", "paimon_1", "paimon_2");
        List<String> expectedMissing = Collections.singletonList("ignored");

        includingAndExcludingTablesImpl(
                mySqlDatabase, includingPattern, null, expectedExisting, expectedMissing);
    }

    /**
     * Syncs {@code paimon_sync_database_excluding} with only an excluding pattern and checks
     * which Paimon tables exist afterwards.
     */
    @Timeout(60)
    @Test
    public void testExcludingTables() throws Exception {
        String mySqlDatabase = "paimon_sync_database_excluding";
        String excludingPattern = "flink|paimon.+";
        List<String> expectedExisting = Collections.singletonList("sync");
        List<String> expectedMissing = Arrays.asList("flink", "paimon_1", "paimon_2");

        includingAndExcludingTablesImpl(
                mySqlDatabase, null, excludingPattern, expectedExisting, expectedMissing);
    }

    /**
     * Syncs {@code paimon_sync_database_in_excluding} with both an including and an excluding
     * pattern and checks which Paimon tables exist afterwards.
     */
    @Timeout(60)
    @Test
    public void testIncludingAndExcludingTables() throws Exception {
        String mySqlDatabase = "paimon_sync_database_in_excluding";
        String includingPattern = "flink|paimon.+";
        String excludingPattern = "paimon_1";
        List<String> expectedExisting = Arrays.asList("flink", "paimon_2");
        List<String> expectedMissing = Arrays.asList("paimon_1", "test");

        includingAndExcludingTablesImpl(
                mySqlDatabase,
                includingPattern,
                excludingPattern,
                expectedExisting,
                expectedMissing);
    }

    /**
     * Starts a database sync with the given table filters and asserts which Paimon tables exist.
     *
     * @param str MySQL database name to synchronize
     * @param str2 value passed to {@code includingTables}, may be {@code null}
     * @param str3 value passed to {@code excludingTables}, may be {@code null}
     * @param list table names passed to {@code assertTableExists}
     * @param list2 table names passed to {@code assertTableNotExists}
     * @throws Exception if the action or one of the assertions fails
     */
    private void includingAndExcludingTablesImpl(String str, @Nullable String str2, @Nullable String str3, List<String> list, List<String> list2) throws Exception {
        Map<String, String> mySqlConfig = getBasicMySqlConfig();
        mySqlConfig.put("database-name", str);

        runActionWithDefaultEnv(
                new MySqlSyncDatabaseAction(this.warehouse, this.database, mySqlConfig)
                        .withTableConfig(getBasicTableConfig())
                        .includingTables(str2)
                        .excludingTables(str3));

        assertTableExists(list);
        assertTableNotExists(list2);
    }

    /**
     * Syncs the MySQL database {@code paimon_ignore_CASE} with the catalog metastore option set
     * to {@code test-case-insensitive} and checks that the Paimon table {@code t} uses lower-case
     * field names, both initially and after {@code MODIFY COLUMN} / {@code ADD COLUMN} statements
     * that use upper-case identifiers.
     *
     * @throws Exception if the action, a SQL statement or one of the wait helpers fails
     */
    @Timeout(60)
    @Test
    public void testIgnoreCase() throws Exception {
        Map<String, String> mySqlConfig = getBasicMySqlConfig();
        mySqlConfig.put("database-name", "paimon_ignore_CASE");
        runActionWithDefaultEnv(
                new MySqlSyncDatabaseAction(
                                this.warehouse,
                                this.database,
                                Collections.singletonMap(
                                        CatalogOptions.METASTORE.key(), "test-case-insensitive"),
                                mySqlConfig)
                        .withTableConfig(getBasicTableConfig()));

        FileStoreTable table = getFileStoreTable("t");
        Assertions.assertThat(JsonSerdeUtil.toFlatJson(table.schema().fields()))
                .isEqualTo(
                        "[{\"id\":0,\"name\":\"k\",\"type\":\"INT NOT NULL\",\"description\":\"\"},{\"id\":1,\"name\":\"uppercase_v0\",\"type\":\"VARCHAR(20)\",\"description\":\"\"}]");

        List<String> primaryKeys = Collections.singletonList("k");

        // try-with-resources replaces the hand-expanded close/addSuppressed boilerplate.
        try (Statement statement = getStatement()) {
            statement.executeUpdate("USE paimon_ignore_CASE");

            statement.executeUpdate("INSERT INTO T VALUES (1, 'Hi')");
            waitForResult(
                    Collections.singletonList("+I[1, Hi]"),
                    table,
                    RowType.of(
                            new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(20)},
                            new String[] {"k", "uppercase_v0"}),
                    primaryKeys);

            statement.executeUpdate("ALTER TABLE T MODIFY COLUMN UPPERCASE_V0 VARCHAR(30)");
            statement.executeUpdate("INSERT INTO T VALUES (2, 'Paimon')");
            waitForResult(
                    Arrays.asList("+I[1, Hi]", "+I[2, Paimon]"),
                    table,
                    RowType.of(
                            new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(30)},
                            new String[] {"k", "uppercase_v0"}),
                    primaryKeys);

            statement.executeUpdate("ALTER TABLE T ADD COLUMN UPPERCASE_V1 DOUBLE");
            statement.executeUpdate("INSERT INTO T VALUES (3, 'Test', 0.5)");
            waitForResult(
                    Arrays.asList("+I[1, Hi, NULL]", "+I[2, Paimon, NULL]", "+I[3, Test, 0.5]"),
                    table,
                    RowType.of(
                            new DataType[] {
                                DataTypes.INT().notNull(),
                                DataTypes.VARCHAR(30),
                                DataTypes.DOUBLE()
                            },
                            new String[] {"k", "uppercase_v0", "uppercase_v1"}),
                    primaryKeys);
        }
    }

    /** One newly added table, with a savepoint stop/restore and without a schema change. */
    @Timeout(600)
    @Test
    public void testNewlyAddedTables() throws Exception {
        final int newTableCount = 1;
        final boolean restoreFromSavepoint = true;
        final boolean withSchemaChange = false;
        testNewlyAddedTable(
                newTableCount,
                restoreFromSavepoint,
                withSchemaChange,
                "paimon_sync_database_newly_added_tables");
    }

    /** One newly added table, without savepoint restore and without a schema change. */
    @Timeout(600)
    @Test
    public void testNewlyAddedTableSingleTable() throws Exception {
        final int newTableCount = 1;
        final boolean restoreFromSavepoint = false;
        final boolean withSchemaChange = false;
        testNewlyAddedTable(
                newTableCount,
                restoreFromSavepoint,
                withSchemaChange,
                "paimon_sync_database_newly_added_tables_1");
    }

    /** Three newly added tables, without savepoint restore and without a schema change. */
    @Timeout(600)
    @Test
    public void testNewlyAddedTableMultipleTables() throws Exception {
        final int newTableCount = 3;
        final boolean restoreFromSavepoint = false;
        final boolean withSchemaChange = false;
        testNewlyAddedTable(
                newTableCount,
                restoreFromSavepoint,
                withSchemaChange,
                "paimon_sync_database_newly_added_tables_2");
    }

    /** One newly added table, without savepoint restore but with a schema change. */
    @Timeout(600)
    @Test
    public void testNewlyAddedTableSchemaChange() throws Exception {
        final int newTableCount = 1;
        final boolean restoreFromSavepoint = false;
        final boolean withSchemaChange = true;
        testNewlyAddedTable(
                newTableCount,
                restoreFromSavepoint,
                withSchemaChange,
                "paimon_sync_database_newly_added_tables_3");
    }

    /** One newly added table, with both a savepoint stop/restore and a schema change. */
    @Timeout(600)
    @Test
    public void testNewlyAddedTableSingleTableWithSavepoint() throws Exception {
        final int newTableCount = 1;
        final boolean restoreFromSavepoint = true;
        final boolean withSchemaChange = true;
        testNewlyAddedTable(
                newTableCount,
                restoreFromSavepoint,
                withSchemaChange,
                "paimon_sync_database_newly_added_tables_4");
    }

    /**
     * Syncs {@code paimon_sync_database_add_ignored_table} in {@code COMBINED} mode with
     * including pattern {@code t.+} and excluding pattern {@code .*a$}, creates several MySQL
     * tables while the job runs, and asserts which of them appear as Paimon tables.
     *
     * @throws Exception if the action, a SQL statement, an assertion or a wait helper fails
     */
    @Timeout(120)
    @Test
    public void testAddIgnoredTable() throws Exception {
        Map<String, String> mySqlConfig = getBasicMySqlConfig();
        mySqlConfig.put("database-name", "paimon_sync_database_add_ignored_table");
        runActionWithDefaultEnv(
                new MySqlSyncDatabaseAction(this.warehouse, this.database, mySqlConfig)
                        .withTableConfig(getBasicTableConfig())
                        .includingTables("t.+")
                        .excludingTables(".*a$")
                        .withMode(DatabaseSyncMode.COMBINED));

        // try-with-resources replaces the hand-expanded close/addSuppressed boilerplate.
        try (Statement statement = getStatement()) {
            FileStoreTable table1 = getFileStoreTable("t1");
            statement.executeUpdate("USE paimon_sync_database_add_ignored_table");
            statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
            statement.executeUpdate("INSERT INTO a VALUES (1, 'one')");

            RowType rowType =
                    RowType.of(
                            new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
                            new String[] {"k", "v1"});
            List<String> primaryKeys = Collections.singletonList("k");
            waitForResult(Collections.singletonList("+I[1, one]"), table1, rowType, primaryKeys);

            // Tables created while the job is running; only t2 and t22 are expected to be
            // synchronized (asserted below).
            statement.executeUpdate("CREATE TABLE t2 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
            statement.executeUpdate("INSERT INTO t2 VALUES (1, 'Hi')");
            statement.executeUpdate("CREATE TABLE t22 LIKE t2");
            statement.executeUpdate("INSERT INTO t22 VALUES (1, 'Hello')");
            statement.executeUpdate("CREATE TABLE ta (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
            statement.executeUpdate("INSERT INTO ta VALUES (1, 'Apache')");
            statement.executeUpdate("CREATE TABLE t3 (k INT, v1 VARCHAR(10))");
            statement.executeUpdate("INSERT INTO t3 VALUES (1, 'Paimon')");
            statement.executeUpdate("CREATE TABLE t4 SELECT * FROM t2");

            // Once this later row has arrived in t1, the table assertions below are made.
            statement.executeUpdate("INSERT INTO t1 VALUES (2, 'two')");
            waitForResult(
                    Arrays.asList("+I[1, one]", "+I[2, two]"), table1, rowType, primaryKeys);

            assertTableExists(Arrays.asList("t1", "t2", "t22"));
            assertTableNotExists(Arrays.asList("a", "ta", "t3", "t4"));
            waitForResult(
                    Collections.singletonList("+I[1, Hi]"),
                    getFileStoreTable("t2"),
                    rowType,
                    primaryKeys);
            waitForResult(
                    Collections.singletonList("+I[1, Hello]"),
                    getFileStoreTable("t22"),
                    rowType,
                    primaryKeys);
        }
    }

    /**
     * Starts the sync job for the given MySQL database, waits until it is running, and runs the
     * newly-added-table scenario with an open JDBC statement.
     *
     * @param i number of new tables to create during the scenario
     * @param z whether the job is stopped with a savepoint and restored during the scenario
     * @param z2 whether a schema change on a new table is part of the scenario
     * @param str MySQL database name to synchronize
     * @throws Exception if the job, a SQL statement or one of the wait helpers fails
     */
    public void testNewlyAddedTable(int i, boolean z, boolean z2, String str) throws Exception {
        JobClient jobClient = buildSyncDatabaseActionWithNewlyAddedTables(str, z2);
        waitJobRunning(jobClient);

        // try-with-resources replaces the hand-expanded close/addSuppressed boilerplate.
        try (Statement statement = getStatement()) {
            testNewlyAddedTableImpl(jobClient, statement, i, z, z2, str);
        }
    }

    /**
     * Scenario for tables that are created in MySQL while the sync job is already running.
     *
     * <p>Steps: insert into the pre-existing tables {@code t1}/{@code t2}; create the first new
     * table and fill it; optionally stop the job with a savepoint and restart from it; create the
     * remaining new tables one by one; append a row to a randomly chosen new table; optionally
     * add a column to a randomly chosen new table and check the table option
     * {@code alter-table-test}.
     *
     * @param jobClient client of the running sync job, used only for the savepoint stop
     * @param statement JDBC statement used to issue the MySQL statements
     * @param i number of new tables to create; passed to {@code nextInt}, so it must be positive
     * @param z whether to stop the job with a savepoint and restore from it
     * @param z2 whether to run the {@code ADD COLUMN} part of the scenario
     * @param str MySQL database name being synchronized
     * @throws Exception if a SQL statement, the savepoint or one of the wait helpers fails
     */
    private void testNewlyAddedTableImpl(JobClient jobClient, Statement statement, int i, boolean z, boolean z2, String str) throws Exception {
        FileStoreTable table1 = getFileStoreTable("t1");
        FileStoreTable table2 = getFileStoreTable("t2");

        statement.executeUpdate("USE " + str);
        statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two', 20, 200)");
        statement.executeUpdate("INSERT INTO t1 VALUES (3, 'three')");
        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four', 40, 400)");

        waitForResult(
                Arrays.asList("+I[1, one]", "+I[3, three]"),
                table1,
                RowType.of(
                        new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
                        new String[] {"k", "v1"}),
                Collections.singletonList("k"));

        RowType table2Type =
                RowType.of(
                        new DataType[] {
                            DataTypes.INT().notNull(),
                            DataTypes.VARCHAR(10).notNull(),
                            DataTypes.INT(),
                            DataTypes.BIGINT()
                        },
                        new String[] {"k1", "k2", "v1", "v2"});
        List<String> table2Keys = Arrays.asList("k1", "k2");
        waitForResult(
                Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]"),
                table2,
                table2Type,
                table2Keys);

        // Records inserted so far, per new table name; used to build the expected results.
        Map<String, List<Tuple2<Integer, String>>> recordsByTable = new HashMap<>();
        List<String> newTableKeys = Collections.singletonList("k");
        RowType newTableType =
                RowType.of(
                        new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
                        new String[] {"k", "v1"});

        // First new table, plus one more row for the already-synchronized t2.
        String firstNewTable = getNewTableName(0);
        createNewTable(statement, firstNewTable);
        statement.executeUpdate(
                String.format("INSERT INTO `%s`.`t2` VALUES (8, 'eight', 80, 800)", str));
        List<Tuple2<Integer, String>> firstRecords = getNewTableRecords();
        recordsByTable.put(firstNewTable, firstRecords);
        List<String> firstExpected = getNewTableExpected(firstRecords);
        insertRecordsIntoNewTable(statement, str, firstNewTable, firstRecords);

        if (z) {
            // Stop the running job with a savepoint and start a new job restored from it.
            String savepointPath =
                    jobClient
                            .stopWithSavepoint(
                                    false,
                                    this.tempDir.toUri().toString(),
                                    SavepointFormatType.CANONICAL)
                            .join();
            Assertions.assertThat(savepointPath).isNotBlank();
            waitJobRunning(buildSyncDatabaseActionWithNewlyAddedTables(savepointPath, str, z2));
        }

        waitForResult(
                Arrays.asList(
                        "+I[2, two, 20, 200]", "+I[4, four, 40, 400]", "+I[8, eight, 80, 800]"),
                table2,
                table2Type,
                table2Keys);
        waitForResult(
                firstExpected, getFileStoreTable(firstNewTable), newTableType, newTableKeys);

        // Remaining new tables, created and verified one at a time.
        for (int tableIndex = 1; tableIndex < i; tableIndex++) {
            String tableName = getNewTableName(tableIndex);
            createNewTable(statement, tableName);
            // NOTE(review): presumably gives the job time to discover the table before data
            // is written — confirm.
            Thread.sleep(5000L);
            List<Tuple2<Integer, String>> records = getNewTableRecords();
            recordsByTable.put(tableName, records);
            insertRecordsIntoNewTable(statement, str, tableName, records);
            waitForResult(
                    getNewTableExpected(records),
                    getFileStoreTable(tableName),
                    newTableType,
                    newTableKeys);
        }

        // Append one more row to a randomly chosen new table.
        ThreadLocalRandom random = ThreadLocalRandom.current();
        String appendedTable = getNewTableName(random.nextInt(i));
        List<Tuple2<Integer, String>> appendedRecords = recordsByTable.get(appendedTable);
        appendedRecords.add(Tuple2.of(80, "eighty"));
        FileStoreTable appendedPaimonTable = getFileStoreTable(appendedTable);
        List<String> appendedExpected = getNewTableExpected(appendedRecords);
        statement.executeUpdate(
                String.format("INSERT INTO `%s`.`%s` VALUES (80, 'eighty')", str, appendedTable));
        waitForResult(appendedExpected, appendedPaimonTable, newTableType, newTableKeys);

        if (z2) {
            // Add a column to a randomly chosen new table; rows already present read NULL for it.
            String alteredTable = getNewTableName(random.nextInt(i));
            List<Tuple2<Integer, String>> alteredRecords = recordsByTable.get(alteredTable);
            statement.executeUpdate(
                    String.format("ALTER TABLE `%s`.`%s` ADD COLUMN v2 INT", str, alteredTable));
            statement.executeUpdate(
                    String.format(
                            "INSERT INTO `%s`.`%s` VALUES (100, 'hundred', 10000)",
                            str, alteredTable));

            // Collected into an ArrayList because the list is extended right below.
            List<String> alteredExpected =
                    alteredRecords.stream()
                            .map(row -> String.format("+I[%d, %s, NULL]", row.f0, row.f1))
                            .collect(Collectors.toCollection(ArrayList::new));
            alteredExpected.add("+I[100, hundred, 10000]");

            waitForResult(
                    alteredExpected,
                    getFileStoreTable(alteredTable),
                    RowType.of(
                            new DataType[] {
                                DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT()
                            },
                            new String[] {"k", "v1", "v2"}),
                    newTableKeys);
            Assertions.assertThat(getFileStoreTable(alteredTable).options())
                    .containsEntry("alter-table-test", "true");
        }
    }

    /**
     * Formats each record as an expected result row of the form {@code +I[<key>, <value>]}.
     *
     * @param list records to format
     * @return one formatted string per record, in the same order
     */
    private List<String> getNewTableExpected(List<Tuple2<Integer, String>> list) {
        return list.stream()
                .map(row -> String.format("+I[%d, %s]", row.f0, row.f1))
                .collect(Collectors.toList());
    }

    /**
     * Generates between 1 and 10 records (count chosen at random) with keys {@code 0..n-1} and
     * values {@code varchar_<key>}.
     *
     * @return a new mutable list of the generated records
     */
    private List<Tuple2<Integer, String>> getNewTableRecords() {
        int recordCount = ThreadLocalRandom.current().nextInt(10) + 1;
        // Typed list instead of a raw LinkedList, so no unchecked conversion happens on return.
        List<Tuple2<Integer, String>> records = new ArrayList<>(recordCount);
        for (int key = 0; key < recordCount; key++) {
            records.add(Tuple2.of(key, "varchar_" + key));
        }
        return records;
    }

    /**
     * Inserts all given records into a table with a single multi-row INSERT statement.
     *
     * @param statement JDBC statement used to run the INSERT
     * @param str name of the database containing the table
     * @param str2 name of the target table
     * @param list records to insert; each becomes one {@code (f0, 'f1')} value tuple
     * @throws SQLException if executing the INSERT fails
     */
    private void insertRecordsIntoNewTable(Statement statement, String str, String str2, List<Tuple2<Integer, String>> list) throws SQLException {
        String valueTuples = list.stream()
                .map(row -> String.format("(%d, '%s')", row.f0, row.f1))
                .collect(Collectors.joining(", "));
        String insertSql = String.format("INSERT INTO `%s`.`%s` VALUES %s", str, str2, valueTuples);
        statement.executeUpdate(insertSql);
    }

    /**
     * Builds the name of the i-th newly added table.
     *
     * @param i index of the table
     * @return {@code "t_new_table_"} followed by the index
     */
    private String getNewTableName(int i) {
        StringBuilder tableName = new StringBuilder("t_new_table_");
        tableName.append(i);
        return tableName.toString();
    }

    /**
     * Creates a table with an INT primary key {@code k} and a {@code VARCHAR(10)} column {@code v1}.
     *
     * @param statement JDBC statement used to run the DDL
     * @param str table name, inserted into the DDL as given (no quoting is applied)
     * @throws SQLException if executing the CREATE TABLE fails
     */
    public void createNewTable(Statement statement, String str) throws SQLException {
        String createTableSql = String.format("CREATE TABLE %s (k INT, v1 VARCHAR(10), PRIMARY KEY (k))", str);
        statement.executeUpdate(createTableSql);
    }

    /**
     * Builds and submits the sync job without a savepoint path.
     *
     * @param str value for the MySQL {@code database-name} option
     * @param z whether the catalog is configured with the {@code test-alter-table} metastore
     * @return client of the asynchronously submitted job
     * @throws Exception if building or submitting the job fails
     */
    private JobClient buildSyncDatabaseActionWithNewlyAddedTables(String str, boolean z) throws Exception {
        // A null savepoint path makes the three-argument overload submit the job directly.
        final String savepointPath = null;
        return buildSyncDatabaseActionWithNewlyAddedTables(savepointPath, str, z);
    }

    /**
     * Builds a combined-mode sync job for tables matching {@code t.+} and submits it asynchronously.
     *
     * @param str savepoint path to restore from, or {@code null} to submit without one
     * @param str2 value for the MySQL {@code database-name} option
     * @param z when {@code true}, the catalog metastore option is set to {@code test-alter-table};
     *     otherwise no catalog options are passed
     * @return client of the asynchronously submitted job
     * @throws Exception if building or submitting the job fails
     */
    private JobClient buildSyncDatabaseActionWithNewlyAddedTables(String str, String str2, boolean z) throws Exception {
        Map<String, String> mySqlConfig = getBasicMySqlConfig();
        mySqlConfig.put("database-name", str2);
        mySqlConfig.put("scan.incremental.snapshot.chunk.size", "1");

        Map<String, String> catalogConfig;
        if (z) {
            catalogConfig = Collections.singletonMap(CatalogOptions.METASTORE.key(), "test-alter-table");
        } else {
            catalogConfig = Collections.emptyMap();
        }

        MySqlSyncDatabaseAction action =
                new MySqlSyncDatabaseAction(this.warehouse, this.database, catalogConfig, mySqlConfig)
                        .withTableConfig(getBasicTableConfig())
                        .includingTables("t.+")
                        .withMode(DatabaseSyncMode.COMBINED);

        StreamExecutionEnvironment env = getBasicEnv();
        action.build(env);

        if (str != null) {
            StreamGraph streamGraph = env.getStreamGraph();
            // NOTE(review): the restore settings are applied to the object returned by getJobGraph();
            // confirm that this is the same job graph that executeAsync(streamGraph) ends up running.
            streamGraph.getJobGraph().setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str, true));
            return env.executeAsync(streamGraph);
        }
        return env.executeAsync();
    }

    /**
     * Syncs 100 tables created while the job is running, using a 4 mb write buffer and sink
     * parallelism 1, and checks that every table ends up with the same 100 records.
     *
     * <p>The tables are created concurrently: one {@code SyncNewTableJob} thread is started per
     * table, all sharing the same JDBC statement and the same record list.
     *
     * @throws Exception if the job, the JDBC calls or the result checks fail
     */
    @Timeout(240)
    @Test
    public void testSyncManyTableWithLimitedMemory() throws Exception {
        List<Tuple2<Integer, String>> records = new ArrayList<>();
        List<String> expectedRows = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            records.add(Tuple2.of(i, "string_" + i));
            expectedRows.add(String.format("+I[%d, %s]", i, "string_" + i));
        }

        Map<String, String> mySqlConfig = getBasicMySqlConfig();
        mySqlConfig.put("database-name", "many_table_sync_test");
        mySqlConfig.put("scan.incremental.snapshot.chunk.size", "1");
        Map<String, String> tableConfig = getBasicTableConfig();
        tableConfig.put("sink.parallelism", "1");
        tableConfig.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "4 mb");
        runActionWithDefaultEnv(
                new MySqlSyncDatabaseAction(this.warehouse, this.database, mySqlConfig)
                        .withTableConfig(tableConfig)
                        .withMode(DatabaseSyncMode.COMBINED));

        // try-with-resources replaces the hand-expanded close/suppress logic and its dead branches.
        try (Statement statement = getStatement()) {
            statement.executeUpdate("USE many_table_sync_test");
            // NOTE(review): presumably gives the job time to start before tables are added; confirm.
            Thread.sleep(2000L);

            List<String> tableNames = new ArrayList<>();
            // NOTE(review): table "a" is not created here; presumably it exists in the test setup.
            tableNames.add("a");
            for (int i = 0; i < 100; i++) {
                tableNames.add("t" + i);
                new Thread(new SyncNewTableJob(i, statement, records)).start();
            }
            waitingTables(tableNames);

            RowType rowType =
                    RowType.of(
                            new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
                            new String[] {"k", "v1"});
            List<String> primaryKeys = Collections.singletonList("k");
            for (int i = 0; i < 100; i++) {
                waitForResult(expectedRows, getFileStoreTable("t" + i), rowType, primaryKeys);
            }
        }
    }

    /**
     * Syncs same-named tables from {@code database_shard_1} and {@code database_shard_2} and checks
     * the resulting tables {@code t1}, {@code t2}, {@code t3} and, in combined mode, {@code t4}.
     *
     * <p>Two things are chosen at random per run: whether {@code database-name} is the regex
     * {@code database_shard_.*} or the alternation {@code database_shard_1|database_shard_2}, and
     * whether the sync mode is DIVIDED or COMBINED. Tables created after the job has started are
     * only checked in COMBINED mode.
     *
     * @throws Exception if the job, the JDBC calls or the result checks fail
     */
    @Timeout(60)
    @Test
    public void testSyncMultipleShards() throws Exception {
        Map<String, String> mySqlConfig = getBasicMySqlConfig();
        mySqlConfig.put(
                "database-name",
                ThreadLocalRandom.current().nextBoolean()
                        ? "database_shard_.*"
                        : "database_shard_1|database_shard_2");
        DatabaseSyncMode mode =
                ThreadLocalRandom.current().nextBoolean()
                        ? DatabaseSyncMode.DIVIDED
                        : DatabaseSyncMode.COMBINED;
        runActionWithDefaultEnv(
                new MySqlSyncDatabaseAction(this.warehouse, this.database, mySqlConfig)
                        .withTableConfig(getBasicTableConfig())
                        .withMode(mode));

        // try-with-resources replaces the hand-expanded close/suppress logic and its dead branches.
        try (Statement statement = getStatement()) {
            // t1: rows from both shards land in one table; shard 1 rows have no v2 value.
            statement.executeUpdate("INSERT INTO database_shard_1.t1 VALUES (1, 'db1_1')");
            statement.executeUpdate("INSERT INTO database_shard_1.t1 VALUES (2, 'db1_2')");
            statement.executeUpdate("INSERT INTO database_shard_2.t1 VALUES (3, 'db2_3', 300)");
            statement.executeUpdate("INSERT INTO database_shard_2.t1 VALUES (4, 'db2_4', 400)");
            waitForResult(
                    Arrays.asList(
                            "+I[1, db1_1, NULL]",
                            "+I[2, db1_2, NULL]",
                            "+I[3, db2_3, 300]",
                            "+I[4, db2_4, 400]"),
                    getFileStoreTable("t1"),
                    RowType.of(
                            new DataType[] {
                                DataTypes.INT().notNull(), DataTypes.VARCHAR(20), DataTypes.BIGINT()
                            },
                            new String[] {"k", "v1", "v2"}),
                    Collections.singletonList("k"));

            // t2: each shard adds a different column; the synced table is expected to carry both.
            statement.executeUpdate("ALTER TABLE database_shard_1.t2 ADD COLUMN v2 INT");
            statement.executeUpdate("ALTER TABLE database_shard_2.t2 ADD COLUMN v3 VARCHAR(10)");
            statement.executeUpdate("INSERT INTO database_shard_1.t2 VALUES (1, 1.1, 1)");
            statement.executeUpdate("INSERT INTO database_shard_1.t2 VALUES (2, 2.2, 2)");
            statement.executeUpdate("INSERT INTO database_shard_2.t2 VALUES (3, 3.3, 'db2_3')");
            statement.executeUpdate("INSERT INTO database_shard_2.t2 VALUES (4, 4.4, 'db2_4')");
            waitForResult(
                    Arrays.asList(
                            "+I[1, 1.1, 1, NULL]",
                            "+I[2, 2.2, 2, NULL]",
                            "+I[3, 3.3, NULL, db2_3]",
                            "+I[4, 4.4, NULL, db2_4]"),
                    getFileStoreTable("t2"),
                    RowType.of(
                            new DataType[] {
                                DataTypes.BIGINT().notNull(),
                                DataTypes.DOUBLE(),
                                DataTypes.INT(),
                                DataTypes.VARCHAR(10)
                            },
                            new String[] {"k", "v1", "v2", "v3"}),
                    Collections.singletonList("k"));

            // t3: only the shard 1 rows are expected in the result.
            // NOTE(review): presumably database_shard_2.t3 is not merged into t3; the reason is
            // not visible in this method — confirm against the test database setup.
            statement.executeUpdate("INSERT INTO database_shard_2.t3 VALUES (1, 'db2_1'), (2, 'db2_2')");
            statement.executeUpdate("INSERT INTO database_shard_1.t3 VALUES (3, 'db1_3'), (4, 'db1_4')");
            waitForResult(
                    Arrays.asList("+I[3, db1_3]", "+I[4, db1_4]"),
                    getFileStoreTable("t3"),
                    RowType.of(
                            new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
                            new String[] {"k", "v1"}),
                    Collections.singletonList("k"));

            if (mode == DatabaseSyncMode.COMBINED) {
                // t4 is created in both shards after the job has started.
                statement.executeUpdate("CREATE TABLE database_shard_1.t4 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
                statement.executeUpdate("INSERT INTO database_shard_1.t4 VALUES (1, 'db1_1')");
                statement.executeUpdate("CREATE TABLE database_shard_2.t4 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
                statement.executeUpdate("INSERT INTO database_shard_2.t4 VALUES (2, 'db2_2')");
                waitingTables("t4");
                waitForResult(
                        Arrays.asList("+I[1, db1_1]", "+I[2, db2_2]"),
                        getFileStoreTable("t4"),
                        RowType.of(
                                new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
                                new String[] {"k", "v1"}),
                        Collections.singletonList("k"));
            }
        }
    }

    /**
     * Syncs databases matching {@code without_merging_shard_.*} with shard merging disabled and
     * checks that each source table gets its own target table named {@code <database>_<table>}.
     *
     * <p>The sync mode is chosen at random between DIVIDED and COMBINED. Tables created after the
     * job has started are only checked in COMBINED mode.
     *
     * @throws Exception if the job, the JDBC calls or the result checks fail
     */
    @Timeout(60)
    @Test
    public void testSyncMultipleShardsWithoutMerging() throws Exception {
        Map<String, String> mySqlConfig = getBasicMySqlConfig();
        mySqlConfig.put("database-name", "without_merging_shard_.*");
        DatabaseSyncMode mode =
                ThreadLocalRandom.current().nextBoolean()
                        ? DatabaseSyncMode.DIVIDED
                        : DatabaseSyncMode.COMBINED;
        runActionWithDefaultEnv(
                new MySqlSyncDatabaseAction(this.warehouse, this.database, mySqlConfig)
                        .withTableConfig(getBasicTableConfig())
                        .mergeShards(false)
                        .withMode(mode));

        // try-with-resources replaces the hand-expanded close logic, whose suppression branches
        // were unreachable and would have dereferenced a null Throwable.
        try (Statement statement = getStatement()) {
            // NOTE(review): presumably waits for the job to create the initial tables; confirm.
            Thread.sleep(5000L);
            Assertions.assertThat(catalog().listTables(this.database))
                    .containsExactlyInAnyOrder(
                            "without_merging_shard_1_t1",
                            "without_merging_shard_1_t2",
                            "without_merging_shard_2_t1");

            statement.executeUpdate("INSERT INTO without_merging_shard_1.t1 VALUES (1, 'db1_1'), (2, 'db1_2')");
            waitForResult(
                    Arrays.asList("+I[1, db1_1]", "+I[2, db1_2]"),
                    getFileStoreTable("without_merging_shard_1_t1"),
                    RowType.of(
                            new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
                            new String[] {"k", "v1"}),
                    Collections.singletonList("k"));

            statement.executeUpdate("INSERT INTO without_merging_shard_2.t1 VALUES (3, 'db2_3', 300), (4, 'db2_4', 400)");
            waitForResult(
                    Arrays.asList("+I[3, db2_3, 300]", "+I[4, db2_4, 400]"),
                    getFileStoreTable("without_merging_shard_2_t1"),
                    RowType.of(
                            new DataType[] {
                                DataTypes.INT().notNull(), DataTypes.VARCHAR(20), DataTypes.BIGINT()
                            },
                            new String[] {"k", "v1", "v2"}),
                    Collections.singletonList("k"));

            statement.executeUpdate("ALTER TABLE without_merging_shard_1.t2 ADD COLUMN v2 DOUBLE");
            statement.executeUpdate("INSERT INTO without_merging_shard_1.t2 VALUES (1, 'Apache', 1.1)");
            statement.executeUpdate("INSERT INTO without_merging_shard_1.t2 VALUES (2, 'Paimon', 2.2)");
            waitForResult(
                    Arrays.asList("+I[1, Apache, 1.1]", "+I[2, Paimon, 2.2]"),
                    getFileStoreTable("without_merging_shard_1_t2"),
                    RowType.of(
                            new DataType[] {
                                DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.DOUBLE()
                            },
                            new String[] {"k", "v1", "v2"}),
                    Collections.singletonList("k"));

            if (mode == DatabaseSyncMode.COMBINED) {
                // t3 is created in both shards after the job has started.
                statement.executeUpdate("CREATE TABLE without_merging_shard_1.t3 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
                statement.executeUpdate("INSERT INTO without_merging_shard_1.t3 VALUES (1, 'test')");
                statement.executeUpdate("CREATE TABLE without_merging_shard_2.t3 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
                statement.executeUpdate("INSERT INTO without_merging_shard_2.t3 VALUES (2, 'test')");
                waitingTables("without_merging_shard_1_t3", "without_merging_shard_2_t3");

                FileStoreTable shard1Table = getFileStoreTable("without_merging_shard_1_t3");
                RowType rowType =
                        RowType.of(
                                new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
                                new String[] {"k", "v1"});
                waitForResult(
                        Collections.singletonList("+I[1, test]"),
                        shard1Table,
                        rowType,
                        Collections.singletonList("k"));
                waitForResult(
                        Collections.singletonList("+I[2, test]"),
                        getFileStoreTable("without_merging_shard_2_t3"),
                        rowType,
                        Collections.singletonList("k"));
            }
        }
    }

    /**
     * Checks which source tables a combined-mode action monitors and which it excludes when
     * {@code ignoreIncompatible} is enabled and a table {@code t2} with STRING columns {@code k}
     * and {@code v1} already exists in the target database.
     *
     * <p>The action is only built, not executed; the assertions read the monitored and excluded
     * table sets from the built action.
     *
     * @throws Exception if catalog setup or building the action fails
     */
    @Test
    public void testMonitoredAndExcludedTablesWithMering() throws Exception {
        Catalog catalog = catalog();
        catalog.createDatabase(this.database, true);
        Identifier existingTable = Identifier.create(this.database, "t2");
        Schema existingSchema =
                Schema.newBuilder()
                        .column("k", DataTypes.STRING())
                        .column("v1", DataTypes.STRING())
                        .primaryKey(new String[] {"k"})
                        .build();
        catalog.createTable(existingTable, existingSchema, false);

        Map<String, String> mySqlConfig = getBasicMySqlConfig();
        mySqlConfig.put("database-name", "monitored_and_excluded_shard_.*");
        MySqlSyncDatabaseAction action =
                new MySqlSyncDatabaseAction(this.warehouse, this.database, mySqlConfig)
                        .ignoreIncompatible(true)
                        .withMode(DatabaseSyncMode.COMBINED);
        action.build(StreamExecutionEnvironment.getExecutionEnvironment());

        Identifier[] expectedMonitored = {
            Identifier.create("monitored_and_excluded_shard_1", "t1"),
            Identifier.create("monitored_and_excluded_shard_1", "t3"),
            Identifier.create("monitored_and_excluded_shard_2", "t1")
        };
        Assertions.assertThat(action.monitoredTables()).containsOnly(expectedMonitored);

        Identifier[] expectedExcluded = {
            Identifier.create("monitored_and_excluded_shard_1", "t2"),
            Identifier.create("monitored_and_excluded_shard_2", "t2"),
            Identifier.create("monitored_and_excluded_shard_2", "t3")
        };
        Assertions.assertThat(action.excludedTables()).containsOnly(expectedExcluded);
    }

    /**
     * Asserts that every named table exists in this test's database.
     *
     * @param list names of the tables that must exist
     */
    private void assertTableExists(List<String> list) {
        Catalog catalog = catalog();
        for (String tableName : list) {
            Identifier identifier = Identifier.create(this.database, tableName);
            Assertions.assertThat(catalog.tableExists(identifier)).isTrue();
        }
    }

    /**
     * Asserts that none of the named tables exists in this test's database.
     *
     * @param list names of the tables that must be absent
     */
    private void assertTableNotExists(List<String> list) {
        Catalog catalog = catalog();
        for (String tableName : list) {
            Identifier identifier = Identifier.create(this.database, tableName);
            Assertions.assertThat(catalog.tableExists(identifier)).isFalse();
        }
    }
}
