package org.apache.paimon.spark;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.VarCharType;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for reading and managing Paimon tables through Spark SQL.
 *
 * <p>Covers reads of simple and nested types from tables {@code t1} and {@code t2}, filter
 * push-down, the {@code $snapshots}, {@code $manifests} and {@code $schemas} system tables, table
 * DDL (CREATE, CTAS, DESCRIBE, SHOW CREATE TABLE, SHOW TBLPROPERTIES, DROP) and namespace DDL.
 *
 * <p>NOTE(review): {@code t1}, {@code t2}, {@code spark}, {@code warehousePath},
 * {@code innerTestSimpleType}, {@code showCreateString} and {@code writeTable} are not defined
 * here; they are assumed to come from {@link SparkReadTestBase} — confirm there.
 */
public class SparkReadITCase extends SparkReadTestBase {

    @Test
    public void testNormal() {
        innerTestSimpleType(spark.table("t1"));
        innerTestNestedType(spark.table("t2"));
    }

    @Test
    public void testFilterPushDown() {
        innerTestSimpleTypeFilterPushDown(spark.table("t1"));
        innerTestNestedTypeFilterPushDown(spark.table("t2"));
    }

    /** NOTE(review): body is identical to {@link #testNormal()}; consider merging the two. */
    @Test
    public void testCatalogNormal() {
        innerTestSimpleType(spark.table("t1"));
        innerTestNestedType(spark.table("t2"));
    }

    @Test
    public void testSnapshotsTable() {
        List<Row> snapshots =
                spark.table("`t1$snapshots`")
                        .select("snapshot_id", "schema_id", "commit_user", "commit_kind")
                        .collectAsList();
        Assertions.assertThat(snapshots).isNotEmpty();
        // The commit user is not fixed by this test, so read it back instead of hard-coding it.
        String commitUser = snapshots.get(0).getString(2);
        Assertions.assertThat(snapshots.toString())
                .isEqualTo(String.format("[[1,0,%s,APPEND]]", commitUser));

        spark.sql(
                "CREATE TABLE schemasTable (\n"
                        + "a BIGINT,\n"
                        + "b STRING)\n"
                        + "TBLPROPERTIES ('primary-key' = 'a')");
        spark.sql("ALTER TABLE schemasTable ADD COLUMN c STRING");

        // Column 1 of the $schemas table holds the serialized field list of each schema version.
        List<String> schemaFields =
                spark.table("`schemasTable$schemas`").collectAsList().stream()
                        .map(row -> row.get(1).toString())
                        .collect(Collectors.toList());
        Assertions.assertThat(schemaFields)
                .containsExactlyInAnyOrder(
                        "[{\"id\":0,\"name\":\"a\",\"type\":\"BIGINT NOT NULL\"},"
                                + "{\"id\":1,\"name\":\"b\",\"type\":\"STRING\"}]",
                        "[{\"id\":0,\"name\":\"a\",\"type\":\"BIGINT NOT NULL\"},"
                                + "{\"id\":1,\"name\":\"b\",\"type\":\"STRING\"},"
                                + "{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}]");
    }

    @Test
    public void testSnapshotsTableWithRecordCount() {
        List<Row> snapshots =
                spark.table("`t1$snapshots`")
                        .select(
                                "snapshot_id",
                                "total_record_count",
                                "delta_record_count",
                                "changelog_record_count")
                        .collectAsList();
        Assertions.assertThat(snapshots.toString()).isEqualTo("[[1,3,3,0]]");
    }

    @Test
    public void testManifestsTable() {
        List<Row> manifests =
                spark.table("`t1$manifests`")
                        .select("schema_id", "file_name", "file_size")
                        .collectAsList();
        Assertions.assertThat(manifests).isNotEmpty();

        Row manifest = manifests.get(0);
        Assertions.assertThat(manifest.getLong(0)).isEqualTo(0L);
        Assertions.assertThat(manifest.getString(1)).startsWith("manifest");
        Assertions.assertThat(manifest.getLong(2)).isGreaterThan(0L);
    }

