package com.uber.hoodie.utilities;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.common.io.Files;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.common.HoodieJsonPayload;
import com.uber.hoodie.common.model.HoodieCommits;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableMetadata;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

/* loaded from: input_file:com/uber/hoodie/utilities/HoodieDeltaStreamer.class */
/**
 * Command-line tool that copies the newest commit of a source dataset into a Hoodie table.
 *
 * <p>Every child of {@code --dataPath} is treated as a commit time; the lexicographically greatest
 * name is taken as the latest commit. Each line of the text files under that child is parsed as a
 * {@link HoodieJsonPayload} and inserted (or upserted, with {@code --upsert}) into the table at
 * {@code --targetPath} using the same commit time.
 */
public class HoodieDeltaStreamer implements Serializable {
    private static final Logger log = LogManager.getLogger(HoodieDeltaStreamer.class);
    private final Config cfg;

    /** Command-line options, bound by JCommander in {@link #main(String[])}. */
    public static class Config implements Serializable {

        /** Root of the source dataset; each child directory is named by a commit time. */
        @Parameter(names = {"--dataPath"}, description = "Source dataset root; one child per commit time (required)")
        public String dataPath;

        /** Base path of the target Hoodie table. */
        @Parameter(names = {"--targetPath"}, description = "Base path of the target Hoodie table", required = true)
        public String targetPath;

        /** Target table name; also used in the Spark application name. */
        @Parameter(names = {"--targetTable"}, description = "Name of the target table")
        public String targetTableName;

        /** Local file whose contents are used as the write schema. */
        @Parameter(names = {"--schemaFile"}, description = "Local file containing the write schema (required)")
        public String schemaFile;

        /** Passed as both parallelism values of the write config. */
        @Parameter(names = {"--parallelism"}, description = "Write parallelism")
        public int groupByParallelism = 10000;

        /** When true, records are upserted; otherwise they are inserted. */
        @Parameter(names = {"--upsert"}, description = "Upsert instead of insert")
        public boolean upsert = false;

        /** Spark master URL. */
        @Parameter(names = {"--master"}, description = "Spark master URL")
        public String sparkMaster = "yarn-client";

        /** JSON field used as the record key. */
        @Parameter(names = {"--keyColumn"}, description = "JSON field used as the record key")
        public String keyColumnField = "uuid";

        /** JSON field used as the partition path. */
        @Parameter(names = {"--partitionPathField"}, description = "JSON field used as the partition path")
        public String partitionPathField = "request_at";

        /** Allow re-writing a commit that already exists in the target table. */
        @Parameter(names = {"--override"}, description = "Re-write a commit already present in the target table")
        public boolean override = false;

        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;
    }

    public HoodieDeltaStreamer(Config config) throws IOException {
        this.cfg = config;
    }

    /**
     * Pulls the latest source commit into the target table.
     *
     * @throws IllegalArgumentException if {@code --dataPath} or {@code --schemaFile} is missing
     * @throws IllegalStateException if the source commit is older than the target's last commit,
     *     or already present in the target while {@code --override} is false
     */
    private void sync() throws Exception {
        // Validate before launching a Spark application.
        requireOption(this.cfg.dataPath, "--dataPath");
        requireOption(this.cfg.schemaFile, "--schemaFile");
        JavaSparkContext sparkContext = getSparkContext(this.cfg);
        try {
            FileSystem fs = FSUtils.getFs();
            HoodieTableMetadata hoodieTableMetadata = new HoodieTableMetadata(fs, this.cfg.targetPath, this.cfg.targetTableName);
            String findLastCommitPulled = findLastCommitPulled(fs, this.cfg.dataPath);
            log.info("Last commit pulled on the source dataset is " + findLastCommitPulled);
            if (!hoodieTableMetadata.getAllCommits().isEmpty() && HoodieCommits.isCommit1After(hoodieTableMetadata.getAllCommits().lastCommit(), findLastCommitPulled)) {
                throw new IllegalStateException("Last commit pulled from source table " + findLastCommitPulled + " is before the last commit in the target table " + hoodieTableMetadata.getAllCommits().lastCommit());
            }
            if (!this.cfg.override && hoodieTableMetadata.getAllCommits().contains(findLastCommitPulled)) {
                throw new IllegalStateException("Target Table already has the commit " + findLastCommitPulled + ". Not overriding as cfg.override is false");
            }
            syncTill(findLastCommitPulled, hoodieTableMetadata, sparkContext);
        } finally {
            // Release the Spark application even when a check or the write fails.
            sparkContext.stop();
        }
    }

    /** Fails with a clear message when a CLI option that sync() depends on was not supplied. */
    private static void requireOption(String value, String optionName) {
        if (value == null) {
            throw new IllegalArgumentException("Missing required option " + optionName);
        }
    }

