package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.paimon.flink.kafka.KafkaTableTestBase;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.utils.BlockingIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/CompositePkAndMultiPartitionedTableWIthKafkaLogITCase.class */
public class CompositePkAndMultiPartitionedTableWIthKafkaLogITCase extends KafkaTableTestBase {
    /**
     * Runs before every test and hands this test's temporary directory path to
     * {@code ReadWriteTableTestUtil.init}.
     *
     * <p>NOTE(review): presumably this resets the shared table environments used by the static
     * helpers in {@code ReadWriteTableTestUtil} - confirm against that class.
     */
    @BeforeEach
    public void setUp() {
        ReadWriteTableTestUtil.init(getTempDirPath());
    }

    /**
     * Streaming read/write round trip for a table partitioned by {@code (dt, hh)} whose primary
     * key is {@code (from_currency, to_currency, dt, hh)}.
     *
     * <p>Steps, in order:
     *
     * <ol>
     *   <li>Load an {@code I,UA,D} changelog from a temporary source table into a table created
     *       with {@code createTableWithKafkaLog} and check the partition directories.
     *   <li>Open a streaming read and expect one row per surviving key carrying the last value
     *       written for it (keys that were deleted are absent).
     *   <li>Insert one more row and expect it to show up on the open streaming read.
     *   <li>Run an {@code INSERT OVERWRITE} in batch mode, verify files and a batch read, and
     *       assert the open streaming read received nothing further.
     *   <li>Repeat the load into a second table and verify filtered and projected streaming
     *       reads.
     * </ol>
     *
     * <p>The long-lived streaming iterator is held in try-with-resources so that it is closed even
     * when one of the intermediate assertions fails.
     *
     * @throws Exception if any table operation or validation fails
     */
    @Test
    public void testStreamingReadWriteMultiPartitionedRecordsWithMultiPk() throws Exception {
        List<String> fields =
                Arrays.asList(
                        "from_currency STRING",
                        "to_currency STRING",
                        "rate_by_to_currency DOUBLE",
                        "dt STRING",
                        "hh STRING");
        List<String> primaryKeys = Arrays.asList("from_currency", "to_currency", "dt", "hh");
        List<String> partitionKeys = Arrays.asList("dt", "hh");

        // Source changelog: inserts, update-afters and deletes across partitions hh=20 and hh=21.
        List<Row> initialRecords =
                Arrays.asList(
                        // to_currency is USD
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", "US Dollar", 1.0d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", "US Dollar", null, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "HK Dollar", "US Dollar", 0.13d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", "US Dollar", 0.0082d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "US Dollar", 0.74d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+U", "Yen", "US Dollar", 0.0081d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("-D", "US Dollar", "US Dollar", 1.0d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("-D", "Euro", "US Dollar", null, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+U", "Singapore Dollar", "US Dollar", 0.76d, "2022-01-02", "20"),
                        // to_currency is Euro
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", "Euro", 0.9d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "Euro", 0.67d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+U", "Singapore Dollar", "Euro", null, "2022-01-02", "20"),
                        // to_currency is Yen
                        TestValuesTableFactory.changelogRow("+I", "Yen", "Yen", 1.0d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Chinese Yuan", "Yen", 19.25d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+U", "Chinese Yuan", "Yen", 25.6d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "Yen", 90.32d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", "Yen", 122.46d, "2022-01-02", "21"),
                        TestValuesTableFactory.changelogRow("+U", "Singapore Dollar", "Yen", 90.1d, "2022-01-02", "20"));

        String temporaryTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        fields,
                        primaryKeys,
                        partitionKeys,
                        initialRecords,
                        "dt:2022-01-02,hh:20;dt:2022-01-02,hh:21",
                        false,
                        "I,UA,D");

        String table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        fields, primaryKeys, partitionKeys, false);

        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, table);
        ReadWriteTableTestUtil.checkFileStorePath(
                table, Arrays.asList("dt=2022-01-02,hh=20", "dt=2022-01-02,hh=21"));

        // One row per surviving key, holding the last value written for that key.
        List<Row> expectedAfterLoad =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "HK Dollar", "US Dollar", 0.13d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", "US Dollar", 0.0081d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "US Dollar", 0.76d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", "Euro", 0.9d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "Euro", null, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", "Yen", 1.0d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Chinese Yuan", "Yen", 25.6d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", "Yen", 122.46d, "2022-01-02", "21"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "Yen", 90.1d, "2022-01-02", "20"));

