package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.hivelink.core.LegacyHiveTable;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.orc.OrcRowFilterUtils;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.Statistics;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * DataSource V2 reader that plans an Iceberg table scan and exposes it to Spark as input partitions.
 *
 * <p>Supports filter push-down, column pruning, statistics reporting and columnar batch reads (used only
 * when every planned file is eligible, see {@link #enableBatchRead()}). The planned scan tasks, the
 * projected schema, the Spark read type and the batch-read decision are computed lazily and cached.
 * {@link #pushFilters(Filter[])} invalidates all of them. {@link #pruneColumns(StructType)} invalidates
 * everything except the planned tasks.
 */
class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPushDownFilters,
        SupportsPushDownRequiredColumns, SupportsReportStatistics {
    private static final Logger LOG = LoggerFactory.getLogger(Reader.class);
    private static final Filter[] NO_FILTERS = new Filter[0];
    // File system schemes for which block locality is preferred unless the "locality" option says otherwise.
    private static final ImmutableSet<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
    // Placeholder scheme used when the table's file system cannot be resolved; never whitelisted.
    private static final String UNKNOWN_FS_SCHEME = "no_exist";
    // Prefix marking table properties / read options that are forwarded into the Hadoop configuration.
    private static final String HADOOP_CONF_PREFIX = "hadoop.";
    // Reported for LegacyHiveTable instances, for which no size or row count is estimated.
    private static final Statistics EMPTY_STATS = new Statistics() {
        @Override
        public OptionalLong sizeInBytes() {
            return OptionalLong.empty();
        }

        @Override
        public OptionalLong numRows() {
            return OptionalLong.empty();
        }
    };

    private final Table table;
    private final Long snapshotId;
    private final Long startSnapshotId;
    private final Long endSnapshotId;
    private final Long asOfTimestamp;
    private final Long splitSize;
    private final Integer splitLookback;
    private final Long splitOpenFileCost;
    private final Broadcast<FileIO> io;
    private final Broadcast<EncryptionManager> encryptionManager;
    private final boolean caseSensitive;
    private final boolean localityPreferred;
    private final boolean batchReadsEnabled;
    private final int batchSize;
    private final boolean readTimestampWithoutZone;

    // Lazily computed state; null means "not computed yet / invalidated".
    private Schema schema = null;
    private StructType requestedSchema = null;
    private List<Expression> filterExpressions = null;
    private Filter[] pushedFilters = NO_FILTERS;
    private StructType type = null;
    private List<CombinedScanTask> tasks = null;
    private Boolean readUsingBatch = null;

    /**
     * Creates a reader for {@code table} configured from the Spark read options.
     *
     * @param table the Iceberg table to scan
     * @param io broadcast file IO used by the read tasks
     * @param encryptionManager broadcast encryption manager used by the read tasks
     * @param caseSensitive whether column resolution in the scan is case sensitive
     * @param options Spark read options (snapshot selection, split tuning, locality, vectorization)
     * @throws IllegalArgumentException if the snapshot-selection options are combined inconsistently
     */
    public Reader(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
                  boolean caseSensitive, DataSourceOptions options) {
        this.table = table;

        // Point-in-time selection: at most one of snapshot-id / as-of-timestamp.
        this.snapshotId = options.get(SparkReadOptions.SNAPSHOT_ID).map(Long::parseLong).orElse(null);
        this.asOfTimestamp = options.get(SparkReadOptions.AS_OF_TIMESTAMP).map(Long::parseLong).orElse(null);
        if (this.snapshotId != null && this.asOfTimestamp != null) {
            throw new IllegalArgumentException(
                "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
        }

        // Incremental selection: requires a start snapshot and excludes point-in-time selection.
        this.startSnapshotId = options.get("start-snapshot-id").map(Long::parseLong).orElse(null);
        this.endSnapshotId = options.get("end-snapshot-id").map(Long::parseLong).orElse(null);
        if (this.snapshotId != null || this.asOfTimestamp != null) {
            if (this.startSnapshotId != null || this.endSnapshotId != null) {
                throw new IllegalArgumentException("Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id or as-of-timestamp is specified");
            }
        } else if (this.startSnapshotId == null && this.endSnapshotId != null) {
            throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
        }

        // Optional split-planning overrides; null means "use the table's setting".
        this.splitSize = options.get(SparkReadOptions.SPLIT_SIZE).map(Long::parseLong).orElse(null);
        this.splitLookback = options.get(SparkReadOptions.LOOKBACK).map(Integer::parseInt).orElse(null);
        this.splitOpenFileCost = options.get(SparkReadOptions.FILE_OPEN_COST).map(Long::parseLong).orElse(null);

        // Locality is only meaningful for HadoopFileIO; the scheme is resolved even when "locality" is set.
        if (io.getValue() instanceof HadoopFileIO) {
            String fsScheme = tableFileSystemScheme(table, options);
            this.localityPreferred = options.get("locality")
                .map(Boolean::parseBoolean)
                .orElseGet(() -> LOCALITY_WHITELIST_FS.contains(fsScheme));
        } else {
            this.localityPreferred = false;
        }

        this.schema = table.schema();
        this.io = io;
        this.encryptionManager = encryptionManager;
        this.caseSensitive = caseSensitive;

        // The session-level setting, when present, overrides both the read option and the table property.
        String sessionVectorization =
            SparkSession.active().conf().get("spark.sql.iceberg.vectorization.enabled", (String) null);
        if (sessionVectorization != null) {
            this.batchReadsEnabled = Boolean.parseBoolean(sessionVectorization);
        } else {
            this.batchReadsEnabled = options.get(SparkReadOptions.VECTORIZATION_ENABLED)
                .map(Boolean::parseBoolean)
                .orElseGet(() -> PropertyUtil.propertyAsBoolean(
                    table.properties(), TableProperties.PARQUET_VECTORIZATION_ENABLED, false));
        }

        this.batchSize = options.get(SparkReadOptions.VECTORIZATION_BATCH_SIZE)
            .map(Integer::parseInt)
            .orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), TableProperties.PARQUET_BATCH_SIZE, 5000));
        this.readTimestampWithoutZone =
            SparkUtil.canHandleTimestampWithoutZone(options.asMap(), SparkSession.active().conf());
    }

    /**
     * Resolves the lower-cased scheme of the file system that holds the table location.
     *
     * <p>The session's Hadoop configuration is copied and overlaid with {@code hadoop.*} entries from the
     * table properties and then from the read options. Returns {@code "no_exist"} (after logging a warning)
     * when the file system cannot be obtained.
     */
    private static String tableFileSystemScheme(Table table, DataSourceOptions options) {
        try {
            Configuration conf = new Configuration(SparkSession.active().sessionState().newHadoopConf());
            mergeIcebergHadoopConfs(conf, table.properties());
            mergeIcebergHadoopConfs(conf, options.asMap());
            return new Path(table.location()).getFileSystem(conf).getScheme().toLowerCase(Locale.ENGLISH);
        } catch (IOException e) {
            LOG.warn("Failed to get Hadoop Filesystem", e);
            return UNKNOWN_FS_SCHEME;
        }
    }

    /**
     * Copies every {@code hadoop.}-prefixed entry of {@code properties} into {@code configuration}, with the
     * prefix stripped from the key. Later calls overwrite keys set by earlier ones.
     */
    private static void mergeIcebergHadoopConfs(Configuration configuration, Map<String, String> properties) {
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(HADOOP_CONF_PREFIX)) {
                // Plain substring rather than replaceFirst: the prefix is literal text, not a regex.
                configuration.set(key.substring(HADOOP_CONF_PREFIX.length()), entry.getValue());
            }
        }
    }

    /**
     * Returns the projected Iceberg schema, computing and caching it on first use.
     *
     * <p>With no requested Spark schema this is the full table schema. Otherwise the table schema is pruned
     * by {@code SparkSchemaUtil.prune}, which is also given the pushed filter expression (presumably so that
     * filter columns are retained — confirm against SparkSchemaUtil).
     */
    private Schema lazySchema() {
        if (schema == null) {
            schema = requestedSchema != null
                ? SparkSchemaUtil.prune(table.schema(), requestedSchema, filterExpression(), caseSensitive)
                : table.schema();
        }
        return schema;
    }

    /** Returns the conjunction of all pushed filter expressions, or {@code alwaysTrue()} when none were pushed. */
    private Expression filterExpression() {
        if (filterExpressions == null) {
            return Expressions.alwaysTrue();
        }
        return filterExpressions.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
    }

    /**
     * Returns the Spark type of the projected schema, computing and caching it on first use.
     *
     * @throws IllegalArgumentException if the projection contains timestamp-without-zone columns and reading
     *     them was not enabled
     */
    private StructType lazyType() {
        if (type == null) {
            Preconditions.checkArgument(
                readTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(lazySchema()),
                SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
            type = SparkSchemaUtil.convert(lazySchema());
        }
        return type;
    }

    @Override
    public StructType readSchema() {
        return lazyType();
    }

    /**
     * Plans columnar read partitions, one per combined scan task.
     *
     * @throws IllegalStateException if batch reads are not enabled or the batch size is not positive
     * @throws ValidationException if any planned task carries delete files
     */
    @Override
    public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
        Preconditions.checkState(enableBatchRead(), "Batched reads not enabled");
        Preconditions.checkState(batchSize > 0, "Invalid batch size");
        ValidationException.check(tasks().stream().noneMatch(TableScanUtil::hasDeletes),
            "Cannot scan table %s: cannot apply required delete files", table);

        // The factory only holds the batch size, so one instance can be shared by every task.
        List<InputPartition<ColumnarBatch>> readTasks = planPartitions(new BatchReaderFactory(batchSize));
        LOG.info("Batching input partitions with {} tasks.", readTasks.size());
        return readTasks;
    }

    /** Plans row-based read partitions, one per combined scan task. */
    @Override
    public List<InputPartition<InternalRow>> planInputPartitions() {
        return planPartitions(InternalRowReaderFactory.INSTANCE);
    }

    /**
     * Wraps each planned combined scan task in a serializable {@link ReadTask}. Schemas are shipped as JSON
     * and the table's default name mapping (if any) is passed through.
     */
    private <T> List<InputPartition<T>> planPartitions(ReaderFactory<T> readerFactory) {
        String tableSchemaString = SchemaParser.toJson(table.schema());
        String expectedSchemaString = SchemaParser.toJson(lazySchema());
        String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);

        List<CombinedScanTask> scanTasks = tasks();
        List<InputPartition<T>> readTasks = new ArrayList<>(scanTasks.size());
        for (CombinedScanTask scanTask : scanTasks) {
            readTasks.add(new ReadTask<>(scanTask, tableSchemaString, expectedSchemaString, nameMappingString,
                io, encryptionManager, caseSensitive, localityPreferred, readerFactory));
        }
        return readTasks;
    }

    /**
     * Records every filter that converts to an Iceberg expression as pushed.
     *
     * <p>All filters are still returned to Spark, so Spark also evaluates them after the scan. Invalidates the
     * planned tasks and every cache derived from the filters or the projection.
     */
    @Override
    public Filter[] pushFilters(Filter[] filters) {
        this.tasks = null;

        List<Expression> expressions = Lists.newArrayListWithExpectedSize(filters.length);
        List<Filter> pushed = Lists.newArrayListWithExpectedSize(filters.length);
        for (Filter filter : filters) {
            Expression converted = SparkFilters.convert(filter);
            if (converted != null) {
                expressions.add(converted);
                pushed.add(filter);
            }
        }

        this.filterExpressions = expressions;
        this.pushedFilters = pushed.toArray(new Filter[0]);
        invalidateProjection();
        return filters;
    }

    @Override
    public Filter[] pushedFilters() {
        return pushedFilters;
    }

    /** Records the columns Spark requires and invalidates the projection-derived caches. */
    @Override
    public void pruneColumns(StructType newRequestedSchema) {
        this.requestedSchema = newRequestedSchema;
        invalidateProjection();
    }

    /**
     * Clears the projected schema, its Spark type and the batch-read decision. The decision depends on both
     * the projected columns and the planned tasks, so it must be recomputed whenever either changes.
     */
    private void invalidateProjection() {
        this.schema = null;
        this.type = null;
        this.readUsingBatch = null;
    }

    /**
     * Estimates scan size and row count.
     *
     * <p>Legacy Hive tables report unknown statistics and tables without a snapshot report zero. Partitioned
     * tables scanned without filters use the snapshot summary's total record count. All other scans sum the
     * length and record count of the planned files.
     */
    @Override
    public Statistics estimateStatistics() {
        if (table instanceof LegacyHiveTable) {
            return EMPTY_STATS;
        }

        if (table.currentSnapshot() == null) {
            return new Stats(0L, 0L);
        }

        // Reference comparison: filterExpression() starts its reduction from the alwaysTrue() instance, so
        // with no pushed filters it returns that same instance.
        if (!table.spec().isUnpartitioned() && filterExpression() == Expressions.alwaysTrue()) {
            // An unknown record count is treated as "very large" rather than borrowing an unrelated table
            // property default.
            long totalRecords = PropertyUtil.propertyAsLong(
                table.currentSnapshot().summary(), SnapshotSummary.TOTAL_RECORDS_PROP, Long.MAX_VALUE);
            return new Stats(SparkSchemaUtil.estimateSize(lazyType(), totalRecords), totalRecords);
        }

        long sizeInBytes = 0L;
        long numRows = 0L;
        for (CombinedScanTask scanTask : tasks()) {
            for (FileScanTask fileTask : scanTask.files()) {
                sizeInBytes += fileTask.length();
                numRows += fileTask.file().recordCount();
            }
        }
        return new Stats(sizeInBytes, numRows);
    }

    /**
     * Decides (once, until invalidated) whether the scan is read as columnar batches.
     *
     * <p>Batch reads require vectorization to be enabled and no delete files. In addition, either every file
     * is ORC with no ORC row filter, or every file is Parquet and the projection has at least one column, only
     * primitive columns, and no timestamp-without-zone columns.
     */
    @Override
    public boolean enableBatchRead() {
        if (readUsingBatch == null) {
            // Evaluated eagerly (which also forces scan planning); the remaining checks short-circuit.
            boolean allParquet = allFilesHaveFormat(FileFormat.PARQUET);
            this.readUsingBatch = batchReadsEnabled
                && tasks().stream().noneMatch(TableScanUtil::hasDeletes)
                && (orcBatchReadable() || (allParquet && projectionSupportsParquetBatches()));
        }
        return readUsingBatch;
    }

    /** Returns true when no task is a data task and every file in every task has the given format. */
    private boolean allFilesHaveFormat(FileFormat format) {
        return tasks().stream().allMatch(scanTask -> !scanTask.isDataTask()
            && scanTask.files().stream().allMatch(fileTask -> fileTask.file().format().equals(format)));
    }

    /**
     * Returns true when every file is ORC and none has an ORC row filter. The row filter is only inspected
     * after the all-ORC check has passed.
     */
    private boolean orcBatchReadable() {
        return allFilesHaveFormat(FileFormat.ORC)
            && tasks().stream().allMatch(scanTask -> !scanTask.isDataTask()
                && scanTask.files().stream().allMatch(fileTask -> OrcRowFilterUtils.rowFilterFromTask(fileTask) == null));
    }

    /**
     * Returns true when the projection has at least one column, only primitive columns, and no
     * timestamp-without-zone columns.
     */
    private boolean projectionSupportsParquetBatches() {
        Schema projected = lazySchema();
        return !projected.columns().isEmpty()
            && projected.columns().stream().allMatch(field -> field.type().isPrimitiveType())
            && !SparkUtil.hasTimestampWithoutZone(projected);
    }

    /**
     * Plans and caches the combined scan tasks for the configured snapshot selection, split options and
     * pushed filters.
     *
     * @throws RuntimeIOException if closing the planned task iterable fails
     */
    private List<CombinedScanTask> tasks() {
        if (tasks == null) {
            TableScan scan = table.newScan().caseSensitive(caseSensitive).project(lazySchema());

            if (snapshotId != null) {
                scan = scan.useSnapshot(snapshotId);
            }
            if (asOfTimestamp != null) {
                scan = scan.asOfTime(asOfTimestamp);
            }
            if (startSnapshotId != null) {
                scan = endSnapshotId != null
                    ? scan.appendsBetween(startSnapshotId, endSnapshotId)
                    : scan.appendsAfter(startSnapshotId);
            }
            if (splitSize != null) {
                scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
            }
            if (splitLookback != null) {
                scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
            }
            if (splitOpenFileCost != null) {
                scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
            }
            if (filterExpressions != null) {
                for (Expression filter : filterExpressions) {
                    scan = scan.filter(filter);
                }
            }

            // try-with-resources closes the iterable even if materializing it throws.
            try (CloseableIterable<CombinedScanTask> plannedTasks = scan.planTasks()) {
                this.tasks = Lists.newArrayList(plannedTasks);
            } catch (IOException e) {
                throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
            }
        }
        return tasks;
    }

    /** Describes the scan. Note: this computes the batch-read decision, which plans the scan if needed. */
    @Override
    public String toString() {
        return String.format(
            "IcebergScan(table=%s, type=%s, filters=%s, caseSensitive=%s, batchedReads=%s)",
            table, lazySchema().asStruct(), filterExpressions, caseSensitive, enableBatchRead());
    }

    /**
     * Serializable input partition for one combined scan task. Schemas travel as JSON strings and are parsed
     * lazily on first use.
     */
    private static class ReadTask<T> implements Serializable, InputPartition<T> {
        private final CombinedScanTask task;
        private final String tableSchemaString;
        private final String expectedSchemaString;
        private final String nameMappingString;
        private final Broadcast<FileIO> io;
        private final Broadcast<EncryptionManager> encryptionManager;
        private final boolean caseSensitive;
        private final boolean localityPreferred;
        private final ReaderFactory<T> readerFactory;

        // Parsed from the JSON strings on first use; transient, so only the strings are serialized.
        private transient Schema tableSchema = null;
        private transient Schema expectedSchema = null;
        // Computed in the constructor; transient, so it is not serialized with the task.
        private transient String[] preferredLocations = null;

        private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
                         String nameMappingString, Broadcast<FileIO> io,
                         Broadcast<EncryptionManager> encryptionManager, boolean caseSensitive,
                         boolean localityPreferred, ReaderFactory<T> readerFactory) {
            this.task = task;
            this.tableSchemaString = tableSchemaString;
            this.expectedSchemaString = expectedSchemaString;
            this.nameMappingString = nameMappingString;
            this.io = io;
            this.encryptionManager = encryptionManager;
            this.caseSensitive = caseSensitive;
            this.localityPreferred = localityPreferred;
            this.readerFactory = readerFactory;
            // Reads task and localityPreferred, so it must run after they are assigned.
            this.preferredLocations = getPreferredLocations();
        }

        @Override
        public InputPartitionReader<T> createPartitionReader() {
            return readerFactory.create(task, lazyTableSchema(), lazyExpectedSchema(), nameMappingString,
                io.value(), encryptionManager.value(), caseSensitive);
        }

        @Override
        public String[] preferredLocations() {
            return preferredLocations;
        }

        private Schema lazyTableSchema() {
            if (tableSchema == null) {
                tableSchema = SchemaParser.fromJson(tableSchemaString);
            }
            return tableSchema;
        }

        private Schema lazyExpectedSchema() {
            if (expectedSchema == null) {
                expectedSchema = SchemaParser.fromJson(expectedSchemaString);
            }
            return expectedSchema;
        }

        /** Returns the task's block locations when locality is preferred, otherwise an empty array. */
        private String[] getPreferredLocations() {
            if (!localityPreferred) {
                return new String[0];
            }
            return Util.blockLocations(task, SparkSession.active().sparkContext().hadoopConfiguration());
        }
    }

    /** Creates the partition reader for a combined scan task; serialized along with each {@link ReadTask}. */
    public interface ReaderFactory<T> extends Serializable {
        InputPartitionReader<T> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
                                       String nameMapping, FileIO io, EncryptionManager encryptionManager,
                                       boolean caseSensitive);
    }

    /** Creates {@link BatchReader}s with a fixed batch size; only the expected schema is passed on. */
    private static class BatchReaderFactory implements ReaderFactory<ColumnarBatch> {
        private final int batchSize;

        BatchReaderFactory(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public InputPartitionReader<ColumnarBatch> create(CombinedScanTask task, Schema tableSchema,
                                                          Schema expectedSchema, String nameMapping, FileIO io,
                                                          EncryptionManager encryptionManager,
                                                          boolean caseSensitive) {
            return new BatchReader(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, batchSize);
        }
    }

    /** Stateless singleton that creates {@link RowReader}s. */
    private static class InternalRowReaderFactory implements ReaderFactory<InternalRow> {
        private static final InternalRowReaderFactory INSTANCE = new InternalRowReaderFactory();

        private InternalRowReaderFactory() {
        }

        @Override
        public InputPartitionReader<InternalRow> create(CombinedScanTask task, Schema tableSchema,
                                                        Schema expectedSchema, String nameMapping, FileIO io,
                                                        EncryptionManager encryptionManager,
                                                        boolean caseSensitive) {
            return new RowReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
        }
    }

    /** Adapts {@code BatchDataReader} to Spark's {@link InputPartitionReader} for columnar batches. */
    private static class BatchReader extends BatchDataReader implements InputPartitionReader<ColumnarBatch> {
        BatchReader(CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO io,
                    EncryptionManager encryptionManager, boolean caseSensitive, int batchSize) {
            super(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, batchSize);
        }
    }

    /** Adapts {@code RowDataReader} to Spark's {@link InputPartitionReader} for internal rows. */
    private static class RowReader extends RowDataReader implements InputPartitionReader<InternalRow> {
        RowReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping,
                  FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) {
            super(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
        }
    }
}