    @Test
    public void testManifestsTableWithRecordCount() {
        List<Row> manifests =
                spark.table("`t1$manifests`")
                        .select("num_added_files", "num_deleted_files")
                        .collectAsList();
        Assertions.assertThat(manifests.toString()).isEqualTo("[[1,0]]");
    }

    /**
     * NOTE(review): body is identical to {@link #testFilterPushDown()}; consider merging the two.
     */
    @Test
    public void testCatalogFilterPushDown() {
        innerTestSimpleTypeFilterPushDown(spark.table("t1"));
        innerTestNestedTypeFilterPushDown(spark.table("t2"));
    }

    @Test
    public void testDefaultNamespace() {
        Assertions.assertThat(spark.sql("SHOW CURRENT NAMESPACE").collectAsList().toString())
                .isEqualTo("[[paimon,default]]");
    }

    @Test
    public void testCreateTable() {
        spark.sql("CREATE TABLE testCreateTable(\n" + "a BIGINT,\n" + "b VARCHAR(10),\n" + "c CHAR(10))");
        Assertions.assertThat(
                        spark.sql("SELECT fields FROM `testCreateTable$schemas`")
                                .collectAsList()
                                .toString())
                .isEqualTo(
                        "[[[{\"id\":0,\"name\":\"a\",\"type\":\"BIGINT\"},"
                                + "{\"id\":1,\"name\":\"b\",\"type\":\"VARCHAR(10)\"},"
                                + "{\"id\":2,\"name\":\"c\",\"type\":\"CHAR(10)\"}]]]");
    }

