package com.uber.hoodie.utilities.deltastreamer;

import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.utilities.UtilHelpers;
import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
import com.uber.hoodie.utilities.keygen.KeyGenerator;
import com.uber.hoodie.utilities.keygen.SimpleKeyGenerator;
import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import com.uber.hoodie.utilities.sources.DFSSource;
import com.uber.hoodie.utilities.sources.Source;
import com.uber.hoodie.utilities.sources.SourceDataFormat;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.collection.JavaConversions;

/* loaded from: input_file:com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.class */
public class HoodieDeltaStreamer implements Serializable {
    /** Class-wide logger; a constant, so it is declared {@code final} rather than {@code volatile}. */
    private static final Logger log = LogManager.getLogger(HoodieDeltaStreamer.class);

    /** Key in the commit's extra metadata under which the source checkpoint is stored and looked up. */
    private static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";

    /** Parsed command line options driving this run. */
    private final Config cfg;

    /** Source the new records are fetched from; created in {@code initSource()}. */
    private transient Source source;

    /** Supplies the source and target schemas; created in {@code initSchemaProvider()}. */
    private transient SchemaProvider schemaProvider;

    /**
     * Builds the key for every incoming record. Deliberately not {@code transient}: it is read
     * from inside the record-mapping lambda in {@code sync()}, which captures {@code this}.
     */
    private KeyGenerator keyGenerator;

    /** File system handle used for the target path and for reading the properties files. */
    private transient FileSystem fs = FSUtils.getFs();

    /** Completed commits of the target dataset, or empty when the target base path does not exist yet. */
    private transient Optional<HoodieTimeline> commitTimelineOpt;

    /** Spark context used by the source and the write client. */
    private transient JavaSparkContext jssc;

    /**
     * Command line options of the delta streamer, populated by JCommander from the
     * {@link Parameter} annotations on each public field.
     *
     * <p>Field initializers are the defaults used when an option is not given on the
     * command line. Options marked {@code required = true} have no usable default.
     */
    public static class Config implements Serializable {

        @Parameter(names = {"--target-base-path"}, description = "base path for the target hoodie dataset", required = true)
        public String targetBasePath;

        @Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true)
        public String targetTableName;

        @Parameter(names = {"--hoodie-client-config"},
                description = "path to properties file on localfs or dfs, with hoodie client config. "
                        + "Sane defaults are used, but it is recommended to provide basic things like "
                        + "metrics endpoints, hive configs etc")
        public String hoodieClientProps = null;

        // Built-in sources live next to DFSSource, i.e. in the utilities.sources package.
        @Parameter(names = {"--source-class"},
                description = "subclass of com.uber.hoodie.utilities.sources.Source to use to read data. "
                        + "built-in options: com.uber.hoodie.utilities.sources."
                        + "{DFSSource (default), KafkaSource, HiveIncrPullSource}")
        public String sourceClassName = DFSSource.class.getName();

        @Parameter(names = {"--source-config"},
                description = "path to properties file on localfs or dfs, with source configs. "
                        + "For list of acceptable properties, refer the source class",
                required = true)
        public String sourceConfigProps = null;

        @Parameter(names = {"--source-format"},
                description = "Format of data in source, JSON (default), Avro. "
                        + "All source data is converted to Avro using the provided schema in any case",
                converter = SourceFormatConvertor.class)
        public SourceDataFormat sourceFormat = SourceDataFormat.JSON;

        @Parameter(names = {"--source-ordering-field"},
                description = "Field within source record to decide how to break ties between "
                        + "records with same key in input data. Default: 'ts' holding unix timestamp of record")
        public String sourceOrderingField = "ts";

        // The key generator contract is com.uber.hoodie.utilities.keygen.KeyGenerator.
        @Parameter(names = {"--key-generator-class"},
                description = "Subclass of com.uber.hoodie.utilities.keygen.KeyGenerator to generate "
                        + "a HoodieKey from the given avro record. Built in: SimpleKeyGenerator "
                        + "(Uses provided field names as recordkey & partitionpath. "
                        + "Nested fields specified via dot notation, e.g: a.b.c)")
        public String keyGeneratorClass = SimpleKeyGenerator.class.getName();

        @Parameter(names = {"--key-generator-config"},
                description = "Path to properties file on localfs or dfs, with KeyGenerator configs. "
                        + "For list of acceptable properties, refer the KeyGenerator class",
                required = true)
        public String keyGeneratorProps = null;

        // The default below is DeltaStreamerAvroPayload, so the help text names that class.
        @Parameter(names = {"--payload-class"},
                description = "subclass of HoodieRecordPayload, that works off a GenericRecord. "
                        + "Default: DeltaStreamerAvroPayload. Implement your own, if you want to do "
                        + "something other than overwriting existing value")
        public String payloadClassName = DeltaStreamerAvroPayload.class.getName();

