package com.uber.hoodie.utilities;

import com.beust.jcommander.IValueValidator;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.common.annotations.VisibleForTesting;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.HoodieJsonPayload;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.index.HoodieIndex;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

/* loaded from: input_file:com/uber/hoodie/utilities/HDFSParquetImporter.class */
public class HDFSParquetImporter implements Serializable {
    private final Config cfg;
    private final transient FileSystem fs = FSUtils.getFs();
    private static volatile Logger logger = LogManager.getLogger(HDFSParquetImporter.class);
    public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd");

    /* loaded from: input_file:com/uber/hoodie/utilities/HDFSParquetImporter$Config.class */
    public static class Config implements Serializable {

        @Parameter(names = {"--src-path", "-sp"}, description = "Base path for the input dataset", required = true)
        public String srcPath = null;

        @Parameter(names = {"--src-type", "-st"}, description = "Source type for the input dataset", required = true, validateValueWith = SourceTypeValidator.class)
        public String srcType = null;

        @Parameter(names = {"--target-path", "-tp"}, description = "Base path for the target hoodie dataset", required = true)
        public String targetPath = null;

        @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
        public String tableName = null;

        @Parameter(names = {"--table-type", "-tt"}, description = "Table type", required = true)
        public String tableType = null;

        @Parameter(names = {"--row-key-field", "-rk"}, description = "Row key field name", required = true)
        public String rowKey = null;

        @Parameter(names = {"--partition-key-field", "-pk"}, description = "Partition key field name", required = true)
        public String partitionKey = null;

        @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
        public int parallelism = 1;

        @Parameter(names = {"--schema-file", "-sf"}, description = "path for Avro schema file", required = true)
        public String schemaFile = null;

        @Parameter(names = {"--format", "-f"}, description = "Format for the input data.", required = false, validateValueWith = FormatValidator.class)
        public String format = null;

        @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
        public String sparkMaster = null;

        @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
        public String sparkMemory = null;

