package org.apache.paimon.spark;

import java.time.LocalDateTime;
import java.util.HashMap;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/spark/SparkTimeTravelITCase.class */
public class SparkTimeTravelITCase extends SparkReadTestBase {
    @Test
    public void testTravelToVersion() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        writeTable("t", GenericRow.of(new Object[]{1, BinaryString.fromString("Hello")}), GenericRow.of(new Object[]{2, BinaryString.fromString("Paimon")}));
        writeTable("t", GenericRow.of(new Object[]{3, BinaryString.fromString("Test")}), GenericRow.of(new Object[]{4, BinaryString.fromString("Case")}));
        Assertions.assertThat(spark.sql("SELECT * FROM t").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
        Assertions.assertThat(spark.sql("SELECT * FROM t VERSION AS OF 1").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon]]");
    }

    @Test
    public void testTravelToTimestampString() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        writeTable("t", GenericRow.of(new Object[]{1, BinaryString.fromString("Hello")}), GenericRow.of(new Object[]{2, BinaryString.fromString("Paimon")}));
        String localDateTime = LocalDateTime.now().toString();
        writeTable("t", GenericRow.of(new Object[]{3, BinaryString.fromString("Test")}), GenericRow.of(new Object[]{4, BinaryString.fromString("Case")}));
        Assertions.assertThat(spark.sql("SELECT * FROM t").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
        Assertions.assertThat(spark.sql(String.format("SELECT * FROM t TIMESTAMP AS OF '%s'", localDateTime)).collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon]]");
    }

    @Test
    public void testTravelToTimestampNumber() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        writeTable("t", GenericRow.of(new Object[]{1, BinaryString.fromString("Hello")}), GenericRow.of(new Object[]{2, BinaryString.fromString("Paimon")}));
        Thread.sleep(1000L);
        long currentTimeMillis = System.currentTimeMillis() / 1000;
        writeTable("t", GenericRow.of(new Object[]{3, BinaryString.fromString("Test")}), GenericRow.of(new Object[]{4, BinaryString.fromString("Case")}));
        Assertions.assertThat(spark.sql("SELECT * FROM t").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
        Assertions.assertThat(spark.sql(String.format("SELECT * FROM t TIMESTAMP AS OF %s", Long.valueOf(currentTimeMillis))).collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon]]");
    }

    @Test
    public void testTravelToOldSchema() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        writeTable("t", GenericRow.of(new Object[]{1, BinaryString.fromString("Hello")}), GenericRow.of(new Object[]{2, BinaryString.fromString("Paimon")}));
        spark.sql("ALTER TABLE t ADD COLUMN dt STRING");
        writeTable("t", GenericRow.of(new Object[]{3, BinaryString.fromString("Test"), BinaryString.fromString("0401")}), GenericRow.of(new Object[]{4, BinaryString.fromString("Case"), BinaryString.fromString("0402")}));
        Assertions.assertThat(spark.sql("SELECT * FROM t").collectAsList().toString()).isEqualTo("[[1,Hello,null], [2,Paimon,null], [3,Test,0401], [4,Case,0402]]");
        Assertions.assertThat(spark.sql("SELECT * FROM t VERSION AS OF 1").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon]]");
    }

    @Test
    public void testTravelToNonExistedVersion() {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        Assertions.assertThat(spark.sql("SELECT * FROM t VERSION AS OF 2").collectAsList()).isEmpty();
    }

    @Test
    public void testTravelToNonExistedTimestamp() {
        long currentTimeMillis = System.currentTimeMillis() / 1000;
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        Assertions.assertThat(spark.sql(String.format("SELECT * FROM t TIMESTAMP AS OF %s", Long.valueOf(currentTimeMillis))).collectAsList()).isEmpty();
    }

    @Test
    public void testSystemTableTimeTravel() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING) TBLPROPERTIES ('bucket' = '1')");
        writeTable("t", GenericRow.of(new Object[]{1, BinaryString.fromString("Hello")}), GenericRow.of(new Object[]{2, BinaryString.fromString("Paimon")}));
        String localDateTime = LocalDateTime.now().toString();
        writeTable("t", GenericRow.of(new Object[]{3, BinaryString.fromString("Test")}), GenericRow.of(new Object[]{4, BinaryString.fromString("Case")}));
        Assertions.assertThat(spark.sql("SELECT * FROM `t$files`").collectAsList().size()).isEqualTo(2);
        Assertions.assertThat(spark.sql("SELECT * FROM `t$files` VERSION AS OF 1").collectAsList().size()).isEqualTo(1);
        Assertions.assertThat(spark.sql(String.format("SELECT * FROM `t$files` TIMESTAMP AS OF '%s'", localDateTime)).collectAsList().size()).isEqualTo(1);
    }

    @Test
    public void testTravelToTag() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        writeTable("t", GenericRow.of(new Object[]{1, BinaryString.fromString("Hello")}), GenericRow.of(new Object[]{2, BinaryString.fromString("Paimon")}));
        writeTable("t", GenericRow.of(new Object[]{3, BinaryString.fromString("Test")}), GenericRow.of(new Object[]{4, BinaryString.fromString("Case")}));
        writeTable("t", GenericRow.of(new Object[]{5, BinaryString.fromString("Time")}), GenericRow.of(new Object[]{6, BinaryString.fromString("Travel")}));
        getTable("t").createTag("tag2", 2L);
        Assertions.assertThat(spark.sql("SELECT * FROM t VERSION AS OF 'tag2'").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
    }

    @Test
    public void testTravelToNonExistingTag() {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        Assertions.assertThatThrownBy(() -> {
            spark.sql("SELECT * FROM t VERSION AS OF 'unknown'").collectAsList();
        }).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(RuntimeException.class, "Cannot find a time travel version for unknown")});
    }

    @Test
    public void testTravelToTagWithSnapshotExpiration() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        writeTable("t", GenericRow.of(new Object[]{1, BinaryString.fromString("Hello")}), GenericRow.of(new Object[]{2, BinaryString.fromString("Paimon")}));
        writeTable("t", GenericRow.of(new Object[]{3, BinaryString.fromString("Test")}), GenericRow.of(new Object[]{4, BinaryString.fromString("Case")}));
        writeTable("t", GenericRow.of(new Object[]{5, BinaryString.fromString("Time")}), GenericRow.of(new Object[]{6, BinaryString.fromString("Travel")}));
        FileStoreTable table = getTable("t");
        table.createTag("tag2", 2L);
        HashMap hashMap = new HashMap();
        hashMap.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
        hashMap.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
        table.copy(hashMap).newCommit("").expireSnapshots();
        Assertions.assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1L);
        Assertions.assertThat(spark.sql("SELECT * FROM t VERSION AS OF 'tag2'").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
    }

    @Test
    public void testTravelToTagWithDigitalName() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        writeTable("t", GenericRow.of(new Object[]{1, BinaryString.fromString("Hello")}), GenericRow.of(new Object[]{2, BinaryString.fromString("Paimon")}));
        writeTable("t", GenericRow.of(new Object[]{3, BinaryString.fromString("Test")}), GenericRow.of(new Object[]{4, BinaryString.fromString("Case")}));
        getTable("t").createTag("1", 2L);
        Assertions.assertThat(spark.sql("SELECT * FROM t VERSION AS OF '1'").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]");
    }

    @Test
    public void testTravelWithWatermark() throws Exception {
        spark.sql("CREATE TABLE t (k INT, v STRING)");
        writeTableWithWatermark("t", 1L, GenericRow.of(new Object[]{1, BinaryString.fromString("Hello")}), GenericRow.of(new Object[]{2, BinaryString.fromString("Paimon")}));
        writeTableWithWatermark("t", null, GenericRow.of(new Object[]{1, BinaryString.fromString("Null")}), GenericRow.of(new Object[]{2, BinaryString.fromString("Watermark")}));
        writeTableWithWatermark("t", 10L, GenericRow.of(new Object[]{3, BinaryString.fromString("Time")}), GenericRow.of(new Object[]{4, BinaryString.fromString("Travel")}));
        Assertions.assertThat(spark.sql("SELECT * FROM t version as of 'watermark-1'").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon]]");
        try {
            spark.sql("SELECT * FROM t version as of 'watermark-11'").collectAsList();
        } catch (Exception e) {
            Assertions.assertThat(e.getMessage().equals("There is currently no snapshot later than or equal to watermark[11]"));
        }
        Assertions.assertThat(spark.sql("SELECT * FROM t version as of 'watermark-9'").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon], [1,Null], [2,Watermark], [3,Time], [4,Travel]]");
        Assertions.assertThat(spark.sql("SELECT * FROM t version as of 'watermark-10'").collectAsList().toString()).isEqualTo("[[1,Hello], [2,Paimon], [1,Null], [2,Watermark], [3,Time], [4,Travel]]");
    }
}