    @Test
    public void testCreateTableAs() {
        // Plain CTAS.
        spark.sql("CREATE TABLE testCreateTable(\n" + "a BIGINT,\n" + "b VARCHAR(10),\n" + "c CHAR(10))");
        spark.sql("INSERT INTO testCreateTable VALUES(1,'a','b')");
        spark.sql("CREATE TABLE testCreateTableAs AS SELECT * FROM testCreateTable");
        Assertions.assertThat(rowsToStrings(spark.sql("SELECT * FROM testCreateTableAs")))
                .containsExactlyInAnyOrder("[1,a,b]");

        // CTAS that re-declares partitioning.
        spark.sql(
                "CREATE TABLE partitionedTable (\n"
                        + "a BIGINT,\n"
                        + "b STRING,\n"
                        + "c STRING)\n"
                        + "PARTITIONED BY (a,b)");
        spark.sql("INSERT INTO partitionedTable VALUES(1,'aaa','bbb')");
        spark.sql(
                "CREATE TABLE partitionedTableAs PARTITIONED BY (a) AS SELECT * FROM partitionedTable");
        Assertions.assertThat(showCreateTable("partitionedTableAs"))
                .isEqualTo(
                        String.format(
                                "[[%sPARTITIONED BY (a)\nTBLPROPERTIES (\n  'path' = '%s')\n]]",
                                showCreateString(
                                        "partitionedTableAs", "a BIGINT", "b STRING", "c STRING"),
                                defaultTablePath("partitionedTableAs")));
        Assertions.assertThat(rowsToStrings(spark.sql("SELECT * FROM partitionedTableAs")))
                .containsExactlyInAnyOrder("[1,aaa,bbb]");

        // CTAS that overrides a table option of the source.
        spark.sql(
                "CREATE TABLE testTable(\n"
                        + "a BIGINT,\n"
                        + "b VARCHAR(10),\n"
                        + "c CHAR(10))\n"
                        + " TBLPROPERTIES(\n"
                        + " 'file.format' = 'orc'\n"
                        + ")");
        spark.sql("INSERT INTO testTable VALUES(1,'a','b')");
        spark.sql(
                "CREATE TABLE testTableAs TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM testTable");
        Assertions.assertThat(showCreateTable("testTableAs"))
                .isEqualTo(
                        String.format(
                                "[[%sTBLPROPERTIES (\n  'file.format' = 'parquet',\n  'path' = '%s')\n]]",
                                showCreateString("testTableAs", "a BIGINT", "b STRING", "c STRING"),
                                defaultTablePath("testTableAs")));
        Assertions.assertThat(rowsToStrings(spark.sql("SELECT * FROM testTableAs")))
                .containsExactlyInAnyOrder("[1,a,b]");

        // CTAS that re-declares the primary key.
        spark.sql(
                "CREATE TABLE t_pk (\n"
                        + "a BIGINT,\n"
                        + "b STRING,\n"
                        + "c STRING\n"
                        + ") TBLPROPERTIES (\n"
                        + "  'primary-key' = 'a,b'\n"
                        + ")\n"
                        + "COMMENT 'table comment'");
        spark.sql("INSERT INTO t_pk VALUES(1,'aaa','bbb')");
        spark.sql("CREATE TABLE t_pk_as TBLPROPERTIES ('primary-key' = 'a') AS SELECT * FROM t_pk");
        Assertions.assertThat(showCreateTable("t_pk_as"))
                .isEqualTo(
                        String.format(
                                "[[%sTBLPROPERTIES (\n  'path' = '%s',\n  'primary-key' = 'a')\n]]",
                                showCreateString("t_pk_as", "a BIGINT", "b STRING", "c STRING"),
                                defaultTablePath("t_pk_as")));
        Assertions.assertThat(rowsToStrings(spark.sql("SELECT * FROM t_pk_as")))
                .containsExactlyInAnyOrder("[1,aaa,bbb]");

        // CTAS that re-declares both partitioning and primary key.
        spark.sql(
                "CREATE TABLE t_all (\n"
                        + "    user_id BIGINT,\n"
                        + "    item_id BIGINT,\n"
                        + "    behavior STRING,\n"
                        + "    dt STRING,\n"
                        + "    hh STRING\n"
                        + ") PARTITIONED BY (dt, hh) TBLPROPERTIES (\n"
                        + "    'primary-key' = 'dt,hh,user_id'\n"
                        + ")");
        spark.sql("INSERT INTO t_all VALUES(1,2,'bbb','2020-01-01','12')");
        spark.sql(
                "CREATE TABLE t_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM t_all");
        Assertions.assertThat(showCreateTable("t_all_as"))
                .isEqualTo(
                        String.format(
                                "[[%sPARTITIONED BY (dt)\nTBLPROPERTIES (\n  'path' = '%s',\n  'primary-key' = 'dt,hh')\n]]",
                                showCreateString(
                                        "t_all_as",
                                        "user_id BIGINT",
                                        "item_id BIGINT",
                                        "behavior STRING",
                                        "dt STRING",
                                        "hh STRING"),
                                defaultTablePath("t_all_as")));
        Assertions.assertThat(rowsToStrings(spark.sql("SELECT * FROM t_all_as")))
                .containsExactlyInAnyOrder("[1,2,bbb,2020-01-01,12]");
    }

    @Test
    public void testConflictOption() {
        // Setting changelog-producer is rejected on a table without primary keys, both at
        // CREATE time and via ALTER afterwards.
        Assertions.assertThatThrownBy(
                        () ->
                                spark.sql(
                                        "CREATE TABLE T (a INT) TBLPROPERTIES ('changelog-producer' = 'input')"))
                .getRootCause()
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Can not set changelog-producer on table without primary keys");

        spark.sql("CREATE TABLE T (a INT)");
        Assertions.assertThatThrownBy(
                        () -> spark.sql("ALTER TABLE T SET TBLPROPERTIES('changelog-producer' 'input')"))
                .getRootCause()
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Can not set changelog-producer on table without primary keys");
    }

    @Test
    public void testShowTablesSorted() {
        spark.sql("create table t3(id int, name string)");
        spark.sql("create table t4(id int, name string)");
        Assertions.assertThat(spark.sql("SHOW TABLES").collectAsList().toString())
                .isEqualTo(
                        "[[default,t1,false], [default,t2,false], [default,t3,false], [default,t4,false]]");
    }