    /**
     * Returns the greatest child name under {@code str}, i.e. the latest commit time.
     *
     * @throws IllegalStateException if the directory has no children
     */
    private String findLastCommitPulled(FileSystem fileSystem, String str) throws IOException {
        FileStatus[] listStatus = fileSystem.listStatus(new Path(str));
        // Some Hadoop versions return null for a missing path instead of throwing.
        if (listStatus == null || listStatus.length == 0) {
            throw new IllegalStateException("No commits found under source data path " + str);
        }
        ArrayList<String> commitTimes = new ArrayList<>(listStatus.length);
        for (FileStatus fileStatus : listStatus) {
            commitTimes.add(fileStatus.getPath().getName());
        }
        // Descending order, so the latest commit time comes first.
        Collections.sort(commitTimes, Collections.<String>reverseOrder());
        log.info("Retrieved commit times " + commitTimes);
        return commitTimes.get(0);
    }

    /**
     * Writes the records of source commit {@code str} into the target table under the same commit
     * time. Any existing commit with that time is rolled back first.
     */
    private void syncTill(String str, HoodieTableMetadata hoodieTableMetadata, JavaSparkContext javaSparkContext) throws Exception {
        String str2 = this.cfg.dataPath + "/" + str;
        log.info("Using data path " + str2);
        JavaRDD<HoodieRecord<HoodieJsonPayload>> records = javaSparkContext.textFile(str2).map(new Function<String, HoodieRecord<HoodieJsonPayload>>() {
            @Override
            public HoodieRecord<HoodieJsonPayload> call(String line) throws Exception {
                HoodieJsonPayload payload = new HoodieJsonPayload(line);
                HoodieKey key = new HoodieKey(payload.getRowKey(HoodieDeltaStreamer.this.cfg.keyColumnField), payload.getPartitionPath(HoodieDeltaStreamer.this.cfg.partitionPathField));
                return new HoodieRecord<>(key, payload);
            }
        });
        HoodieWriteClient hoodieWriteClient = new HoodieWriteClient(javaSparkContext, getHoodieClientConfig(hoodieTableMetadata));
        log.info("Rollback started " + str);
        // Makes re-running the same commit (e.g. with --override) start from a clean state.
        hoodieWriteClient.rollback(str);
        hoodieWriteClient.startCommitWithTime(str);
        log.info("Starting commit " + str);
        if (this.cfg.upsert) {
            log.info("Upserting records");
            hoodieWriteClient.upsert(records, str);
        } else {
            log.info("Inserting records");
            hoodieWriteClient.insert(records, str);
        }
    }

    /** Builds the write config: schema read from {@code --schemaFile} as UTF-8, bloom index. */
    private HoodieWriteConfig getHoodieClientConfig(HoodieTableMetadata hoodieTableMetadata) throws Exception {
        String schema = Files.toString(new File(this.cfg.schemaFile), Charset.forName("UTF-8"));
        return HoodieWriteConfig.newBuilder()
                .withPath(hoodieTableMetadata.getBasePath())
                .withSchema(schema)
                .withParallelism(this.cfg.groupByParallelism, this.cfg.groupByParallelism)
                .forTable(hoodieTableMetadata.getTableName())
                .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
                .build();
    }

    /** Creates the Spark context with Kryo serialization and gzip block-compressed output. */
    private JavaSparkContext getSparkContext(Config config) {
        SparkConf sparkConf = new SparkConf().setAppName("hoodie-delta-streamer-" + config.targetTableName);
        sparkConf.setMaster(config.sparkMaster);
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.set("spark.driver.maxResultSize", "2g");
        if (config.sparkMaster.startsWith("yarn")) {
            sparkConf.set("spark.eventLog.overwrite", "true");
            sparkConf.set("spark.eventLog.enabled", "true");
        }
        sparkConf.set("spark.hadoop.mapred.output.compress", "true");
        sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
        sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
        return new JavaSparkContext(HoodieWriteClient.registerClasses(sparkConf));
    }

    /** CLI entry point; prints usage and exits with status 1 on bad arguments or --help. */
    public static void main(String[] strArr) throws Exception {
        Config config = new Config();
        JCommander jCommander = new JCommander(config);
        try {
            jCommander.parse(strArr);
        } catch (ParameterException e) {
            // Missing required options (e.g. no args at all) land here; show usage, not a stack trace.
            System.err.println(e.getMessage());
            jCommander.usage();
            System.exit(1);
        }
        if (config.help.booleanValue() || strArr.length == 0) {
            jCommander.usage();
            System.exit(1);
        }
        new HoodieDeltaStreamer(config).sync();
    }
}
