package org.apache.paimon.flink.action.cdc.mongodb;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.class */
/**
 * Integration tests for {@link MongoDBSyncTableAction}.
 *
 * <p>Each test seeds a MongoDB database through the inherited {@code createRecordsToMongoDB}
 * helper, builds and asynchronously executes a sync-table job against the {@code products}
 * collection, and then checks the rows reported by the inherited {@code waitForResult} helper.
 */
public class MongoDBSyncTableActionITCase extends MongoDBActionITCaseBase {

    /** Runs the schema-evolution scenario starting from the {@code inventory-1} data set. */
    @Test
    @Timeout(60)
    public void testSchemaEvolution() throws Exception {
        runSingleTableSchemaEvolution("inventory-1");
    }

    /**
     * Seeds MongoDB, launches the sync job for the {@code products} collection and then runs
     * the schema-evolution assertions.
     *
     * @param fixtureName first argument forwarded to {@code createRecordsToMongoDB}
     *     (presumably the name of the initial data set -- confirm against the base class)
     */
    private void runSingleTableSchemaEvolution(String fixtureName) throws Exception {
        String mongoDatabase = createRecordsToMongoDB(fixtureName, "table");

        Map<String, String> mongoConfig = getBasicMongoDBConfig();
        mongoConfig.put("database", mongoDatabase);
        mongoConfig.put("collection", "products");

        launchSyncTableJob(mongoConfig);
        testSchemaEvolutionImpl(mongoDatabase);
    }

    /**
     * Checks the table contents after the initial load and after each of the two follow-up
     * writes ({@code inventory-2}, then {@code inventory-3}); the last check expects three
     * additional columns.
     *
     * @param mongoDatabase value returned by {@code createRecordsToMongoDB}; it is passed on to
     *     {@code writeRecordsToMongoDB} for the follow-up writes
     */
    private void testSchemaEvolutionImpl(String mongoDatabase) throws Exception {
        waitTablesCreated(tableName);
        FileStoreTable table = getFileStoreTable(tableName);
        List<String> primaryKeys = Collections.singletonList("_id");

        // Initial load: four string columns, rows reported as inserts.
        List<String> initialRows =
                Arrays.asList(
                        "+I[100000000000000000000101, scooter, Small 2-wheel scooter, 3.14]",
                        "+I[100000000000000000000102, car battery, 12V car battery, 8.1]",
                        "+I[100000000000000000000103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]");
        waitForResult(
                initialRows,
                table,
                stringRowTypeWithKey("_id", "name", "description", "weight"),
                primaryKeys);

        // Second write: same columns, rows reported as updates.
        writeRecordsToMongoDB("inventory-2", mongoDatabase, "table");
        List<String> updatedRows =
                Arrays.asList(
                        "+U[100000000000000000000101, scooter, Small 2-wheel scooter, 350]",
                        "+U[100000000000000000000102, car battery, High-performance car battery, 8.1]",
                        "+U[100000000000000000000103, 12-pack drill bits, Set of 12 professional-grade drill bits, 0.8]");
        waitForResult(
                updatedRows,
                table,
                stringRowTypeWithKey("_id", "name", "description", "weight"),
                primaryKeys);

        // Third write: the expected row type gains hobby, age and address.
        writeRecordsToMongoDB("inventory-3", mongoDatabase, "table");
        List<String> evolvedRows =
                Arrays.asList(
                        "+U[100000000000000000000102, car battery, High-performance car battery, 8.1, NULL, 18, NULL]",
                        "+U[100000000000000000000103, 12-pack drill bits, Set of 12 professional-grade drill bits, 0.8, NULL, NULL, I live in Sanlitun]",
                        "+U[100000000000000000000101, scooter, Small 2-wheel scooter, 350, playing computer games, NULL, NULL]");
        waitForResult(
                evolvedRows,
                table,
                stringRowTypeWithKey(
                        "_id", "name", "description", "weight", "hobby", "age", "address"),
                primaryKeys);
    }

    /**
     * Syncs the {@code products} collection with {@code schema.start.mode} set to
     * {@code specified} and only {@code _id}, {@code name} and {@code description} configured,
     * then expects exactly those three columns in the resulting table.
     */
    @Test
    @Timeout(60)
    public void testSpecifiedMode() throws Exception {
        String mongoDatabase = createRecordsToMongoDB("inventory-1", "table");

        Map<String, String> mongoConfig = getBasicMongoDBConfig();
        mongoConfig.put("database", mongoDatabase);
        mongoConfig.put("collection", "products");
        mongoConfig.put("field.name", "_id,name,description");
        mongoConfig.put("parser.path", "_id,name,description");
        mongoConfig.put("schema.start.mode", "specified");

        launchSyncTableJob(mongoConfig);
        waitTablesCreated(tableName);

        List<String> expectedRows =
                Arrays.asList(
                        "+I[100000000000000000000101, scooter, Small 2-wheel scooter]",
                        "+I[100000000000000000000102, car battery, 12V car battery]",
                        "+I[100000000000000000000103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3]");
        waitForResult(
                expectedRows,
                getFileStoreTable(tableName),
                stringRowTypeWithKey("_id", "name", "description"),
                Collections.singletonList("_id"));
    }

    /**
     * Builds a {@link MongoDBSyncTableAction} from the given MongoDB configuration on a fresh
     * execution environment (parallelism 1, checkpointing every 1000 ms, no restart), submits
     * it asynchronously and hands the job client to {@code waitJobRunning}.
     *
     * @param mongoConfig MongoDB connector options passed to the action
     */
    private void launchSyncTableJob(Map<String, String> mongoConfig) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(1000L);
        env.setRestartStrategy(RestartStrategies.noRestart());

        MongoDBSyncTableAction action =
                new MongoDBSyncTableAction(
                        mongoConfig,
                        warehouse,
                        database,
                        tableName,
                        Collections.emptyList(),
                        Collections.emptyMap(),
                        getBasicTableConfig());
        action.build(env);

        waitJobRunning(env.executeAsync());
    }

    /**
     * Creates a row type in which every field is a string; the first field is additionally
     * marked not-null.
     *
     * @param fieldNames field names in column order; must contain at least one name
     * @return row type with one string field per name
     */
    private static RowType stringRowTypeWithKey(String... fieldNames) {
        DataType[] fieldTypes = new DataType[fieldNames.length];
        fieldTypes[0] = DataTypes.STRING().notNull();
        for (int i = 1; i < fieldTypes.length; i++) {
            fieldTypes[i] = DataTypes.STRING();
        }
        return RowType.of(fieldTypes, fieldNames);
    }
}