    @Test
    public void testCreateTableWithNullablePk() {
        spark.sql("CREATE TABLE PkTable (\n" + "a BIGINT,\n" + "b STRING)\n" + "TBLPROPERTIES ('primary-key' = 'a')");
        TableSchema schema =
                FileStoreTableFactory.create(LocalFileIO.create(), defaultTablePath("PkTable"))
                        .schema();
        // The DDL does not declare `a` NOT NULL, yet the stored primary-key column must be non-null.
        Assertions.assertThat(schema.logicalRowType().getTypeAt(0).isNullable()).isFalse();
    }

    @Test
    public void testDescribeTable() {
        spark.sql("CREATE TABLE PartitionedTable (\n" + "a BIGINT,\n" + "b STRING)\n" + "PARTITIONED BY (a)\n");
        Assertions.assertThat(spark.sql("DESCRIBE PartitionedTable").collectAsList().toString())
                .isEqualTo(
                        "[[a,bigint,null], [b,string,null], [# Partition Information,,], "
                                + "[# col_name,data_type,comment], [a,bigint,null]]");
    }

    @Test
    public void testShowCreateTable() {
        spark.sql(
                "CREATE TABLE tbl (\n"
                        + "  a INT COMMENT 'a comment',\n"
                        + "  b STRING\n"
                        + ") USING paimon\n"
                        + "PARTITIONED BY (b)\n"
                        + "COMMENT 'tbl comment'\n"
                        + "TBLPROPERTIES (\n"
                        + "  'primary-key' = 'a,b',\n"
                        + "  'k1' = 'v1'\n"
                        + ")");
        Assertions.assertThat(showCreateTable("tbl"))
                .isEqualTo(
                        String.format(
                                "[[%sPARTITIONED BY (b)\nCOMMENT 'tbl comment'\nTBLPROPERTIES (\n  'k1' = 'v1',\n  'path' = '%s',\n  'primary-key' = 'a,b')\n]]",
                                showCreateString("tbl", "a INT COMMENT 'a comment'", "b STRING"),
                                defaultTablePath("tbl")));
    }

    @Test
    public void testShowTableProperties() {
        spark.sql(
                "CREATE TABLE tbl (\n"
                        + "  a INT)\n"
                        + "TBLPROPERTIES (\n"
                        + "  'k1' = 'v1',\n"
                        + "  'k2' = 'v2'\n"
                        + ")");
        Assertions.assertThat(rowsToStrings(spark.sql("SHOW TBLPROPERTIES tbl")))
                .contains("[k1,v1]", "[k2,v2]");
    }

    @Test
    public void testCreateTableWithNonexistentPk() {
        spark.sql("USE paimon");
        Assertions.assertThatThrownBy(
                        () ->
                                spark.sql(
                                        "CREATE TABLE default.PartitionedPkTable (\n"
                                                + "a BIGINT,\n"
                                                + "b STRING,\n"
                                                + "c DOUBLE) USING paimon\n"
                                                + "COMMENT 'table comment'\n"
                                                + "PARTITIONED BY (b)\n"
                                                + "TBLPROPERTIES ('primary-key' = 'd')"))
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining(
                        "Table column [a, b, c] should include all primary key constraint [d]");
    }

    @Test
    public void testCreateTableWithNonexistentPartition() {
        Assertions.assertThatThrownBy(
                        () ->
                                spark.sql(
                                        "CREATE TABLE PartitionedPkTable (\n"
                                                + "a BIGINT,\n"
                                                + "b STRING,\n"
                                                + "c DOUBLE)\n"
                                                + "PARTITIONED BY (d)\n"
                                                + "TBLPROPERTIES ('primary-key' = 'a')"))
                .isInstanceOf(AnalysisException.class)
                .hasMessageContaining("Couldn't find column d");
    }

    @Test
    public void testCreateAndDropTable() {
        // Arguments are (tableName, hasPk, partitioned).
        innerTest("MyTable1", true, true);
        innerTest("MyTable2", true, false);
        innerTest("MyTable3", false, false);
        // NOTE(review): MyTable4 and MyTable6 repeat the flag combinations of MyTable3 and
        // MyTable5; they only add a second create/drop cycle for the same configuration.
        innerTest("MyTable4", false, false);
        innerTest("MyTable5", false, true);
        innerTest("MyTable6", false, true);
    }