        @Parameter(names = {"--schemaprovider-class"},
                description = "subclass of com.uber.hoodie.utilities.schema.SchemaProvider to attach "
                        + "schemas to input & target table data, built in options: FilebasedSchemaProvider")
        public String schemaProviderClassName = FilebasedSchemaProvider.class.getName();

        @Parameter(names = {"--schemaprovider-config"},
                description = "path to properties file on localfs or dfs, with schema configs. "
                        + "For list of acceptable properties, refer the schema provider class",
                required = true)
        public String schemaProviderConfigProps = null;

        // 1 TB expressed in bytes (1024^4 = 1099511627776).
        @Parameter(names = {"--max-input-bytes"}, description = "Maximum number of bytes to read from source. Default: 1TB")
        public long maxInputBytes = 1024L * 1024L * 1024L * 1024L;

        @Parameter(names = {"--op"},
                description = "Takes one of these values : UPSERT (default), INSERT "
                        + "(use when input is purely new data/inserts to gain speed)",
                converter = OperationConvertor.class)
        public Operation operation = Operation.UPSERT;

        // Kept as a boxed Boolean because main() reads it through booleanValue().
        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;
    }

    /**
     * Write operations selectable through the {@code --op} command line option.
     */
    public enum Operation {
        /** Written through the write client's {@code upsert} call; the default of {@code --op}. */
        UPSERT,
        /** Written through the write client's {@code insert} call. */
        INSERT;
    }

    /* loaded from: input_file:com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer$OperationConvertor.class */
    private class OperationConvertor implements IStringConverter<Operation> {
        private OperationConvertor() {
        }

        /* renamed from: convert, reason: merged with bridge method [inline-methods] */
        public Operation m5convert(String str) throws ParameterException {
            return Operation.valueOf(str);
        }
    }

    /* loaded from: input_file:com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer$SourceFormatConvertor.class */
    private class SourceFormatConvertor implements IStringConverter<SourceDataFormat> {
        private SourceFormatConvertor() {
        }

        /* renamed from: convert, reason: merged with bridge method [inline-methods] */
        public SourceDataFormat m6convert(String str) throws ParameterException {
            return SourceDataFormat.valueOf(str);
        }
    }

    public HoodieDeltaStreamer(Config config) throws IOException {
        this.cfg = config;
        if (this.fs.exists(new Path(config.targetBasePath))) {
            this.commitTimelineOpt = Optional.of(new HoodieTableMetaClient(this.fs, config.targetBasePath).getActiveTimeline().getCommitTimeline().filterCompletedInstants());
        } else {
            this.commitTimelineOpt = Optional.empty();
        }
        initSchemaProvider();
        initKeyGenerator();
        this.jssc = getSparkContext();
        initSource();
    }

    /**
     * Loads the source properties file named by {@code cfg.sourceConfigProps} and creates the
     * {@link Source} through {@code UtilHelpers.createSource}.
     *
     * @throws IOException if reading the properties file fails
     */
    private void initSource() throws IOException {
        final Path sourcePropsPath = new Path(this.cfg.sourceConfigProps);
        final PropertiesConfiguration sourceProps = UtilHelpers.readConfig(this.fs, sourcePropsPath);
        log.info("Creating source " + this.cfg.sourceClassName + " with configs : " + sourceProps.toString());
        this.source = UtilHelpers.createSource(
                this.cfg.sourceClassName,
                sourceProps,
                this.jssc,
                this.cfg.sourceFormat,
                this.schemaProvider);
    }

    /**
     * Loads the schema provider properties file named by {@code cfg.schemaProviderConfigProps}
     * and creates the {@link SchemaProvider} through {@code UtilHelpers.createSchemaProvider}.
     *
     * @throws IOException if reading the properties file fails
     */
    private void initSchemaProvider() throws IOException {
        final Path schemaPropsPath = new Path(this.cfg.schemaProviderConfigProps);
        final PropertiesConfiguration schemaProps = UtilHelpers.readConfig(this.fs, schemaPropsPath);
        log.info("Creating schema provider " + this.cfg.schemaProviderClassName + " with configs : " + schemaProps.toString());
        this.schemaProvider = UtilHelpers.createSchemaProvider(this.cfg.schemaProviderClassName, schemaProps);
    }

    /**
     * Loads the key generator properties file named by {@code cfg.keyGeneratorProps} and
     * creates the {@link KeyGenerator} through {@code UtilHelpers.createKeyGenerator}.
     *
     * @throws IOException if reading the properties file fails
     */
    private void initKeyGenerator() throws IOException {
        final Path keyGenPropsPath = new Path(this.cfg.keyGeneratorProps);
        final PropertiesConfiguration keyGenProps = UtilHelpers.readConfig(this.fs, keyGenPropsPath);
        log.info("Creating key generator " + this.cfg.keyGeneratorClass + " with configs : " + keyGenProps.toString());
        this.keyGenerator = UtilHelpers.createKeyGenerator(this.cfg.keyGeneratorClass, keyGenProps);
    }

    /**
     * Builds the Spark context used for this run.
     *
     * <p>The application is named after the target table, Kryo serialization and gzip block
     * compression of output are enabled, and the source and target Avro schemas are
     * registered. Requires {@code schemaProvider} to be initialized already.
     *
     * @return a new {@link JavaSparkContext}
     */
    private JavaSparkContext getSparkContext() {
        SparkConf sparkConf = new SparkConf().setAppName("hoodie-delta-streamer-" + this.cfg.targetTableName);
        // Fall back to a local master only when none was supplied from outside (e.g. via
        // spark-submit or a spark.master system property), instead of always overriding it.
        sparkConf.setIfMissing("spark.master", "local[2]");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.set("spark.driver.maxResultSize", "2g");

        // Output compression. The codec key is set exactly once, to the codec class name.
        sparkConf.set("spark.hadoop.mapred.output.compress", "true");
        sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
        sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");

        SparkConf withHoodieClasses = HoodieWriteClient.registerClasses(sparkConf);
        withHoodieClasses.registerAvroSchemas(
                JavaConversions.asScalaBuffer(
                        Arrays.asList(this.schemaProvider.getSourceSchema(), this.schemaProvider.getTargetSchema()))
                        .toList());
        return new JavaSparkContext(withHoodieClasses);
    }

    /**
     * Runs one ingestion round: resolves the checkpoint to resume from, fetches new data from
     * the source, converts it to {@link HoodieRecord}s and writes it with the configured
     * operation, storing the new checkpoint in the commit metadata under {@code CHECKPOINT_KEY}.
     *
     * <p>Returns without committing when the source has no new data. The write client is
     * closed even when writing or committing throws.
     *
     * @throws Exception if resolving the checkpoint, fetching, writing or committing fails
     */
    // HoodieWriteClient and HoodieRecord are used as raw types here because their type
    // arguments are not among this file's imports.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void sync() throws Exception {
        Optional<String> resumeCheckpoint = readLastCheckpoint();
        log.info("Checkpoint to resume from : " + resumeCheckpoint);

        Pair<Optional<JavaRDD<GenericRecord>>, String> fetched =
                this.source.fetchNewData(resumeCheckpoint, this.cfg.maxInputBytes);
        Optional<JavaRDD<GenericRecord>> newData = fetched.getKey();
        if (!newData.isPresent()) {
            log.info("No new data, nothing to commit.. ");
            return;
        }

        // Key every record and wrap it in the configured payload class; the value of the
        // ordering field is handed to the payload alongside the record.
        JavaRDD<HoodieRecord> records = newData.get().map(genericRecord ->
                new HoodieRecord(
                        this.keyGenerator.getKey(genericRecord),
                        UtilHelpers.createPayload(
                                this.cfg.payloadClassName,
                                genericRecord,
                                (Comparable) genericRecord.get(this.cfg.sourceOrderingField))));

        HoodieWriteClient client =
                new HoodieWriteClient(this.jssc, getHoodieClientConfig(this.cfg.hoodieClientProps));
        try {
            String commitTime = client.startCommit();
            log.info("Starting commit  : " + commitTime);

            JavaRDD writeStatuses;
            if (this.cfg.operation == Operation.INSERT) {
                writeStatuses = client.insert(records, commitTime);
            } else if (this.cfg.operation == Operation.UPSERT) {
                writeStatuses = client.upsert(records, commitTime);
            } else {
                throw new HoodieDeltaStreamerException("Unknown operation :" + this.cfg.operation);
            }

            // Persist the checkpoint returned by the source so the next run can resume from it.
            HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
            checkpointCommitMetadata.put(CHECKPOINT_KEY, fetched.getValue());

            if (client.commit(commitTime, writeStatuses, Optional.of(checkpointCommitMetadata))) {
                log.info("Commit " + commitTime + " successful!");
            } else {
                // A failed commit is an error condition, not an informational event.
                log.error("Commit " + commitTime + " failed!");
            }
        } finally {
            client.close();
        }
    }

    /**
     * Resolves the checkpoint the source should resume from.
     *
     * <p>When no commit timeline is available (the target path did not exist at construction
     * time), the target path is initialized as a hoodie dataset and no checkpoint is
     * returned. When the timeline has no completed instant, no checkpoint is returned either.
     *
     * @return the checkpoint stored in the last completed commit, or empty if there is none
     * @throws Exception if the last commit carries no checkpoint under {@code CHECKPOINT_KEY},
     *         or if reading the commit metadata or initializing the dataset fails
     */
    private Optional<String> readLastCheckpoint() throws Exception {
        if (!this.commitTimelineOpt.isPresent()) {
            Properties tableProps = new Properties();
            tableProps.put("hoodie.table.name", this.cfg.targetTableName);
            HoodieTableMetaClient.initializePathAsHoodieDataset(this.fs, this.cfg.targetBasePath, tableProps);
            return Optional.empty();
        }

        HoodieTimeline commitTimeline = this.commitTimelineOpt.get();
        Optional<HoodieInstant> lastInstant = commitTimeline.lastInstant();
        if (!lastInstant.isPresent()) {
            return Optional.empty();
        }

        HoodieCommitMetadata lastCommitMetadata =
                HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(lastInstant.get()).get());
        String checkpoint = lastCommitMetadata.getMetadata(CHECKPOINT_KEY);
        if (checkpoint == null) {
            throw new HoodieDeltaStreamerException("Unable to find previous checkpoint. Please double check if this table was indeed built via delta streamer ");
        }
        return Optional.of(checkpoint);
    }

    /**
     * Builds the write client configuration for the target dataset.
     *
     * <p>The programmatic settings (input combining, target path, manual commit, target
     * schema, table name, BLOOM index) are applied first. If a client properties file is
     * given, it is loaded afterwards, in the same position as before.
     *
     * @param str path of the hoodie client properties file, or {@code null} when
     *            {@code --hoodie-client-config} was not given, in which case only the
     *            programmatic settings are used
     * @return the assembled {@link HoodieWriteConfig}
     * @throws Exception if the properties file cannot be opened or read
     */
    private HoodieWriteConfig getHoodieClientConfig(String str) throws Exception {
        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
                .combineInput(true, true)
                .withPath(this.cfg.targetBasePath)
                .withAutoCommit(false)
                .withSchema(this.schemaProvider.getTargetSchema().toString())
                .forTable(this.cfg.targetTableName)
                .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());

        // The option is not required and defaults to null; new Path(null) would throw.
        if (str != null) {
            // try-with-resources so the stream is closed whether or not loading succeeds.
            try (java.io.InputStream propsStream = this.fs.open(new Path(str))) {
                builder = builder.fromInputStream(propsStream);
            }
        }
        return builder.build();
    }

    /**
     * Command line entry point: parses the options into a {@link Config} and runs one sync.
     *
     * <p>Prints the usage text and exits with status 1 when no arguments are given or when
     * {@code --help} / {@code -h} is present.
     *
     * @param strArr the command line arguments
     * @throws Exception if constructing the streamer or the sync itself fails
     */
    public static void main(String[] strArr) throws Exception {
        Config config = new Config();
        JCommander jCommander = new JCommander(config);

        // Check for an empty command line BEFORE parsing: parsing no arguments fails on the
        // required options, which would prevent the usage text from ever being shown.
        if (strArr.length == 0) {
            jCommander.usage();
            System.exit(1);
        }

        jCommander.parse(strArr);
        if (config.help.booleanValue()) {
            jCommander.usage();
            System.exit(1);
        }
        new HoodieDeltaStreamer(config).sync();
    }

    /*
     * Decompiler rendering of the lambda deserialization hook (marked "synthetic" in the
     * signature below). It recognises a serialized lambda by its implementation method name
     * and, after checking the functional interface and implementation signatures, rebuilds
     * the record-mapping lambda; any other input is rejected with IllegalArgumentException.
     *
     * NOTE(review): as written this is not valid Java source: a boolean is initialised
     * with -1 and switched on, and the rebuilt lambda refers to "this" inside a static method
     * while the captured instance (hoodieDeltaStreamer) is never used. Presumably the compiler
     * regenerates this hook for the serializable lambda in sync(), in which case this copy
     * should be dropped rather than repaired. Confirm before recompiling.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Decompiled form of a string switch: first map the method name to an index ...
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1089061620:
                if (implMethodName.equals("lambda$sync$c9e1cdc3$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // ... then dispatch on that index.
        switch (z) {
            case false:
                // Only rebuild the lambda when every recorded signature matches the expected
                // Spark Function / GenericRecord -> HoodieRecord shape.
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/avro/generic/GenericRecord;)Lcom/uber/hoodie/common/model/HoodieRecord;")) {
                    // Captured argument 0 is the HoodieDeltaStreamer instance the lambda closed over.
                    HoodieDeltaStreamer hoodieDeltaStreamer = (HoodieDeltaStreamer) serializedLambda.getCapturedArg(0);
                    return genericRecord -> {
                        return new HoodieRecord(this.keyGenerator.getKey(genericRecord), UtilHelpers.createPayload(this.cfg.payloadClassName, genericRecord, (Comparable) genericRecord.get(this.cfg.sourceOrderingField)));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
