package org.apache.paimon.spark;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/spark/SparkReadTestBase.class */
public abstract class SparkReadTestBase {
    private static final AtomicLong COMMIT_IDENTIFIER = new AtomicLong(0);
    protected static SparkSession spark = null;
    protected static Path warehousePath = null;
    protected static Path tablePath1;
    protected static Path tablePath2;

    @BeforeAll
    public static void startMetastoreAndSpark(@TempDir java.nio.file.Path path) {
        warehousePath = new Path("file:" + path.toString());
        spark = SparkSession.builder().master("local[2]").getOrCreate();
        spark.conf().set("spark.sql.catalog.paimon", SparkCatalog.class.getName());
        spark.conf().set("spark.sql.catalog.paimon.warehouse", warehousePath.toString());
        spark.sql("USE paimon");
    }

    @AfterAll
    public static void stopMetastoreAndSpark() {
        if (spark != null) {
            spark.stop();
            spark = null;
        }
    }

    @BeforeEach
    public void beforeEach() throws Exception {
        tablePath1 = new Path(warehousePath, "default.db/t1");
        createTable("t1");
        writeTable("t1", GenericRow.of(new Object[]{1, 2L, BinaryString.fromString("1")}), GenericRow.of(new Object[]{3, 4L, BinaryString.fromString("2")}), GenericRow.of(new Object[]{5, 6L, BinaryString.fromString("3")}), GenericRow.ofKind(RowKind.DELETE, new Object[]{3, 4L, BinaryString.fromString("2")}));
        tablePath2 = new Path(warehousePath, "default.db/t2");
        spark.sql("CREATE TABLE paimon.default.t2 (a INT NOT NULL COMMENT 'comment about a', b ARRAY<STRING> NOT NULL, c STRUCT<c1: STRUCT<c11: DOUBLE, c12: ARRAY<BOOLEAN> NOT NULL> NOT NULL, c2: BIGINT COMMENT 'comment about c2'> NOT NULL COMMENT 'comment about c') TBLPROPERTIES ('file.format'='avro')");
        writeTable("t2", "(1, array('AAA', 'BBB'), struct(struct(1.0d, array(null)), 1L))", "(2, array('CCC', 'DDD'), struct(struct(null, array(true)), null))");
        writeTable("t2", "(3, array(null, null), struct(struct(2.0d, array(true, false)), 2L))", "(4, array(null, 'EEE'), struct(struct(3.0d, array(true, false, true)), 3L))");
    }

    @AfterEach
    public void afterEach() {
        spark.sql("show tables").collectAsList().forEach(row -> {
            spark.sql("DROP TABLE " + row.getString(0) + "." + row.getString(1));
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void innerTestSimpleType(Dataset<Row> dataset) {
        Assertions.assertThat(dataset.collectAsList().toString()).isEqualTo("[[1,2,1], [5,6,3]]");
        Assertions.assertThat(dataset.select("a", new String[]{"c"}).collectAsList().toString()).isEqualTo("[[1,1], [5,3]]");
        Assertions.assertThat(dataset.groupBy(new Column[0]).sum(new String[]{"b"}).collectAsList().toString()).isEqualTo("[[8]]");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public TableSchema schema1() {
        return FileStoreTableFactory.create(LocalFileIO.create(), tablePath1).schema();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public TableSchema schema2() {
        return FileStoreTableFactory.create(LocalFileIO.create(), tablePath2).schema();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean fieldIsNullable(DataField dataField) {
        return dataField.type().isNullable();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public DataField getField(TableSchema tableSchema, int i) {
        return (DataField) tableSchema.fields().get(i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public DataField getNestedField(DataField dataField, int i) {
        if (dataField.type() instanceof RowType) {
            return (DataField) dataField.type().getFields().get(i);
        }
        throw new IllegalArgumentException();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void createTable(String str) {
        spark.sql(String.format("CREATE TABLE paimon.default.%s (a INT NOT NULL, b BIGINT, c STRING) TBLPROPERTIES ('bucket' = '1', 'primary-key'='a', 'file.format'='avro')", str));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static FileStoreTable getTable(String str) {
        return FileStoreTableFactory.create(LocalFileIO.create(), new Path(warehousePath, String.format("default.db/%s", str)));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void writeTable(String str, GenericRow... genericRowArr) throws Exception {
        StreamWriteBuilder newStreamWriteBuilder = getTable(str).newStreamWriteBuilder();
        StreamTableWrite newWrite = newStreamWriteBuilder.newWrite();
        StreamTableCommit newCommit = newStreamWriteBuilder.newCommit();
        for (GenericRow genericRow : genericRowArr) {
            newWrite.write(genericRow);
        }
        long andIncrement = COMMIT_IDENTIFIER.getAndIncrement();
        newCommit.commit(andIncrement, newWrite.prepareCommit(true, andIncrement));
        newWrite.close();
        newCommit.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void writeTableWithWatermark(String str, Long l, GenericRow... genericRowArr) throws Exception {
        StreamWriteBuilder newStreamWriteBuilder = getTable(str).newStreamWriteBuilder();
        StreamTableWrite newWrite = newStreamWriteBuilder.newWrite();
        TableCommitImpl newCommit = newStreamWriteBuilder.newCommit();
        for (GenericRow genericRow : genericRowArr) {
            newWrite.write(genericRow);
        }
        long andIncrement = COMMIT_IDENTIFIER.getAndIncrement();
        ManifestCommittable manifestCommittable = new ManifestCommittable(andIncrement, l);
        Iterator it = newWrite.prepareCommit(true, andIncrement).iterator();
        while (it.hasNext()) {
            manifestCommittable.addFileCommittable((CommitMessage) it.next());
        }
        newCommit.commit(manifestCommittable);
        newWrite.close();
        newCommit.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void writeTable(String str, String... strArr) {
        spark.sql(String.format("INSERT INTO paimon.default.%s VALUES %s", str, StringUtils.join(strArr, ",")));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String showCreateString(String str, String... strArr) {
        return String.format("CREATE TABLE paimon.default.%s (%s)\nUSING paimon\n", str, Arrays.stream(strArr).map(str2 -> {
            return "\n  " + str2;
        }).collect(Collectors.joining(",")));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String defaultShowCreateString(String str) {
        return showCreateString(str, "a INT", "b BIGINT", "c STRING");
    }
}