    /**
     * Creates {@code default.<tableName>} with a fixed six-column schema, checks DDL error paths
     * and the persisted schema, writes one row, reads it back and finally drops the table.
     *
     * @param tableName name of the table to create in the {@code default} namespace
     * @param hasPk whether to declare a primary key: {@code order_id}, extended with
     *     {@code dt,hh} when {@code partitioned} is true
     * @param partitioned whether to partition the table by {@code (dt, hh)}
     */
    private void innerTest(String tableName, boolean hasPk, boolean partitioned) {
        Map<String, String> options = new HashMap<>();
        options.put("foo", "bar");
        options.put("file.format", "avro");
        if (hasPk) {
            options.put("primary-key", partitioned ? "order_id,dt,hh" : "order_id");
        }

        List<String> fieldNames =
                Arrays.asList("order_id", "buyer_id", "coupon_info", "order_amount", "dt", "hh");
        List<DataType> fieldTypes =
                Arrays.asList(
                        new BigIntType(false),
                        new BigIntType(false),
                        new ArrayType(false, VarCharType.STRING_TYPE),
                        new DoubleType(false),
                        VarCharType.stringType(false),
                        VarCharType.stringType(false));
        // Each column's COMMENT in the DDL below equals its name, hence description == name.
        List<DataField> expectedFields =
                IntStream.range(0, fieldNames.size())
                        .mapToObj(
                                i ->
                                        new DataField(
                                                i,
                                                fieldNames.get(i),
                                                fieldTypes.get(i),
                                                fieldNames.get(i)))
                        .collect(Collectors.toList());

        String ddl =
                String.format(
                        "CREATE TABLE default.%s (\n"
                                + "order_id BIGINT NOT NULL comment 'order_id',\n"
                                + "buyer_id BIGINT NOT NULL COMMENT 'buyer_id',\n"
                                + "coupon_info ARRAY<STRING> NOT NULL COMMENT 'coupon_info',\n"
                                + "order_amount DOUBLE NOT NULL COMMENT 'order_amount',\n"
                                + "dt STRING NOT NULL COMMENT 'dt',\n"
                                + "hh STRING NOT NULL COMMENT 'hh')\n"
                                + "COMMENT 'table comment'\n"
                                + "%s\n"
                                + "TBLPROPERTIES (%s)",
                        tableName,
                        partitioned ? "PARTITIONED BY (dt, hh)" : "",
                        options.entrySet().stream()
                                .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
                                .collect(Collectors.joining(", ")));
        spark.sql(ddl);

        // Re-running the same DDL must fail because the table now exists.
        Assertions.assertThatThrownBy(() -> spark.sql(ddl))
                .isInstanceOf(TableAlreadyExistsException.class)
                .hasMessageContaining(
                        String.format(
                                "Cannot create table or view `default`.`%s` because it already exists",
                                tableName));
        // Swap only the namespace qualifier; a blanket replace of "default" would also rewrite
        // any table name, comment or option value that happens to contain that word.
        String ddlInMissingNamespace = ddl.replace("CREATE TABLE default.", "CREATE TABLE foo.");
        Assertions.assertThatThrownBy(() -> spark.sql(ddlInMissingNamespace))
                .isInstanceOf(NoSuchNamespaceException.class)
                .hasMessageContaining("The schema `foo` cannot be found");
        Assertions.assertThatThrownBy(
                        () ->
                                spark.sql(
                                        String.format(
                                                "ALTER TABLE %s UNSET TBLPROPERTIES('primary-key')",
                                                tableName)))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Alter primary key is not supported");

        Path tablePath = defaultTablePath(tableName);
        TableSchema schema = FileStoreTableFactory.create(LocalFileIO.create(), tablePath).schema();
        Assertions.assertThat(schema.fields()).containsExactlyElementsOf(expectedFields);
        Assertions.assertThat(schema.options()).containsEntry("foo", "bar");
        // 'primary-key' is given as a table property but must not be kept among the options.
        Assertions.assertThat(schema.options()).doesNotContainKey("primary-key");
        if (hasPk) {
            if (partitioned) {
                Assertions.assertThat(schema.primaryKeys()).containsExactly("order_id", "dt", "hh");
            } else {
                Assertions.assertThat(schema.primaryKeys()).containsExactly("order_id");
            }
            Assertions.assertThat(schema.trimmedPrimaryKeys()).containsOnly("order_id");
        } else {
            Assertions.assertThat(schema.primaryKeys()).isEmpty();
        }
        if (partitioned) {
            Assertions.assertThat(schema.partitionKeys()).containsExactly("dt", "hh");
        } else {
            Assertions.assertThat(schema.partitionKeys()).isEmpty();
        }
        Assertions.assertThat(schema.comment()).isEqualTo("table comment");

        writeTable(
                tableName,
                "(1L, 10L, array('loyalty_discount', 'shipping_discount'), 199.0d, '2022-07-20', '12')");
        Dataset<Row> readBack = spark.read().format("paimon").load(tablePath.toString());
        Assertions.assertThat(readBack.select("order_id", "buyer_id", "dt").collectAsList().toString())
                .isEqualTo("[[1,10,2022-07-20]]");
        Assertions.assertThat(readBack.select("coupon_info").collectAsList().toString())
                .isEqualTo("[[WrappedArray(loyalty_discount, shipping_discount)]]");

        String showTablesSql =
                String.format("SHOW TABLES IN paimon.default LIKE '%s'", tableName);
        Assertions.assertThat(
                        spark.sql(showTablesSql)
                                .select("namespace", "tableName")
                                .collectAsList()
                                .toString())
                .isEqualTo(String.format("[[default,%s]]", tableName));

        spark.sql(String.format("DROP TABLE %s", tableName));
        Assertions.assertThat(
                        spark.sql(showTablesSql)
                                .select("namespace", "tableName")
                                .collectAsList()
                                .toString())
                .isEqualTo("[]");
        Assertions.assertThat(new File(tablePath.toUri())).doesNotExist();
    }

