package org.apache.iceberg.flink.source;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.util.ThreadPools;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/iceberg/flink/source/TestFlinkPlanner.class */
/**
 * Tests how {@link FlinkSplitPlanner} groups file scan tasks into {@link IcebergSourceSplit}s,
 * with and without the {@code planSingleWholeFilePerTask} scan option.
 *
 * <p>Each test runs against a fresh {@link HadoopCatalog} whose warehouse lives under a JUnit
 * {@link TemporaryFolder}.
 */
public class TestFlinkPlanner {

    /** Class-level Flink mini cluster resource, created with the classloader check disabled. */
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = MiniClusterResource.createWithClassloaderCheckDisabled();

    /** Root for the warehouse directory and for the data files written by the appender helper. */
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    protected HadoopCatalog catalog;
    protected String warehouse;
    protected String location;

    /** File format used for the data files appended by the tests. */
    FileFormat fileFormat = FileFormat.ORC;

    /**
     * Creates a catalog rooted at a fresh, not-yet-existing warehouse path.
     *
     * @throws IOException if the temporary folder cannot be created
     */
    @Before
    public void before() throws IOException {
        File warehouseDir = TEMPORARY_FOLDER.newFolder();
        // Only the unique path is wanted: the directory itself is removed again so that the
        // warehouse location does not exist yet when the catalog is constructed.
        Assert.assertTrue(warehouseDir.delete());
        this.warehouse = "file:" + warehouseDir;
        this.catalog = new HadoopCatalog(new Configuration(), this.warehouse);
        this.location = String.format("%s/%s/%s", this.warehouse, TestFixtures.DATABASE, TestFixtures.TABLE);
    }

    /**
     * Per-test teardown hook.
     *
     * @throws IOException never thrown by this implementation; kept for signature compatibility
     */
    @After
    public void after() throws IOException {
        // Intentionally empty: every test gets its own warehouse path in before(), and the
        // TemporaryFolder class rule removes everything below its root after the class finishes.
    }

    /**
     * Appends the same record twice to one partition and verifies the planned splits:
     * by default a single split holding both file scan tasks, and with
     * {@code planSingleWholeFilePerTask(true)} two splits holding one file scan task each.
     */
    @Test
    public void testFlinkPlannerSingleFilePerTask() throws Exception {
        Table table = this.catalog.createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);

        List<Record> records = RandomGenericData.generate(TestFixtures.SCHEMA, 1, 0L);
        // Pin the value at position 2 so the record matches the partition row used below.
        records.get(0).set(2, "2020-03-20");

        // Two separate appends of the same record into the same partition.
        GenericAppenderHelper appender = new GenericAppenderHelper(table, this.fileFormat, TEMPORARY_FOLDER);
        appender.appendToTable(TestHelpers.Row.of("2020-03-20", 0), records);
        appender.appendToTable(TestHelpers.Row.of("2020-03-20", 0), records);

        ScanContext defaultContext = ScanContext.builder().planParallelism(2).build();
        ExecutorService workerPool = ThreadPools.newWorkerPool("test", defaultContext.planParallelism());
        try {
            // Default planning: both files are expected to be combined into one split.
            List<IcebergSourceSplit> combinedSplits =
                    FlinkSplitPlanner.planIcebergSourceSplits(table, defaultContext, workerPool);
            Assert.assertEquals(
                    String.format("Expected 1 split but scan returned %d splits", combinedSplits.size()),
                    1,
                    combinedSplits.size());
            int combinedTaskCount = combinedSplits.get(0).task().tasks().size();
            Assert.assertEquals(
                    String.format("Expected 2 FileScanTask in the split but has %d instead", combinedTaskCount),
                    2,
                    combinedTaskCount);

            // Whole-file planning: each file is expected to land in its own split.
            ScanContext wholeFileContext =
                    ScanContext.builder().planParallelism(2).planSingleWholeFilePerTask(true).build();
            List<IcebergSourceSplit> wholeFileSplits =
                    FlinkSplitPlanner.planIcebergSourceSplits(table, wholeFileContext, workerPool);
            Assert.assertEquals(
                    String.format("Expected 2 splits but scan returned %d splits", wholeFileSplits.size()),
                    2,
                    wholeFileSplits.size());
            for (IcebergSourceSplit split : wholeFileSplits) {
                int taskCount = split.task().tasks().size();
                Assert.assertEquals(
                        String.format("Expected a single file task in the split but task has %d tasks", taskCount),
                        1,
                        taskCount);
            }
        } finally {
            // Release the planning threads even when an assertion above fails.
            workerPool.shutdown();
        }
    }
}
