package org.apache.iceberg.flink;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.source.BoundedTableFactory;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized tests that write into Iceberg tables through Flink SQL {@code INSERT} statements
 * and then validate the table contents through the validation catalog.
 *
 * <p>Every test runs for each combination of the catalog parameters supplied by
 * {@link FlinkCatalogTestBase#parameters()}, the file formats ORC/AVRO/PARQUET, and streaming or
 * batch execution.
 */
@RunWith(Parameterized.class)
public class TestFlinkTableSink extends FlinkCatalogTestBase {

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = MiniClusterResource.createWithClassloaderCheckDisabled();

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    private static final String SOURCE_TABLE = "default_catalog.default_database.bounded_source";
    private static final String TABLE_NAME = "test_table";

    // Lazily created in getTableEnv(); volatile so the double-checked initialization there is safe.
    private volatile TableEnvironment tEnv;
    private Table icebergTable;
    private final FileFormat format;
    private final boolean isStreamingJob;

    /**
     * Builds the test matrix: every file format x {streaming, batch} x every catalog configuration
     * from the base class.
     *
     * @return one {@code {catalogName, baseNamespace, format, isStreaming}} array per combination
     */
    @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
    public static Iterable<Object[]> parameters() {
        List<Object[]> parameters = Lists.newArrayList();
        for (FileFormat format : new FileFormat[]{FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) {
            for (Boolean isStreaming : new Boolean[]{true, false}) {
                for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
                    String catalogName = (String) catalogParams[0];
                    Namespace baseNamespace = (Namespace) catalogParams[1];
                    parameters.add(new Object[]{catalogName, baseNamespace, format, isStreaming});
                }
            }
        }
        return parameters;
    }

    /**
     * @param catalogName name of the catalog under test, forwarded to the base class
     * @param baseNamespace base namespace of the catalog, forwarded to the base class
     * @param format file format used as {@code write.format.default} for the created tables
     * @param isStreamingJob whether the table environment runs in streaming (true) or batch mode
     */
    public TestFlinkTableSink(String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) {
        super(catalogName, baseNamespace);
        this.format = format;
        this.isStreamingJob = isStreamingJob;
    }

    /**
     * Returns the table environment for this test instance, creating it on first use.
     *
     * <p>In streaming mode the environment is backed by a {@link StreamExecutionEnvironment} with
     * checkpointing enabled and parallelism 2; in batch mode a plain batch
     * {@link TableEnvironment} is created.
     */
    @Override
    public TableEnvironment getTableEnv() {
        if (this.tEnv == null) {
            synchronized (this) {
                // Re-check under the lock so a concurrent caller cannot create a second environment.
                if (this.tEnv == null) {
                    EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance();
                    if (this.isStreamingJob) {
                        settingsBuilder.inStreamingMode();
                        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
                        env.enableCheckpointing(400L);
                        env.setMaxParallelism(2);
                        env.setParallelism(2);
                        this.tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
                    } else {
                        settingsBuilder.inBatchMode();
                        this.tEnv = TableEnvironment.create(settingsBuilder.build());
                    }
                }
            }
        }
        return this.tEnv;
    }

    /** Creates the test database and the unpartitioned {@code test_table}, then loads it for validation. */
    @Override
    @Before
    public void before() {
        super.before();
        sql("CREATE DATABASE %s", this.flinkDatabase);
        sql("USE CATALOG %s", this.catalogName);
        sql("USE %s", "db");
        sql("CREATE TABLE %s (id int, data varchar) with ('write.format.default'='%s')", TABLE_NAME, this.format.name());
        this.icebergTable = this.validationCatalog.loadTable(TableIdentifier.of(this.icebergNamespace, TABLE_NAME));
    }

    /** Drops the test table and database and clears the data sets registered with the bounded source. */
    @Override
    @After
    public void clean() {
        sql("DROP TABLE IF EXISTS %s.%s", this.flinkDatabase, TABLE_NAME);
        sql("DROP DATABASE IF EXISTS %s", this.flinkDatabase);
        BoundedTableFactory.clearDataSets();
        super.clean();
    }

    /** Inserts rows (including null id / null data) from a temporary view and checks they all arrive. */
    @Test
    public void testInsertFromSourceTable() throws Exception {
        getTableEnv().createTemporaryView(
                "sourceTable",
                getTableEnv().fromValues(
                        SimpleDataUtil.FLINK_SCHEMA.toRowDataType(),
                        Expressions.row(1, "hello"),
                        Expressions.row(2, "world"),
                        // The cast keeps null a single varargs element rather than a null array.
                        Expressions.row(3, (String) null),
                        Expressions.row(null, "bar")));

        sql("INSERT INTO %s SELECT id,data from sourceTable", TABLE_NAME);

        SimpleDataUtil.assertTableRecords(
                this.icebergTable,
                Lists.newArrayList(
                        SimpleDataUtil.createRecord(1, "hello"),
                        SimpleDataUtil.createRecord(2, "world"),
                        SimpleDataUtil.createRecord(3, null),
                        SimpleDataUtil.createRecord(null, "bar")));
    }

    /** Verifies that INSERT OVERWRITE on the unpartitioned table replaces its previous contents (batch only). */
    @Test
    public void testOverwriteTable() throws Exception {
        Assume.assumeFalse("Flink unbounded streaming does not support overwrite operation", this.isStreamingJob);

        sql("INSERT INTO %s SELECT 1, 'a'", TABLE_NAME);
        SimpleDataUtil.assertTableRecords(this.icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(1, "a")));

        sql("INSERT OVERWRITE %s SELECT 2, 'b'", TABLE_NAME);
        SimpleDataUtil.assertTableRecords(this.icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
    }

    /** Verifies that INSERT OVERWRITE on a partitioned table only replaces the partitions being written (batch only). */
    @Test
    public void testReplacePartitions() throws Exception {
        Assume.assumeFalse("Flink unbounded streaming does not support overwrite operation", this.isStreamingJob);
        String tableName = "test_partition";
        sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH ('write.format.default'='%s')", tableName, this.format.name());
        try {
            Table partitionedTable = this.validationCatalog.loadTable(TableIdentifier.of(this.icebergNamespace, tableName));

            sql("INSERT INTO %s SELECT 1, 'a'", tableName);
            sql("INSERT INTO %s SELECT 2, 'b'", tableName);
            sql("INSERT INTO %s SELECT 3, 'c'", tableName);
            SimpleDataUtil.assertTableRecords(
                    partitionedTable,
                    Lists.newArrayList(
                            SimpleDataUtil.createRecord(1, "a"),
                            SimpleDataUtil.createRecord(2, "b"),
                            SimpleDataUtil.createRecord(3, "c")));

            // Dynamic overwrite: only partitions 'b' and 'a' are rewritten, 'c' is untouched.
            sql("INSERT OVERWRITE %s SELECT 4, 'b'", tableName);
            sql("INSERT OVERWRITE %s SELECT 5, 'a'", tableName);
            SimpleDataUtil.assertTableRecords(
                    partitionedTable,
                    Lists.newArrayList(
                            SimpleDataUtil.createRecord(5, "a"),
                            SimpleDataUtil.createRecord(4, "b"),
                            SimpleDataUtil.createRecord(3, "c")));

            // Static overwrite of the explicitly named partition.
            sql("INSERT OVERWRITE %s PARTITION (data='a') SELECT 6", tableName);
            SimpleDataUtil.assertTableRecords(
                    partitionedTable,
                    Lists.newArrayList(
                            SimpleDataUtil.createRecord(6, "a"),
                            SimpleDataUtil.createRecord(4, "b"),
                            SimpleDataUtil.createRecord(3, "c")));
        } finally {
            sql("DROP TABLE IF EXISTS %s.%s", this.flinkDatabase, tableName);
        }
    }

    /** Verifies appends into a partitioned table using both a static PARTITION clause and dynamic partition values. */
    @Test
    public void testInsertIntoPartition() throws Exception {
        String tableName = "test_insert_into_partition";
        sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH ('write.format.default'='%s')", tableName, this.format.name());
        try {
            Table partitionedTable = this.validationCatalog.loadTable(TableIdentifier.of(this.icebergNamespace, tableName));

            // Static partition inserts.
            sql("INSERT INTO %s PARTITION (data='a') SELECT 1", tableName);
            sql("INSERT INTO %s PARTITION (data='a') SELECT 2", tableName);
            sql("INSERT INTO %s PARTITION (data='b') SELECT 3", tableName);
            SimpleDataUtil.assertTableRecords(
                    partitionedTable,
                    Lists.newArrayList(
                            SimpleDataUtil.createRecord(1, "a"),
                            SimpleDataUtil.createRecord(2, "a"),
                            SimpleDataUtil.createRecord(3, "b")));

            // Dynamic partition inserts.
            sql("INSERT INTO %s SELECT 4, 'c'", tableName);
            sql("INSERT INTO %s SELECT 5, 'd'", tableName);
            SimpleDataUtil.assertTableRecords(
                    partitionedTable,
                    Lists.newArrayList(
                            SimpleDataUtil.createRecord(1, "a"),
                            SimpleDataUtil.createRecord(2, "a"),
                            SimpleDataUtil.createRecord(3, "b"),
                            SimpleDataUtil.createRecord(4, "c"),
                            SimpleDataUtil.createRecord(5, "d")));
        } finally {
            sql("DROP TABLE IF EXISTS %s.%s", this.flinkDatabase, tableName);
        }
    }

    /**
     * Writes a bounded data set into a table partitioned by {@code data} with
     * {@code write.distribution-mode=hash} and asserts that every snapshot with data files holds
     * exactly one file for each of the partitions 'aaa', 'bbb' and 'ccc'.
     */
    @Test
    public void testHashDistributeMode() throws Exception {
        String tableName = "test_hash_distribution_mode";
        ImmutableMap<String, String> tableProps = ImmutableMap.of(
                "write.format.default", this.format.name(),
                "write.distribution-mode", DistributionMode.HASH.modeName());

        // Three rows (one per partition value) for every id in [1, 1000).
        List<Row> dataSet = IntStream.range(1, 1000)
                .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
                .flatMap(List::stream)
                .collect(Collectors.toList());

        String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
        sql("CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL) WITH ('connector'='BoundedSource', 'data-id'='%s')", SOURCE_TABLE, dataId);
        Assert.assertEquals(
                "Should have the expected rows in source table.",
                Sets.newHashSet(dataSet),
                Sets.newHashSet(sql("SELECT * FROM %s", SOURCE_TABLE)));

        sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s", tableName, toWithClause(tableProps));
        try {
            sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
            Assert.assertEquals(
                    "Should have the expected rows in sink table.",
                    Sets.newHashSet(dataSet),
                    Sets.newHashSet(sql("SELECT * FROM %s", tableName)));

            Table table = this.validationCatalog.loadTable(TableIdentifier.of(this.icebergNamespace, tableName));
            // Each snapshot's file list is checked on its own; snapshots without data files are skipped.
            for (List<DataFile> dataFiles : SimpleDataUtil.snapshotToDataFiles(table).values()) {
                if (dataFiles.isEmpty()) {
                    continue;
                }
                Assert.assertEquals(
                        "There should be 1 data file in partition 'aaa'",
                        1L,
                        SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "aaa")).size());
                Assert.assertEquals(
                        "There should be 1 data file in partition 'bbb'",
                        1L,
                        SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "bbb")).size());
                Assert.assertEquals(
                        "There should be 1 data file in partition 'ccc'",
                        1L,
                        SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "ccc")).size());
            }
        } finally {
            sql("DROP TABLE IF EXISTS %s.%s", this.flinkDatabase, tableName);
        }
    }
}