    @Test
    public void testCreateAndDropNamespace() {
        spark.sql("CREATE NAMESPACE bar");
        Assertions.assertThatThrownBy(() -> spark.sql("CREATE NAMESPACE bar"))
                .isInstanceOf(NamespaceAlreadyExistsException.class)
                .hasMessageContaining("Cannot create schema `bar` because it already exists");

        List<String> namespaces =
                spark.sql("SHOW NAMESPACES").collectAsList().stream()
                        .map(row -> row.getString(0))
                        .collect(Collectors.toList());
        Assertions.assertThat(namespaces).containsExactlyInAnyOrder("bar", "default");

        Path namespacePath = new Path(warehousePath, "bar.db");
        Assertions.assertThat(new File(namespacePath.toUri())).exists();

        spark.sql("DROP NAMESPACE bar");
        Assertions.assertThat(spark.sql("SHOW NAMESPACES").collectAsList().toString())
                .isEqualTo("[[default]]");
        Assertions.assertThat(new File(namespacePath.toUri())).doesNotExist();
    }

    /** Asserts the full contents of {@code dataset} and projections of its nested fields. */
    private void innerTestNestedType(Dataset<Row> dataset) {
        Assertions.assertThat(dataset.collectAsList().toString())
                .isEqualTo(
                        "[[1,WrappedArray(AAA, BBB),[[1.0,WrappedArray(null)],1]], "
                                + "[2,WrappedArray(CCC, DDD),[[null,WrappedArray(true)],null]], "
                                + "[3,WrappedArray(null, null),[[2.0,WrappedArray(true, false)],2]], "
                                + "[4,WrappedArray(null, EEE),[[3.0,WrappedArray(true, false, true)],3]]]");
        Assertions.assertThat(dataset.select("a").collectAsList().toString())
                .isEqualTo("[[1], [2], [3], [4]]");
        Assertions.assertThat(dataset.select("c.c1").collectAsList().toString())
                .isEqualTo(
                        "[[[1.0,WrappedArray(null)]], [[null,WrappedArray(true)]], "
                                + "[[2.0,WrappedArray(true, false)]], [[3.0,WrappedArray(true, false, true)]]]");
        Assertions.assertThat(dataset.select("c.c2").collectAsList().toString())
                .isEqualTo("[[1], [null], [2], [3]]");
        Assertions.assertThat(dataset.select("c.c1.c11").collectAsList().toString())
                .isEqualTo("[[1.0], [null], [2.0], [3.0]]");
        Assertions.assertThat(dataset.select("c.c1.c12").collectAsList().toString())
                .isEqualTo(
                        "[[WrappedArray(null)], [WrappedArray(true)], "
                                + "[WrappedArray(true, false)], [WrappedArray(true, false, true)]]");
    }

