package com.uber.hoodie.utilities.sources;

import com.uber.hoodie.DataSourceUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/* loaded from: input_file:com/uber/hoodie/utilities/sources/HiveIncrPullSource.class */
/**
 * Source that pulls data from sub-folders of a configured root folder, one sub-folder per call.
 *
 * <p>The names of the root folder's direct children are treated as commit times and ordered
 * lexicographically. Each call to {@link #fetchNewData(Optional, long)} picks the first child whose
 * name sorts strictly after the supplied checkpoint (or the smallest child when there is no
 * checkpoint), reads every file listed under it, and returns that child's name as the new checkpoint.
 */
public class HiveIncrPullSource extends Source {
    private static volatile Logger log = LogManager.getLogger(HiveIncrPullSource.class);
    private final transient FileSystem fs;
    private final String incrPullRootPath;

    /**
     * Configuration keys read by this source.
     */
    static class Config {
        /** Property holding the root folder whose children are the per-commit folders. */
        private static final String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.incrpull.root";

        Config() {
        }
    }

    /**
     * Creates the source and reads the root input path from the supplied configuration.
     *
     * <p>The arguments are forwarded unchanged to the {@code Source} constructor; the root path
     * property is validated via {@code DataSourceUtils.checkRequiredProperties} before it is read.
     *
     * @param propertiesConfiguration configuration that must contain {@code hoodie.deltastreamer.source.incrpull.root}
     * @param javaSparkContext spark context handed to the parent {@code Source}
     * @param sourceDataFormat data format handed to the parent {@code Source}
     * @param schemaProvider schema provider handed to the parent {@code Source}
     */
    public HiveIncrPullSource(PropertiesConfiguration propertiesConfiguration, JavaSparkContext javaSparkContext, SourceDataFormat sourceDataFormat, SchemaProvider schemaProvider) {
        super(propertiesConfiguration, javaSparkContext, sourceDataFormat, schemaProvider);
        this.fs = FSUtils.getFs();
        DataSourceUtils.checkRequiredProperties(propertiesConfiguration, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
        this.incrPullRootPath = propertiesConfiguration.getString(Config.ROOT_INPUT_PATH_PROP);
    }

    /**
     * Finds the next commit folder to pull from the root input path.
     *
     * @param lastCheckpoint name of the commit folder consumed by the previous pull, if any
     * @return the smallest commit name when there is no checkpoint, otherwise the smallest commit
     *         name that sorts strictly after the checkpoint; empty when the root folder has no
     *         children or nothing sorts after the checkpoint
     * @throws IOException if the root folder cannot be listed
     */
    private Optional<String> findCommitToPull(Optional<String> lastCheckpoint) throws IOException {
        log.info("Looking for commits ");
        FileStatus[] commitDirs = this.fs.listStatus(new Path(this.incrPullRootPath));
        ArrayList<String> commitTimes = new ArrayList<>(commitDirs.length);
        for (FileStatus commitDir : commitDirs) {
            // The final path component is the commit time.
            commitTimes.add(commitDir.getPath().getName());
        }
        Collections.sort(commitTimes);
        log.info("Retrieved commit times " + commitTimes);

        // Nothing has been written under the root yet: report "no commit" instead of
        // failing with an IndexOutOfBoundsException on get(0) below.
        if (commitTimes.isEmpty()) {
            return Optional.empty();
        }
        if (!lastCheckpoint.isPresent()) {
            return Optional.of(commitTimes.get(0));
        }
        String lastCommit = lastCheckpoint.get();
        // The list is sorted, so the first match is the smallest commit after the checkpoint.
        return commitTimes.stream()
                .filter(commitTime -> commitTime.compareTo(lastCommit) > 0)
                .findFirst();
    }

    /**
     * Reads the files of the next commit folder after the given checkpoint.
     *
     * @param lastCheckpoint checkpoint returned by the previous call, if any
     * @param j not used by this source
     * @return a pair of (records read, new checkpoint). When there is no newer commit folder the
     *         records are empty and the checkpoint is the one passed in, or {@code ""} if none was
     *         passed in.
     * @throws HoodieIOException if listing the root folder or the commit folder fails
     */
    @Override // com.uber.hoodie.utilities.sources.Source
    public Pair<Optional<JavaRDD<GenericRecord>>, String> fetchNewData(Optional<String> lastCheckpoint, long j) {
        try {
            Optional<String> commitToPull = findCommitToPull(lastCheckpoint);
            if (!commitToPull.isPresent()) {
                return new ImmutablePair<>(Optional.empty(), lastCheckpoint.orElse(""));
            }

            // Comma-separated list of every entry directly under the chosen commit folder.
            FileStatus[] commitFiles = this.fs.listStatus(new Path(this.incrPullRootPath, commitToPull.get()));
            String pathStr = Arrays.stream(commitFiles)
                    .map(fileStatus -> fileStatus.getPath().toString())
                    .collect(Collectors.joining(","));

            AvroConvertor avroConvertor = new AvroConvertor(this.schemaProvider.getSourceSchema().toString());
            return new ImmutablePair<>(
                    Optional.of(DFSSource.fromFiles(this.dataFormat, avroConvertor, pathStr, this.sparkContext)),
                    String.valueOf(commitToPull.get()));
        } catch (IOException e) {
            throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpoint, e);
        }
    }
}