        @Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false)
        public int retry = 0;

        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;
    }

    /* loaded from: input_file:com/uber/hoodie/utilities/HDFSParquetImporter$FormatValidator.class */
    public static class FormatValidator implements IValueValidator<String> {
        List<String> validFormats = Arrays.asList("parquet");

        public void validate(String str, String str2) throws ParameterException {
            if (str2 == null || !this.validFormats.contains(str2)) {
                throw new ParameterException(String.format("Invalid format type: value:%s: supported formats:%s", str2, this.validFormats));
            }
        }
    }

    /* loaded from: input_file:com/uber/hoodie/utilities/HDFSParquetImporter$SourceTypeValidator.class */
    public static class SourceTypeValidator implements IValueValidator<String> {
        List<String> validSourceTypes = Arrays.asList("hdfs");

        public void validate(String str, String str2) throws ParameterException {
            if (str2 == null || !this.validSourceTypes.contains(str2)) {
                throw new ParameterException(String.format("Invalid source type: value:%s: supported source types:%s", str2, this.validSourceTypes));
            }
        }
    }

    public HDFSParquetImporter(Config config) throws IOException {
        this.cfg = config;
    }

    public static void main(String[] strArr) throws Exception {
        Config config = new Config();
        JCommander jCommander = new JCommander(config, strArr);
        if (config.help.booleanValue() || strArr.length == 0) {
            jCommander.usage();
            System.exit(1);
        }
        HDFSParquetImporter hDFSParquetImporter = new HDFSParquetImporter(config);
        hDFSParquetImporter.dataImport(hDFSParquetImporter.getSparkContext(), config.retry);
    }

    private JavaSparkContext getSparkContext() {
        SparkConf appName = new SparkConf().setAppName("hoodie-data-importer-" + this.cfg.tableName);
        appName.setMaster(this.cfg.sparkMaster);
        if (this.cfg.sparkMaster.startsWith("yarn")) {
            appName.set("spark.eventLog.overwrite", "true");
            appName.set("spark.eventLog.enabled", "true");
        }
        appName.set("spark.driver.maxResultSize", "2g");
        appName.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        appName.set("spark.executor.memory", this.cfg.sparkMemory);
        appName.set("spark.hadoop.mapred.output.compress", "true");
        appName.set("spark.hadoop.mapred.output.compression.codec", "true");
        appName.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
        appName.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
        return new JavaSparkContext(HoodieWriteClient.registerClasses(appName));
    }

    private String getSchema() throws Exception {
        Path path = new Path(this.cfg.schemaFile);
        if (!this.fs.exists(path)) {
            throw new Exception(String.format("Could not find - %s - schema file.", this.cfg.schemaFile));
        }
        ByteBuffer allocate = ByteBuffer.allocate((int) this.fs.getFileStatus(path).getLen());
        FSDataInputStream fSDataInputStream = null;
        try {
            fSDataInputStream = this.fs.open(path);
            fSDataInputStream.readFully(0L, allocate.array(), 0, allocate.array().length);
            if (fSDataInputStream != null) {
                fSDataInputStream.close();
            }
            return new String(allocate.array());
        } catch (Throwable th) {
            if (fSDataInputStream != null) {
                fSDataInputStream.close();
            }
            throw th;
        }
    }

    public int dataImport(JavaSparkContext javaSparkContext, int i) throws Exception {
        int i2;
        int i3 = -1;
        try {
            if (this.fs.exists(new Path(this.cfg.targetPath))) {
                throw new HoodieIOException(String.format("Make sure %s is not present.", this.cfg.targetPath));
            }
            do {
                i3 = dataImport(javaSparkContext);
                if (i3 == 0) {
                    break;
                }
                i2 = i;
                i--;
            } while (i2 > 0);
        } catch (Throwable th) {
            logger.error(th);
        }
        return i3;
    }

    @VisibleForTesting
    protected int dataImport(JavaSparkContext javaSparkContext) throws IOException {
        try {
            if (this.fs.exists(new Path(this.cfg.targetPath))) {
                this.fs.delete(new Path(this.cfg.targetPath), true);
            }
            String schema = getSchema();
            Properties properties = new Properties();
            properties.put("hoodie.table.name", this.cfg.tableName);
            properties.put("hoodie.table.type", this.cfg.tableType);
            HoodieTableMetaClient.initializePathAsHoodieDataset(this.fs, this.cfg.targetPath, properties);
            HoodieWriteClient createHoodieClient = createHoodieClient(javaSparkContext, this.cfg.targetPath, schema, this.cfg.parallelism);
            Job job = Job.getInstance(javaSparkContext.hadoopConfiguration());
            job.getConfiguration().set("mapreduce.input.fileinputformat.list-status.num-threads", "1024");
            AvroReadSupport.setAvroReadSchema(javaSparkContext.hadoopConfiguration(), new Schema.Parser().parse(schema));
            ParquetInputFormat.setReadSupportClass(job, AvroReadSupport.class);
            JavaRDD map = javaSparkContext.newAPIHadoopFile(this.cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class, job.getConfiguration()).coalesce(16 * this.cfg.parallelism).map(new Function<Tuple2<Void, GenericRecord>, HoodieRecord<HoodieJsonPayload>>() { // from class: com.uber.hoodie.utilities.HDFSParquetImporter.1
                public HoodieRecord<HoodieJsonPayload> call(Tuple2<Void, GenericRecord> tuple2) throws Exception {
                    GenericRecord genericRecord = (GenericRecord) tuple2._2();
                    Object obj = genericRecord.get(HDFSParquetImporter.this.cfg.partitionKey);
                    if (obj == null) {
                        throw new HoodieIOException("partition key is missing. :" + HDFSParquetImporter.this.cfg.partitionKey);
                    }
                    Object obj2 = genericRecord.get(HDFSParquetImporter.this.cfg.rowKey);
                    if (obj2 == null) {
                        throw new HoodieIOException("row field is missing. :" + HDFSParquetImporter.this.cfg.rowKey);
                    }
                    return new HoodieRecord<>(new HoodieKey((String) obj2, HDFSParquetImporter.PARTITION_FORMATTER.format(new Date((long) (((Double) obj).doubleValue() * 1000.0d)))), new HoodieJsonPayload(genericRecord.toString()));
                }
            });
            String startCommit = createHoodieClient.startCommit();
            JavaRDD bulkInsert = createHoodieClient.bulkInsert(map, startCommit);
            final Accumulator accumulator = javaSparkContext.accumulator(0);
            bulkInsert.foreach(new VoidFunction<WriteStatus>() { // from class: com.uber.hoodie.utilities.HDFSParquetImporter.2
                public void call(WriteStatus writeStatus) throws Exception {
                    if (writeStatus.hasErrors()) {
                        accumulator.add(1);
                        HDFSParquetImporter.logger.error(String.format("Error processing records :writeStatus:%s", writeStatus.getStat().toString()));
                    }
                }
            });
            if (((Integer) accumulator.value()).intValue() == 0) {
                logger.info(String.format("Dataset imported into hoodie dataset with %s commit time.", startCommit));
                return 0;
            }
            logger.error(String.format("Import failed with %d errors.", accumulator.value()));
            return -1;
        } catch (Throwable th) {
            logger.error("Error occurred.", th);
            return -1;
        }
    }

    private static HoodieWriteClient createHoodieClient(JavaSparkContext javaSparkContext, String str, String str2, int i) throws Exception {
        return new HoodieWriteClient(javaSparkContext, HoodieWriteConfig.newBuilder().withPath(str).withParallelism(i, i).withSchema(str2).combineInput(true, true).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build());
    }
}