    /** Asserts filtered projections over the simple-type table. */
    private void innerTestSimpleTypeFilterPushDown(Dataset<Row> dataset) {
        Assertions.assertThat(dataset.filter("a < 4").select("a", "c").collectAsList().toString())
                .isEqualTo("[[1,1]]");
        Assertions.assertThat(dataset.filter("b = 4").select("a", "c").collectAsList().toString())
                .isEqualTo("[]");
    }

    /** Asserts filters on top-level, array and nested struct columns of the nested-type table. */
    private void innerTestNestedTypeFilterPushDown(Dataset<Row> dataset) {
        Assertions.assertThat(dataset.filter("a < 4").select("a").collectAsList().toString())
                .isEqualTo("[[1], [2], [3]]");
        Assertions.assertThat(
                        dataset.filter("array_contains(b, 'AAA')")
                                .select("b")
                                .collectAsList()
                                .toString())
                .isEqualTo("[[WrappedArray(AAA, BBB)]]");
        Assertions.assertThat(
                        dataset.filter("c.c1.c11 is null")
                                .select("a", "c")
                                .collectAsList()
                                .toString())
                .isEqualTo("[[2,[[null,WrappedArray(true)],null]]]");
        Assertions.assertThat(
                        dataset.filter("c.c1.c11 = 1.0")
                                .select("a", "c.c1")
                                .collectAsList()
                                .toString())
                .isEqualTo("[[1,[1.0,WrappedArray(null)]]]");
        Assertions.assertThat(
                        dataset.filter("c.c2 is null").select("a", "c").collectAsList().toString())
                .isEqualTo("[[2,[[null,WrappedArray(true)],null]]]");
        Assertions.assertThat(
                        dataset.filter("array_contains(c.c1.c12, false)")
                                .select("a", "c.c1.c12", "c.c2")
                                .collectAsList()
                                .toString())
                .isEqualTo(
                        "[[3,WrappedArray(true, false),2], [4,WrappedArray(true, false, true),3]]");
    }

    @Test
    public void testCreateNestedField() {
        spark.sql(
                "CREATE TABLE nested_table ( a INT, b STRUCT<b1: STRUCT<b11: INT, b12 INT>, b2 BIGINT>)");
        Assertions.assertThat(showCreateTable("nested_table"))
                .contains(
                        showCreateString(
                                "nested_table",
                                "a INT",
                                "b STRUCT<b1: STRUCT<b11: INT, b12: INT>, b2: BIGINT>"));
    }

    /** Collects {@code dataset} and returns each row's {@link Row#toString()} in result order. */
    private static List<String> rowsToStrings(Dataset<Row> dataset) {
        return dataset.collectAsList().stream().map(Row::toString).collect(Collectors.toList());
    }

    /** Runs {@code SHOW CREATE TABLE} for {@code tableName} and returns the collected rows as text. */
    private String showCreateTable(String tableName) {
        return spark.sql("SHOW CREATE TABLE " + tableName).collectAsList().toString();
    }

    /** Returns the warehouse location of {@code tableName} in the {@code default} namespace. */
    private Path defaultTablePath(String tableName) {
        return new Path(warehousePath, "default.db/" + tableName);
    }
}