        try (BlockingIterator<Row, Row> streamItr =
                ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildSimpleQuery(table), expectedAfterLoad)) {

            // A row written after the read started must arrive on the open iterator.
            ReadWriteTableTestUtil.insertInto(
                    table, "('Chinese Yuan', 'HK Dollar', 1.231, '2022-01-03', '15')");
            ReadWriteTableTestUtil.validateStreamingReadResult(
                    streamItr,
                    Collections.singletonList(
                            TestValuesTableFactory.changelogRow(
                                    "+I", "Chinese Yuan", "HK Dollar", 1.231d, "2022-01-03", "15")));

            // Overwrite through the batch environment, then verify files and batch-read content.
            ReadWriteTableTestUtil.bEnv
                    .executeSql(
                            String.format(
                                    "INSERT OVERWRITE `%s` SELECT 'US Dollar', 'US Dollar', 1, '2022-04-02', '10' FROM `%s`",
                                    table, table))
                    .await();
            ReadWriteTableTestUtil.checkFileStorePath(
                    table, Collections.singletonList("dt=2022-04-02,hh=10"));
            ReadWriteTableTestUtil.testBatchRead(
                    ReadWriteTableTestUtil.buildSimpleQuery(table),
                    Collections.singletonList(
                            TestValuesTableFactory.changelogRow(
                                    "+I", "US Dollar", "US Dollar", 1.0d, "2022-04-02", "10")));

            // The overwrite must not have produced anything on the open streaming read.
            ReadWriteTableTestUtil.assertNoMoreRecords(streamItr);
        }

        // Second table with the same schema and data: filter and projection checks.
        String filteredTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        fields, primaryKeys, partitionKeys, false);
        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, filteredTable);

        // Filter on a partition column and a primary-key column.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                filteredTable,
                                "*",
                                "WHERE dt = '2022-01-02' AND from_currency = 'US Dollar'"),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", "Euro", 0.9d, "2022-01-02", "20"),
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", "Yen", 122.46d, "2022-01-02", "21")))
                .close();

        // Same filter, projecting only the two currency columns.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                filteredTable,
                                "from_currency, to_currency",
                                "WHERE dt = '2022-01-02' AND from_currency = 'US Dollar'"),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", "Euro"),
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", "Yen")))
                .close();
    }

    /**
     * Streaming read/write round trip for a table partitioned by {@code dt} whose primary key is
     * {@code (from_currency, to_currency, dt)}.
     *
     * <p>Loads an {@code I,UA,D} changelog from a temporary source table, checks the partition
     * directories, expects a streaming read to yield one row per surviving key with its last
     * written value, then checks that a row inserted into a new partition reaches the open
     * streaming read. Finally verifies a filtered read and a filtered, projected read that is
     * expected to return no rows.
     *
     * <p>The long-lived streaming iterator is held in try-with-resources so that it is closed even
     * when validation fails.
     *
     * @throws Exception if any table operation or validation fails
     */
    @Test
    public void testStreamingReadWriteSinglePartitionedRecordsWithMultiPk() throws Exception {
        List<String> fields =
                Arrays.asList(
                        "from_currency STRING",
                        "to_currency STRING",
                        "rate_by_to_currency DOUBLE",
                        "dt STRING");
        List<String> primaryKeys = Arrays.asList("from_currency", "to_currency", "dt");
        List<String> partitionKeys = Collections.singletonList("dt");

        // Source changelog spanning partitions dt=2022-01-01 and dt=2022-01-02.
        List<Row> initialRecords =
                Arrays.asList(
                        // dt = 2022-01-01
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", "US Dollar", null, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", "US Dollar", null, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "HK Dollar", "US Dollar", 0.13d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", "US Dollar", 0.0082d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("-D", "Yen", "US Dollar", 0.0082d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "US Dollar", 0.74d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", "US Dollar", 0.0081d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+U", "Euro", "US Dollar", 1.11d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+U", "US Dollar", "US Dollar", 1.0d, "2022-01-01"),
                        // dt = 2022-01-02
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", "Euro", 0.9d, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "Euro", 0.67d, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("-D", "Singapore Dollar", "Euro", 0.67d, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", "Yen", 1.0d, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", "Chinese Yuan", "Yen", 19.25d, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "Yen", 90.32d, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+U", "Singapore Dollar", "Yen", 122.46d, "2022-01-02"));

        String temporaryTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        fields,
                        primaryKeys,
                        partitionKeys,
                        initialRecords,
                        "dt:2022-01-01;dt:2022-01-02",
                        false,
                        "I,UA,D");

        String table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        fields, primaryKeys, partitionKeys, false);

        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, table);
        ReadWriteTableTestUtil.checkFileStorePath(
                table, Arrays.asList("dt=2022-01-01", "dt=2022-01-02"));

        // One row per surviving key, holding the last value written for that key.
        List<Row> expectedAfterLoad =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "HK Dollar", "US Dollar", 0.13d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "US Dollar", 0.74d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", "US Dollar", 0.0081d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", "US Dollar", 1.11d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", "US Dollar", 1.0d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", "Euro", 0.9d, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", "Yen", 1.0d, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", "Chinese Yuan", "Yen", 19.25d, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "Yen", 122.46d, "2022-01-02"));

        try (BlockingIterator<Row, Row> streamItr =
                ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildSimpleQuery(table), expectedAfterLoad)) {

            // A row written into a new static partition must arrive on the open iterator.
            ReadWriteTableTestUtil.insertIntoPartition(
                    table, "PARTITION (dt = '2022-01-03')", "('Chinese Yuan', 'HK Dollar', 1.231)");
            ReadWriteTableTestUtil.validateStreamingReadResult(
                    streamItr,
                    Collections.singletonList(
                            TestValuesTableFactory.changelogRow(
                                    "+I", "Chinese Yuan", "HK Dollar", 1.231d, "2022-01-03")));
        }

        // Filter on the partition column and a primary-key column.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                table,
                                "*",
                                "WHERE dt = '2022-01-02' AND from_currency = 'US Dollar'"),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", "Euro", 0.9d, "2022-01-02")))
                .close();

        // Every NULL rate in dt=2022-01-01 was later updated, so nothing should match.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                table,
                                "from_currency, to_currency",
                                "WHERE dt = '2022-01-01' AND rate_by_to_currency IS NULL"),
                        Collections.emptyList())
                .close();
    }

    /**
     * Streaming read/write round trip for a table partitioned by {@code (dt, hh)} that declares
     * no primary key.
     *
     * <p>Loads an {@code I,UA,UB,D} changelog from a temporary source table, checks the partition
     * directories, expects a streaming read to yield only the rows that were never retracted,
     * then checks that a row inserted into a new partition reaches the open streaming read.
     * Finally verifies two filtered reads that are expected to return no rows.
     *
     * <p>The long-lived streaming iterator is held in try-with-resources so that it is closed even
     * when validation fails.
     *
     * @throws Exception if any table operation or validation fails
     */
    @Test
    public void testStreamingReadWriteMultiPartitionedRecordsWithoutPk() throws Exception {
        List<String> fields =
                Arrays.asList("currency STRING", "rate BIGINT", "dt STRING", "hh STRING");
        List<String> partitionKeys = Arrays.asList("dt", "hh");

        // Source changelog; most inserts are retracted again by a matching -D / -U.
        List<Row> initialRecords =
                Arrays.asList(
                        // dt = 2022-01-01, hh = 10
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L, "2022-01-01", "10"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 116L, "2022-01-01", "10"),
                        TestValuesTableFactory.changelogRow("-D", "Euro", 116L, "2022-01-01", "10"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-01", "10"),
                        TestValuesTableFactory.changelogRow("-D", "Yen", 1L, "2022-01-01", "10"),
                        // dt = 2022-01-02, hh = 10
                        TestValuesTableFactory.changelogRow("+I", "Euro", 114L, "2022-01-02", "10"),
                        TestValuesTableFactory.changelogRow("-U", "Euro", 114L, "2022-01-02", "10"),
                        TestValuesTableFactory.changelogRow("+U", "Euro", 119L, "2022-01-02", "10"),
                        TestValuesTableFactory.changelogRow("-D", "Euro", 119L, "2022-01-02", "10"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 115L, "2022-01-02", "10"),
                        // dt = 2022-01-02, hh = 11
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-02", "11"));

        String temporaryTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        fields,
                        Collections.emptyList(),
                        partitionKeys,
                        initialRecords,
                        "dt:2022-01-01,hh:10;dt:2022-01-02,hh:10;dt:2022-01-02,hh:11",
                        false,
                        "I,UA,UB,D");

        String table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        fields, Collections.emptyList(), partitionKeys, false);

        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, table);
        ReadWriteTableTestUtil.checkFileStorePath(
                table,
                Arrays.asList("dt=2022-01-01,hh=10", "dt=2022-01-02,hh=10", "dt=2022-01-02,hh=11"));

        // Only the rows that were never retracted remain.
        List<Row> expectedAfterLoad =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", 102L, "2022-01-01", "10"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", 115L, "2022-01-02", "10"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", 1L, "2022-01-02", "11"));

        try (BlockingIterator<Row, Row> streamItr =
                ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildSimpleQuery(table), expectedAfterLoad)) {

            // Static partition on dt only; hh comes from the inserted values.
            ReadWriteTableTestUtil.insertIntoPartition(
                    table, "PARTITION (dt = '2022-04-02')", "('Euro', 116, '10')");
            ReadWriteTableTestUtil.validateStreamingReadResult(
                    streamItr,
                    Collections.singletonList(
                            TestValuesTableFactory.changelogRow(
                                    "+I", "Euro", 116L, "2022-04-02", "10")));
        }

        // The only Yen row in dt=2022-01-01 was deleted, so nothing should match.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                table, "*", "WHERE dt = '2022-01-01' AND currency = 'Yen'"),
                        Collections.emptyList())
                .close();

        // No record was ever written with rate = 103.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                table, "currency", "WHERE hh = '10' AND rate = 103"),
                        Collections.emptyList())
                .close();
    }

    /**
     * Reads the changelog with the {@code SCAN_LATEST} table options from tables partitioned by
     * {@code (dt, hh)} whose primary key is {@code (from_currency, to_currency, dt, hh)}.
     *
     * <p>Each scenario creates a fresh table via {@code createTableWithKafkaLog} (fourth argument
     * {@code true}) and runs {@code testStreamingReadWithReadFirst} against the same temporary
     * source table, varying only the projection and filter. The unfiltered scenario expects every
     * source record plus a {@code -U} row for each key that receives a {@code +U}, except where
     * the replaced value is {@code null}.
     *
     * <p>NOTE(review): the exact meaning of the boolean passed to {@code createTableWithKafkaLog}
     * and of "read first" is defined in {@code ReadWriteTableTestUtil} - confirm there.
     *
     * @throws Exception if any table operation or validation fails
     */
    @Test
    public void testReadLatestChangelogOfMultiPartitionedRecordsWithMultiPk() throws Exception {
        List<String> fields =
                Arrays.asList(
                        "from_currency STRING",
                        "to_currency STRING",
                        "rate_by_to_currency DOUBLE",
                        "dt STRING",
                        "hh STRING");
        List<String> primaryKeys = Arrays.asList("from_currency", "to_currency", "dt", "hh");
        List<String> partitionKeys = Arrays.asList("dt", "hh");

        // Source changelog: inserts, update-afters and deletes across partitions hh=20 and hh=21.
        List<Row> initialRecords =
                Arrays.asList(
                        // to_currency is USD
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", "US Dollar", 1.0d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", "US Dollar", null, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "HK Dollar", "US Dollar", 0.13d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", "US Dollar", 0.0082d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "US Dollar", 0.74d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+U", "Yen", "US Dollar", 0.0081d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("-D", "US Dollar", "US Dollar", 1.0d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("-D", "Euro", "US Dollar", null, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+U", "Singapore Dollar", "US Dollar", 0.76d, "2022-01-02", "20"),
                        // to_currency is Euro
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", "Euro", 0.9d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "Euro", 0.67d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+U", "Singapore Dollar", "Euro", null, "2022-01-02", "20"),
                        // to_currency is Yen
                        TestValuesTableFactory.changelogRow("+I", "Yen", "Yen", 1.0d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Chinese Yuan", "Yen", 19.25d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+U", "Chinese Yuan", "Yen", 25.6d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "Yen", 90.32d, "2022-01-02", "20"),
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", "Yen", 122.46d, "2022-01-02", "21"),
                        TestValuesTableFactory.changelogRow("+U", "Singapore Dollar", "Yen", 90.1d, "2022-01-02", "20"));

        String temporaryTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        fields,
                        primaryKeys,
                        partitionKeys,
                        initialRecords,
                        "dt:2022-01-02,hh:20;dt:2022-01-02,hh:21",
                        false,
                        "I,UA,D");

        // Scenario 1: no filter, all columns. Expect the source records plus the -U rows below.
        String table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        fields, primaryKeys, partitionKeys, true);

        List<Row> expectedFullChangelog = new ArrayList<>(initialRecords);
        expectedFullChangelog.add(
                TestValuesTableFactory.changelogRow("-U", "Yen", "US Dollar", 0.0082d, "2022-01-02", "20"));
        expectedFullChangelog.add(
                TestValuesTableFactory.changelogRow("-U", "Singapore Dollar", "US Dollar", 0.74d, "2022-01-02", "20"));
        expectedFullChangelog.add(
                TestValuesTableFactory.changelogRow("-U", "Singapore Dollar", "Euro", 0.67d, "2022-01-02", "20"));
        expectedFullChangelog.add(
                TestValuesTableFactory.changelogRow("-U", "Chinese Yuan", "Yen", 19.25d, "2022-01-02", "20"));
        expectedFullChangelog.add(
                TestValuesTableFactory.changelogRow("-U", "Singapore Dollar", "Yen", 90.32d, "2022-01-02", "20"));

        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table, "*", "", ReadWriteTableTestUtil.SCAN_LATEST),
                        expectedFullChangelog)
                .close();

        // Scenario 2: filter on both partition columns.
        table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        fields, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table,
                                "*",
                                "WHERE dt = '2022-01-02' AND hh = '21'",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", "Yen", 122.46d, "2022-01-02", "21")))
                .close();

        // Scenario 3: filter on a value column and a primary-key column.
        table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        fields, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table,
                                "*",
                                "WHERE rate_by_to_currency IS NOT NULL AND from_currency = 'US Dollar'",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", "US Dollar", 1.0d, "2022-01-02", "20"),
                                TestValuesTableFactory.changelogRow("-D", "US Dollar", "US Dollar", 1.0d, "2022-01-02", "20"),
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", "Euro", 0.9d, "2022-01-02", "20"),
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", "Yen", 122.46d, "2022-01-02", "21")))
                .close();

        // Scenario 4: filter on one partition column and a primary-key column.
        table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        fields, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table,
                                "*",
                                "WHERE hh = '21' AND from_currency = 'US Dollar'",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "US Dollar", "Yen", 122.46d, "2022-01-02", "21")))
                .close();

        // Scenario 5: projection only. Expect one +I per key and a -D per deleted key.
        table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        fields, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table,
                                "from_currency, to_currency",
                                "",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", "US Dollar"),
                                TestValuesTableFactory.changelogRow("+I", "Euro", "US Dollar"),
                                TestValuesTableFactory.changelogRow("+I", "HK Dollar", "US Dollar"),
                                TestValuesTableFactory.changelogRow("+I", "Yen", "US Dollar"),
                                TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "US Dollar"),
                                TestValuesTableFactory.changelogRow("-D", "US Dollar", "US Dollar"),
                                TestValuesTableFactory.changelogRow("-D", "Euro", "US Dollar"),
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", "Euro"),
                                TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "Euro"),
                                TestValuesTableFactory.changelogRow("+I", "Yen", "Yen"),
                                TestValuesTableFactory.changelogRow("+I", "Chinese Yuan", "Yen"),
                                TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "Yen"),
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", "Yen")))
                .close();

        // Scenario 6: projection combined with a filter on the non-projected value column.
        table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        fields, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table,
                                "from_currency, to_currency",
                                "WHERE rate_by_to_currency > 100",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", "Yen")))
                .close();
    }

    /**
     * Reads the changelog with the {@code SCAN_LATEST} table options from tables partitioned by
     * {@code dt} whose primary key is {@code (from_currency, to_currency, dt)}.
     *
     * <p>Each scenario creates a fresh table via {@code createTableWithKafkaLog} (fourth argument
     * {@code true}) and runs {@code testStreamingReadWithReadFirst} against the same temporary
     * source table, varying only the projection and filter. The unfiltered scenario expects every
     * source record plus one {@code -U} row for each {@code +U} in the source.
     *
     * <p>NOTE(review): the exact meaning of the boolean passed to {@code createTableWithKafkaLog}
     * and of "read first" is defined in {@code ReadWriteTableTestUtil} - confirm there.
     *
     * @throws Exception if any table operation or validation fails
     */
    @Test
    public void testReadLatestChangelogOfSinglePartitionedRecordsWithMultiPk() throws Exception {
        List<String> fields =
                Arrays.asList(
                        "from_currency STRING",
                        "to_currency STRING",
                        "rate_by_to_currency DOUBLE",
                        "dt STRING");
        List<String> primaryKeys = Arrays.asList("from_currency", "to_currency", "dt");
        List<String> partitionKeys = Collections.singletonList("dt");

        // Source changelog spanning partitions dt=2022-01-01 and dt=2022-01-02.
        List<Row> initialRecords =
                Arrays.asList(
                        // dt = 2022-01-01
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", "US Dollar", null, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Euro", "US Dollar", null, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "HK Dollar", "US Dollar", 0.13d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", "US Dollar", 0.0082d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("-D", "Yen", "US Dollar", 0.0082d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "US Dollar", 0.74d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", "US Dollar", 0.0081d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+U", "Euro", "US Dollar", 1.11d, "2022-01-01"),
                        TestValuesTableFactory.changelogRow("+U", "US Dollar", "US Dollar", 1.0d, "2022-01-01"),
                        // dt = 2022-01-02
                        TestValuesTableFactory.changelogRow("+I", "US Dollar", "Euro", 0.9d, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "Euro", 0.67d, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("-D", "Singapore Dollar", "Euro", 0.67d, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", "Yen", "Yen", 1.0d, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", "Chinese Yuan", "Yen", 19.25d, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "Yen", 90.32d, "2022-01-02"),
                        TestValuesTableFactory.changelogRow("+U", "Singapore Dollar", "Yen", 122.46d, "2022-01-02"));

        String temporaryTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        fields,
                        primaryKeys,
                        partitionKeys,
                        initialRecords,
                        "dt:2022-01-01;dt:2022-01-02",
                        false,
                        "I,UA,D");

        // Scenario 1: no filter, all columns. Expect the source records plus the -U rows below.
        String table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        fields, primaryKeys, partitionKeys, true);

        List<Row> expectedFullChangelog = new ArrayList<>(initialRecords);
        expectedFullChangelog.add(
                TestValuesTableFactory.changelogRow("-U", "Euro", "US Dollar", null, "2022-01-01"));
        expectedFullChangelog.add(
                TestValuesTableFactory.changelogRow("-U", "US Dollar", "US Dollar", null, "2022-01-01"));
        expectedFullChangelog.add(
                TestValuesTableFactory.changelogRow("-U", "Singapore Dollar", "Yen", 90.32d, "2022-01-02"));

        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table, "*", "", ReadWriteTableTestUtil.SCAN_LATEST),
                        expectedFullChangelog)
                .close();

        // Scenario 2: filter on the partition column.
        table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        fields, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table,
                                "*",
                                "WHERE dt = '2022-01-02'",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", "Euro", 0.9d, "2022-01-02"),
                                TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "Euro", 0.67d, "2022-01-02"),
                                TestValuesTableFactory.changelogRow("-D", "Singapore Dollar", "Euro", 0.67d, "2022-01-02"),
                                TestValuesTableFactory.changelogRow("+I", "Yen", "Yen", 1.0d, "2022-01-02"),
                                TestValuesTableFactory.changelogRow("+I", "Chinese Yuan", "Yen", 19.25d, "2022-01-02"),
                                TestValuesTableFactory.changelogRow("+I", "Singapore Dollar", "Yen", 90.32d, "2022-01-02"),
                                TestValuesTableFactory.changelogRow("+U", "Singapore Dollar", "Yen", 122.46d, "2022-01-02"),
                                TestValuesTableFactory.changelogRow("-U", "Singapore Dollar", "Yen", 90.32d, "2022-01-02")))
                .close();

        // Scenario 3: filter on a NULL value column; the +U rows carry non-null rates and drop out.
        table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        fields, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table,
                                "*",
                                "WHERE rate_by_to_currency IS NULL",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", "US Dollar", "US Dollar", null, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("+I", "Euro", "US Dollar", null, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-U", "Euro", "US Dollar", null, "2022-01-01"),
                                TestValuesTableFactory.changelogRow("-U", "US Dollar", "US Dollar", null, "2022-01-01")))
                .close();

        // Scenario 4: filter on the partition column and a primary-key column.
        table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        fields, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table,
                                "*",
                                "WHERE dt = '2022-01-02' AND from_currency = 'Yen'",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow(
                                        "+I", "Yen", "Yen", 1.0d, "2022-01-02")))
                .close();

        // Scenario 5: projection combined with a filter on the non-projected value column.
        table =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        fields, primaryKeys, partitionKeys, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                table,
                                "from_currency, to_currency",
                                "WHERE rate_by_to_currency > 100",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+U", "Singapore Dollar", "Yen")))
                .close();
    }

    @Test
    public void testReadLatestChangelogOfNonPartitionedRecordsWithMultiPk() throws Exception {
        // One temporary source table is created from the changelog below and is then read three
        // times. Every read goes through its own table from createTableWithKafkaLog and a query
        // built with SCAN_LATEST: (1) no filter, (2) a filter on rate_by_to_currency, and
        // (3) the same filter projected onto from_currency/to_currency.
        // NOTE(review): going by the test name, the second helper argument is the composite
        // primary key and the empty third argument means "no partition keys" - confirm against
        // ReadWriteTableTestUtil.
        List<String> columns =
                Arrays.asList(
                        "from_currency STRING", "to_currency STRING", "rate_by_to_currency DOUBLE");
        List<String> keyColumns = Arrays.asList("from_currency", "to_currency");
        List<String> noPartitionColumns = Collections.emptyList();
        String rateFilter = "WHERE rate_by_to_currency < 1 OR rate_by_to_currency > 100";

        List sourceChangelog =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "US Dollar", 1.0}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", "US Dollar", 1.11}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", "US Dollar", 0.13}),
                        TestValuesTableFactory.changelogRow("+U", new Object[] {"Euro", "US Dollar", 1.12}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar", 0.0082}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "US Dollar", 0.74}),
                        TestValuesTableFactory.changelogRow("+U", new Object[] {"Yen", "US Dollar", 0.0081}),
                        TestValuesTableFactory.changelogRow("-D", new Object[] {"US Dollar", "US Dollar", 1.0}),
                        TestValuesTableFactory.changelogRow("-D", new Object[] {"Yen", "US Dollar", 0.0081}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "Euro", 0.9}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "Euro", 0.67}),
                        TestValuesTableFactory.changelogRow("+U", new Object[] {"Singapore Dollar", "Euro", 0.69}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "Yen", 1.0}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Chinese Yuan", "Yen", 19.25}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "Yen", 90.32}),
                        TestValuesTableFactory.changelogRow("-D", new Object[] {"Yen", "Yen", 1.0}),
                        TestValuesTableFactory.changelogRow("+U", new Object[] {"Singapore Dollar", "Yen", 122.46}),
                        TestValuesTableFactory.changelogRow("+U", new Object[] {"Singapore Dollar", "Yen", 122.0}));
        String sourceTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        columns, keyColumns, noPartitionColumns, sourceChangelog, null, false, "I,UA,D");

        // (1) Unfiltered read: every source row, followed by the -U rows listed here.
        String plainTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        columns, keyColumns, noPartitionColumns, true);
        List expectedWithoutFilter = new ArrayList(sourceChangelog);
        expectedWithoutFilter.addAll(
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("-U", new Object[] {"Yen", "US Dollar", 0.0082}),
                        TestValuesTableFactory.changelogRow("-U", new Object[] {"Euro", "US Dollar", 1.11}),
                        TestValuesTableFactory.changelogRow("-U", new Object[] {"Singapore Dollar", "Euro", 0.67}),
                        TestValuesTableFactory.changelogRow("-U", new Object[] {"Singapore Dollar", "Yen", 90.32}),
                        TestValuesTableFactory.changelogRow("-U", new Object[] {"Singapore Dollar", "Yen", 122.46})));
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        plainTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                plainTable, "*", "", ReadWriteTableTestUtil.SCAN_LATEST),
                        expectedWithoutFilter)
                .close();

        // (2) Filter on the rate column, all columns selected.
        String rateFilteredTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        columns, keyColumns, noPartitionColumns, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        rateFilteredTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                rateFilteredTable, "*", rateFilter, ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", "US Dollar", 0.13}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar", 0.0082}),
                                TestValuesTableFactory.changelogRow("-U", new Object[] {"Yen", "US Dollar", 0.0082}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "US Dollar", 0.74}),
                                TestValuesTableFactory.changelogRow("+U", new Object[] {"Yen", "US Dollar", 0.0081}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Yen", "US Dollar", 0.0081}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "Euro", 0.9}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "Euro", 0.67}),
                                TestValuesTableFactory.changelogRow("-U", new Object[] {"Singapore Dollar", "Euro", 0.67}),
                                TestValuesTableFactory.changelogRow("+U", new Object[] {"Singapore Dollar", "Euro", 0.69}),
                                TestValuesTableFactory.changelogRow("+U", new Object[] {"Singapore Dollar", "Yen", 122.46}),
                                TestValuesTableFactory.changelogRow("-U", new Object[] {"Singapore Dollar", "Yen", 122.46}),
                                TestValuesTableFactory.changelogRow("+U", new Object[] {"Singapore Dollar", "Yen", 122.0})))
                .close();

        // (3) Same filter, but only the two currency columns are selected; the expected rows
        // mirror scenario (2) without the rate value.
        String projectedTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        columns, keyColumns, noPartitionColumns, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        projectedTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                projectedTable,
                                "from_currency, to_currency",
                                rateFilter,
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"HK Dollar", "US Dollar"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", "US Dollar"}),
                                TestValuesTableFactory.changelogRow("-U", new Object[] {"Yen", "US Dollar"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "US Dollar"}),
                                TestValuesTableFactory.changelogRow("+U", new Object[] {"Yen", "US Dollar"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Yen", "US Dollar"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", "Euro"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Singapore Dollar", "Euro"}),
                                TestValuesTableFactory.changelogRow("-U", new Object[] {"Singapore Dollar", "Euro"}),
                                TestValuesTableFactory.changelogRow("+U", new Object[] {"Singapore Dollar", "Euro"}),
                                TestValuesTableFactory.changelogRow("+U", new Object[] {"Singapore Dollar", "Yen"}),
                                TestValuesTableFactory.changelogRow("-U", new Object[] {"Singapore Dollar", "Yen"}),
                                TestValuesTableFactory.changelogRow("+U", new Object[] {"Singapore Dollar", "Yen"})))
                .close();
    }

    /**
     * Reads one temporary source table four times through separate tables from
     * {@code createTableWithKafkaLog}, each with a query built with {@code SCAN_LATEST}:
     * unfiltered, filtered on {@code dt}, filtered on {@code rate}, and filtered on {@code rate}
     * with only {@code currency} selected.
     *
     * <p>NOTE(review): going by the test name, {@code (currency, dt, hh)} is the key list whose
     * only non-partition column is {@code currency}, and {@code (dt, hh)} are the partition
     * columns - confirm against {@code ReadWriteTableTestUtil}.
     *
     * @throws Exception if any helper call fails
     */
    @Test
    public void testReadLatestChangelogOfMultiPartitionedRecordsWithOnePk() throws Exception {
        List<String> columns =
                Arrays.asList("currency STRING", "rate BIGINT", "dt STRING", "hh STRING");
        List<String> keyColumns = Arrays.asList("currency", "dt", "hh");
        List<String> partitionColumns = Arrays.asList("dt", "hh");

        List sourceChangelog =
                Arrays.asList(
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", 102L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 114L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", 1L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("+U", new Object[] {"Euro", 116L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[] {"Yen", 1L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("-D", new Object[] {"Euro", 116L, "2022-01-01", "15"}),
                        TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 119L, "2022-01-02", "23"}),
                        TestValuesTableFactory.changelogRow("+U", new Object[] {"Euro", 119L, "2022-01-02", "23"}));
        String sourceTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        columns,
                        keyColumns,
                        partitionColumns,
                        sourceChangelog,
                        "dt:2022-01-01,hh:15;dt:2022-01-02,hh:23",
                        false,
                        "I,UA,D");

        // Unfiltered read: the source rows minus the "+U Euro 119" row, plus a "-U Euro 114" row.
        String plainTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        columns, keyColumns, partitionColumns, true);
        List expectedWithoutFilter = new ArrayList(sourceChangelog);
        expectedWithoutFilter.remove(
                TestValuesTableFactory.changelogRow("+U", new Object[] {"Euro", 119L, "2022-01-02", "23"}));
        expectedWithoutFilter.add(
                TestValuesTableFactory.changelogRow("-U", new Object[] {"Euro", 114L, "2022-01-01", "15"}));
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        plainTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                plainTable, "*", "", ReadWriteTableTestUtil.SCAN_LATEST),
                        expectedWithoutFilter)
                .close();

        // Filter on the dt column.
        String dateFilteredTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        columns, keyColumns, partitionColumns, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        dateFilteredTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                dateFilteredTable,
                                "*",
                                "WHERE dt >= '2022-01-02'",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Collections.singletonList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 119L, "2022-01-02", "23"})))
                .close();

        // Filter on the rate column, all columns selected.
        String rateFilteredTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        columns, keyColumns, partitionColumns, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        rateFilteredTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                rateFilteredTable,
                                "*",
                                "WHERE rate = 1",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", 1L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Yen", 1L, "2022-01-01", "15"})))
                .close();

        // Same rate filter, only the currency column selected.
        String projectedTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        columns, keyColumns, partitionColumns, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        projectedTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                projectedTable,
                                "currency",
                                "WHERE rate = 1",
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Yen"})))
                .close();
    }

    /**
     * Reads one temporary source table five times through separate tables from
     * {@code createTableWithKafkaLog}, each with a query built with {@code SCAN_LATEST}. The key
     * list passed to the helpers is empty and {@code (dt, hh)} is passed as the third list.
     *
     * <p>The unfiltered expectation differs from the source rows in one place: where the source
     * has {@code -U Euro 114} / {@code +U Euro 119}, the expected rows are {@code -D Euro 114} /
     * {@code +I Euro 119}. The second and third reads use the same {@code hh <> '20'} filter and
     * the same expected rows.
     *
     * <p>NOTE(review): going by the test name, the empty list means "no primary key" and
     * {@code (dt, hh)} are the partition columns - confirm against
     * {@code ReadWriteTableTestUtil}.
     *
     * @throws Exception if any helper call fails
     */
    @Test
    public void testReadLatestChangelogOfMultiPartitionedRecordsWithoutPk() throws Exception {
        List<String> columns =
                Arrays.asList("currency STRING", "rate BIGINT", "dt STRING", "hh STRING");
        List<String> noKeyColumns = Collections.emptyList();
        List<String> partitionColumns = Arrays.asList("dt", "hh");
        String hourFilter = "WHERE hh <> '20'";
        String rateFilter = "WHERE rate = 1";

        String sourceTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        columns,
                        noKeyColumns,
                        partitionColumns,
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", 102L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 116L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Euro", 116L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", 1L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Yen", 1L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 114L, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("-U", new Object[] {"Euro", 114L, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("+U", new Object[] {"Euro", 119L, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Euro", 119L, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 115L, "2022-01-02", "20"})),
                        "dt:2022-01-01,hh:15;dt:2022-01-02,hh:20",
                        false,
                        "I,UA,UB,D");

        // Unfiltered read.
        String plainTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        columns, noKeyColumns, partitionColumns, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        plainTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                plainTable, "*", "", ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", 102L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 116L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Euro", 116L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", 1L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Yen", 1L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 114L, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Euro", 114L, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 119L, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Euro", 119L, "2022-01-02", "20"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 115L, "2022-01-02", "20"})))
                .close();

        // Filter on the hh column (first of two identical scenarios).
        String hourFilteredTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        columns, noKeyColumns, partitionColumns, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        hourFilteredTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                hourFilteredTable, "*", hourFilter, ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", 102L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 116L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Euro", 116L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", 1L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Yen", 1L, "2022-01-01", "15"})))
                .close();

        // Filter on the hh column again, against another freshly created table.
        String hourFilteredTableAgain =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        columns, noKeyColumns, partitionColumns, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        hourFilteredTableAgain,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                hourFilteredTableAgain,
                                "*",
                                hourFilter,
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"US Dollar", 102L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Euro", 116L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Euro", 116L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", 1L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Yen", 1L, "2022-01-01", "15"})))
                .close();

        // Filter on the rate column, all columns selected.
        String rateFilteredTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        columns, noKeyColumns, partitionColumns, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        rateFilteredTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                rateFilteredTable, "*", rateFilter, ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen", 1L, "2022-01-01", "15"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Yen", 1L, "2022-01-01", "15"})))
                .close();

        // Same rate filter, only the currency column selected.
        String projectedTable =
                ReadWriteTableTestUtil.createTableWithKafkaLog(
                        columns, noKeyColumns, partitionColumns, true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        sourceTable,
                        projectedTable,
                        ReadWriteTableTestUtil.buildQueryWithTableOptions(
                                projectedTable,
                                "currency",
                                rateFilter,
                                ReadWriteTableTestUtil.SCAN_LATEST),
                        Arrays.asList(
                                TestValuesTableFactory.changelogRow("+I", new Object[] {"Yen"}),
                                TestValuesTableFactory.changelogRow("-D", new Object[] {"Yen"})))
                .close();
    }
}
