package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/PartialUpdateITCase.class */
public class PartialUpdateITCase extends CatalogITCaseBase {
    @Override // org.apache.paimon.flink.CatalogITCaseBase
    protected List<String> ddl() {
        return Arrays.asList("CREATE TABLE IF NOT EXISTS T (j INT, k INT, a INT, b INT, c STRING, PRIMARY KEY (j,k) NOT ENFORCED) WITH ('merge-engine'='partial-update');", "CREATE TABLE IF NOT EXISTS dwd_orders (OrderID INT, OrderNumber INT, PersonID INT, LastName STRING, FirstName STRING, Age INT, PRIMARY KEY (OrderID) NOT ENFORCED) WITH ('merge-engine'='partial-update', 'partial-update.ignore-delete'='true');", "CREATE TABLE IF NOT EXISTS ods_orders (OrderID INT, OrderNumber INT, PersonID INT, PRIMARY KEY (OrderID) NOT ENFORCED) WITH ('changelog-producer'='input', 'continuous.discovery-interval'='1s');", "CREATE TABLE IF NOT EXISTS dim_persons (PersonID INT, LastName STRING, FirstName STRING, Age INT, PRIMARY KEY (PersonID) NOT ENFORCED) WITH ('changelog-producer'='input', 'continuous.discovery-interval'='1s');");
    }

    @Test
    public void testMergeInMemory() {
        batchSql("INSERT INTO T VALUES (1, 2, 3, CAST(NULL AS INT), '5'), (1, 2, CAST(NULL AS INT), 6, CAST(NULL AS STRING))", new Object[0]);
        Assertions.assertThat(batchSql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, 2, 3, 6, "5"})});
    }

    @Test
    public void testMergeRead() {
        batchSql("INSERT INTO T VALUES (1, 2, 3, CAST(NULL AS INT), CAST(NULL AS STRING))", new Object[0]);
        batchSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS STRING))", new Object[0]);
        batchSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT), '6')", new Object[0]);
        Assertions.assertThat(batchSql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, 2, 4, 5, "6"})});
        Assertions.assertThat(batchSql("SELECT a FROM T", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{4})});
    }

    @Test
    public void testMergeCompaction() {
        batchSql("ALTER TABLE T SET ('commit.force-compact'='true')", new Object[0]);
        batchSql("INSERT INTO T VALUES (1, 2, 3, CAST(NULL AS INT), CAST(NULL AS STRING))", new Object[0]);
        batchSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS STRING))", new Object[0]);
        batchSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT), '6')", new Object[0]);
        batchSql("INSERT INTO T VALUES (1, 3, CAST(NULL AS INT), 1, '1')", new Object[0]);
        batchSql("INSERT INTO T VALUES (1, 3, 2, 3, CAST(NULL AS STRING))", new Object[0]);
        batchSql("INSERT INTO T VALUES (1, 3, CAST(NULL AS INT), 4, CAST(NULL AS STRING))", new Object[0]);
        Assertions.assertThat(batchSql("SELECT * FROM T", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, 2, 4, 5, "6"}), Row.of(new Object[]{1, 3, 2, 4, "1"})});
    }

    @Test
    public void testForeignKeyJoin() throws Exception {
        this.sEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE, ExecutionConfigOptions.UpsertMaterialize.NONE);
        CloseableIterator<Row> streamSqlIter = streamSqlIter("INSERT INTO dwd_orders SELECT OrderID, OrderNumber, PersonID, CAST(NULL AS STRING), CAST(NULL AS STRING), CAST(NULL AS INT) FROM ods_orders UNION ALL SELECT OrderID, CAST(NULL AS INT), dim_persons.PersonID, LastName, FirstName, Age FROM dim_persons JOIN ods_orders ON dim_persons.PersonID = ods_orders.PersonID;", new Object[0]);
        batchSql("INSERT INTO ods_orders VALUES (1, 2, 3)", new Object[0]);
        batchSql("INSERT INTO dim_persons VALUES (3, 'snow', 'jon', 23)", new Object[0]);
        Awaitility.await().pollInSameThread().atMost(5L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assertions.assertThat(rowsToList(batchSql("SELECT * FROM dwd_orders", new Object[0]))).containsExactly(new List[]{Arrays.asList(1, 2, 3, "snow", "jon", 23)});
        });
        batchSql("INSERT INTO ods_orders VALUES (1, 4, 3)", new Object[0]);
        batchSql("INSERT INTO dim_persons VALUES (3, 'snow', 'targaryen', 23)", new Object[0]);
        Awaitility.await().pollInSameThread().atMost(5L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assertions.assertThat(rowsToList(batchSql("SELECT * FROM dwd_orders", new Object[0]))).containsExactly(new List[]{Arrays.asList(1, 4, 3, "snow", "targaryen", 23)});
        });
        streamSqlIter.close();
    }

    private List<List<Object>> rowsToList(List<Row> list) {
        return (List) list.stream().map(this::toList).collect(Collectors.toList());
    }

    private List<Object> toList(Row row) {
        Assertions.assertThat(row.getKind()).isIn(new Object[]{RowKind.INSERT, RowKind.UPDATE_AFTER});
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < row.getArity(); i++) {
            arrayList.add(row.getField(i));
        }
        return arrayList;
    }

    @Test
    public void testStreamingRead() {
        Assertions.assertThatThrownBy(() -> {
            this.sEnv.from("T").execute().print();
        }, "Partial update continuous reading is not supported", new Object[0]);
    }

    @Test
    public void testSequenceGroup() {
        sql("CREATE TABLE SG (k INT, a INT, b INT, g_1 INT, c INT, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED) WITH ('merge-engine'='partial-update', 'fields.g_1.sequence-group'='a,b', 'fields.g_2.sequence-group'='c,d');", new Object[0]);
        sql("INSERT INTO SG VALUES (1, 1, 1, 1, 1, 1, 1)", new Object[0]);
        sql("INSERT INTO SG VALUES (1, 2, 2, 2, 2, 2, CAST(NULL AS INT))", new Object[0]);
        Assertions.assertThat(sql("SELECT * FROM SG", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, 2, 2, 2, 1, 1, 1})});
        Assertions.assertThat(sql("SELECT c, d FROM SG", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, 1})});
        sql("INSERT INTO SG VALUES (1, 3, 3, 1, 3, 3, 3)", new Object[0]);
        Assertions.assertThat(sql("SELECT * FROM SG", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{1, 2, 2, 2, 3, 3, 3})});
        sql("INSERT INTO SG VALUES (1, 3, 3, 3, 2, 2, CAST(NULL AS INT))", new Object[0]);
        sql("INSERT INTO SG VALUES (1, 4, 4, 4, 2, 2, CAST(NULL AS INT))", new Object[0]);
        sql("INSERT INTO SG VALUES (1, 5, 5, 3, 5, CAST(NULL AS INT), 4)", new Object[0]);
        Assertions.assertThat(sql("SELECT a, b FROM SG", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{4, 4})});
        Assertions.assertThat(sql("SELECT c, d FROM SG", new Object[0])).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{5, null})});
    }
}
