package za.co.absa.pramen.extras.writer;

import com.typesafe.config.Config;
import java.time.LocalDate;
import org.apache.avro.Schema;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Array$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.StringContext;
import scala.Symbol;
import scala.Symbol$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import za.co.absa.abris.avro.read.confluent.SchemaManager;
import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory$;
import za.co.absa.abris.avro.registry.SchemaSubject;
import za.co.absa.abris.config.AbrisConfig$;
import za.co.absa.abris.config.ToAvroConfig;
import za.co.absa.pramen.core.utils.ConfigUtils$;
import za.co.absa.pramen.extras.avro.AvroUtils$;
import za.co.absa.pramen.extras.writer.model.KafkaWriterConfig;
import za.co.absa.pramen.extras.writer.model.NamingStrategy;

/* compiled from: TableWriterKafka.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005ue\u0001B\u0001\u0003\u0001=\u0011\u0001\u0003V1cY\u0016<&/\u001b;fe.\u000bgm[1\u000b\u0005\r!\u0011AB<sSR,'O\u0003\u0002\u0006\r\u00051Q\r\u001f;sCNT!a\u0002\u0005\u0002\rA\u0014\u0018-\\3o\u0015\tI!\"\u0001\u0003bEN\f'BA\u0006\r\u0003\t\u0019wNC\u0001\u000e\u0003\tQ\u0018m\u0001\u0001\u0014\u0007\u0001\u0001b\u0003\u0005\u0002\u0012)5\t!CC\u0001\u0014\u0003\u0015\u00198-\u00197b\u0013\t)\"C\u0001\u0004B]f\u0014VM\u001a\t\u0003/ai\u0011AA\u0005\u00033\t\u00111\u0002V1cY\u0016<&/\u001b;fe\"A1\u0004\u0001B\u0001B\u0003%A$A\u0005u_BL7MT1nKB\u0011Q\u0004\t\b\u0003#yI!a\b\n\u0002\rA\u0013X\rZ3g\u0013\t\t#E\u0001\u0004TiJLgn\u001a\u0006\u0003?IA\u0001\u0002\n\u0001\u0003\u0002\u0003\u0006I!J\u0001\fW\u000647.Y\"p]\u001aLw\r\u0005\u0002'S5\tqE\u0003\u0002)\u0005\u0005)Qn\u001c3fY&\u0011!f\n\u0002\u0012\u0017\u000647.Y,sSR,'oQ8oM&<\u0007\u0002\u0003\u0017\u0001\u0005\u0003\u0005\u000b\u0011B\u0017\u0002\u0019\u0015DHO]1PaRLwN\\:\u0011\tuqC\u0004H\u0005\u0003_\t\u00121!T1q\u0011!\t\u0004A!A!\u0002\u0017\u0011\u0014!B:qCJ\\\u0007CA\u001a<\u001b\u0005!$BA\u001b7\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003c]R!\u0001O\u001d\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005Q\u0014aA8sO&\u0011A\b\u000e\u0002\r'B\f'o[*fgNLwN\u001c\u0005\u0006}\u0001!\taP\u0001\u0007y%t\u0017\u000e\u001e \u0015\t\u0001\u001bE)\u0012\u000b\u0003\u0003\n\u0003\"a\u0006\u0001\t\u000bEj\u00049\u0001\u001a\t\u000bmi\u0004\u0019\u0001\u000f\t\u000b\u0011j\u0004\u0019A\u0013\t\u000b1j\u0004\u0019A\u0017\t\u000f\u001d\u0003!\u0019!C\u0005\u0011\u0006\u0019An\\4\u0016\u0003%\u0003\"AS'\u000e\u0003-S!\u0001T\u001d\u0002\u000bMdg\r\u000e6\n\u00059[%A\u0002'pO\u001e,'\u000f\u0003\u0004Q\u0001\u0001\u0006I!S\u0001\u0005Y><\u0007\u0005C\u0003S\u0001\u0011\u00053+A\u0003xe&$X\r\u0006\u0003U/.,\bCA\tV\u0013\t1&C\u0001\u0003M_:<\u0007\"\u0002-R\u0001\u0004I\u0016A\u00013g!\tQ\u0006N\u0004\u0002\\M:\u0011A,\u001a\b\u0003;\u0012t!AX2\u000f\u0005}\u0013W\"\u00011\u000b\u0005\u0005t\u0011A\u0002\u001fs_>$h(C\u0001;\u0013\tA\u0014(\u0003\u00022o%\u0011QGN\u0005\u0003OR\nq\u0001]1dW\u0006<W-\u0003\u0002jU\nIA)\u0019;b\rJ\fW.\u001a\u0006\u0003ORBQ\u0001\\)A\u00025\f\u0001\"\u001b8g_\u0012\u000bG/\u001a\t\u0003]Nl\u0011a\u001c\u0006\u0003aF\fA\u0001^5nK*\t!/\u0001\u0003kCZ\f\u0017B\u0001;p\u0005%aunY1m\t\u0006$X\rC\u0003w#\u0002\u0007q/\u0001\u000bok6|eMU3d_J$7/R:uS6\fG/\u001a\t\u0004#a$\u0016BA=\u0013\u0005\u0019y\u0005\u000f^5p]\"11\u0010\u0001C\u0001\rq\fqbZ3u\u000bb$(/Y(qi&|gn]\u000b\u0002[!1a\u0010\u0001C\u0001\r}\f!cZ3u\u001fV$\b/\u001e;ECR\fgI]1nKR\u0019\u0011,!\u0001\t\u000bak\b\u0019A-\t\u0011\u0005\u0015\u0001\u0001\"\u0001\u0007\u0003\u000f\t\u0011cZ3u-\u0006dW/\u001a#bi\u00064%/Y7f)\rI\u0016\u0011\u0002\u0005\u00071\u0006\r\u0001\u0019A-\t\u0011\u00055\u0001\u0001\"\u0001\u0007\u0003\u001f\tAcZ3u\u0017\u0016Lh+\u00197vK\u0012\u000bG/\u0019$sC6,GcA-\u0002\u0012!1\u0001,a\u0003A\u0002eC\u0001\"!\u0006\u0001\t\u00031\u0011qC\u0001\u000fe\u0016<\u0017n\u001d;feN\u001b\u0007.Z7b))\tI\"a\b\u0002*\u00055\u0012q\u0007\t\u0004#\u0005m\u0011bAA\u000f%\t\u0019\u0011J\u001c;\t\u0011\u0005\u0005\u00121\u0003a\u0001\u0003G\tqaY8mk6t7\u000fE\u00024\u0003KI1!a\n5\u0005\u0019\u0019u\u000e\\;n]\"9\u00111FA\n\u0001\u0004i\u0013AG:dQ\u0016l\u0017MU3hSN$(/_\"mS\u0016tGoQ8oM&<\u0007\u0002CA\u0018\u0003'\u0001\r!!\r\u0002\u001d9\fW.\u001b8h'R\u0014\u0018\r^3hsB\u0019a%a\r\n\u0007\u0005UrE\u0001\bOC6LgnZ*ue\u0006$XmZ=\t\u0011\u0005e\u00121\u0003a\u0001\u0003w\tQ![:LKf\u00042!EA\u001f\u0013\r\tyD\u0005\u0002\b\u0005>|G.Z1o\u0011!\t\u0019\u0005\u0001C\u0001\r\u0005\u0015\u0013!D4fi\u00063(o\\\"p]\u001aLw\r\u0006\u0006\u0002H\u0005]\u0013\u0011LA.\u0003;\u0002B!!\u0013\u0002T5\u0011\u00111\n\u0006\u0005\u0003\u001b\ny%\u0001\u0004d_:4\u0017n\u001a\u0006\u0004\u0003#B\u0011!B1ce&\u001c\u0018\u0002BA+\u0003\u0017\u0012A\u0002V8BmJ|7i\u001c8gS\u001eD\u0001\"!\t\u0002B\u0001\u0007\u00111\u0005\u0005\t\u0003_\t\t\u00051\u0001\u00022!A\u0011\u0011HA!\u0001\u0004\tY\u0004\u0003\u0005\u0002`\u0005\u0005\u0003\u0019AA1\u0003-\u00198\r[3nC&#w\n\u001d;\u0011\tEA\u0018\u0011D\u0004\b\u0003K\u0012\u0001\u0012AA4\u0003A!\u0016M\u00197f/JLG/\u001a:LC\u001a\\\u0017\rE\u0002\u0018\u0003S2a!\u0001\u0002\t\u0002\u0005-4cAA5!!9a(!\u001b\u0005\u0002\u0005=DCAA4\u0011!9\u0015\u0011\u000eb\u0001\n\u0013A\u0005b\u0002)\u0002j\u0001\u0006I!\u0013\u0005\t\u0003o\nI\u0007\"\u0001\u0002z\u0005)\u0011\r\u001d9msR1\u00111PA@\u0003\u0003#2!QA?\u0011\u0019\t\u0014Q\u000fa\u0002e!11$!\u001eA\u0002qA\u0001\"a!\u0002v\u0001\u0007\u0011QQ\u0001\u0005G>tg\r\u0005\u0003\u0002\b\u0006MUBAAE\u0015\u0011\ti%a#\u000b\t\u00055\u0015qR\u0001\tif\u0004Xm]1gK*\u0011\u0011\u0011S\u0001\u0004G>l\u0017\u0002BAK\u0003\u0013\u0013aaQ8oM&<\u0007\u0002C>\u0002j\u0011\u0005a!!'\u0015\u00075\nY\n\u0003\u0005\u0002\u0004\u0006]\u0005\u0019AAC\u0001")
/* loaded from: input_file:za/co/absa/pramen/extras/writer/TableWriterKafka.class */
public class TableWriterKafka implements TableWriter {
    private final String topicName;
    private final KafkaWriterConfig kafkaConfig;
    private final Map<String, String> extraOptions;
    private final Logger log = LoggerFactory.getLogger(getClass());
    private static Symbol symbol$1 = Symbol$.MODULE$.apply("value");
    private static Symbol symbol$2 = Symbol$.MODULE$.apply("key");

    public static TableWriterKafka apply(String str, Config config, SparkSession sparkSession) {
        return TableWriterKafka$.MODULE$.apply(str, config, sparkSession);
    }

    private Logger log() {
        return this.log;
    }

    @Override // za.co.absa.pramen.extras.writer.TableWriter
    public long write(Dataset<Row> dataset, LocalDate localDate, Option<Object> option) {
        Dataset<Row> outputDataFrame = getOutputDataFrame(dataset);
        long count = outputDataFrame.count();
        if (count <= 0) {
            log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Nothing to write to '", "'..."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.topicName})));
            return 0L;
        }
        log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Writing ", " records to '", "'..."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(count), this.topicName})));
        outputDataFrame.write().format("kafka").option("topic", this.topicName).option("kafka.bootstrap.servers", this.kafkaConfig.brokers()).options(this.extraOptions).save();
        return count;
    }

    public Map<String, String> getExtraOptions() {
        return this.extraOptions;
    }

    public Dataset<Row> getOutputDataFrame(Dataset<Row> dataset) {
        Dataset<Row> valueDataFrame;
        Dataset<Row> dataset2;
        Option<NamingStrategy> keyNamingStrategy = this.kafkaConfig.keyNamingStrategy();
        if (keyNamingStrategy instanceof Some) {
            valueDataFrame = getKeyValueDataFrame(dataset);
        } else {
            if (!None$.MODULE$.equals(keyNamingStrategy)) {
                throw new MatchError(keyNamingStrategy);
            }
            valueDataFrame = getValueDataFrame(dataset);
        }
        Dataset<Row> dataset3 = valueDataFrame;
        Some recordsLimit = this.kafkaConfig.recordsLimit();
        if (recordsLimit instanceof Some) {
            dataset2 = dataset3.limit(BoxesRunTime.unboxToInt(recordsLimit.x()));
        } else {
            if (!None$.MODULE$.equals(recordsLimit)) {
                throw new MatchError(recordsLimit);
            }
            dataset2 = dataset3;
        }
        return dataset2;
    }

    public Dataset<Row> getValueDataFrame(Dataset<Row> dataset) {
        Column struct = functions$.MODULE$.struct(Predef$.MODULE$.wrapRefArray((Object[]) Predef$.MODULE$.refArrayOps(dataset.columns()).map(new TableWriterKafka$$anonfun$1(this, dataset), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Column.class)))));
        return dataset.select(Predef$.MODULE$.wrapRefArray(new Column[]{za.co.absa.abris.avro.functions$.MODULE$.to_avro(struct, getAvroConfig(struct, this.kafkaConfig.valueNamingStrategy(), false, this.kafkaConfig.valueSchemaId())).as(symbol$1)}));
    }

    public Dataset<Row> getKeyValueDataFrame(Dataset<Row> dataset) {
        Column struct = functions$.MODULE$.struct((Seq) this.kafkaConfig.keyColumns().map(new TableWriterKafka$$anonfun$2(this, dataset), Seq$.MODULE$.canBuildFrom()));
        Column struct2 = functions$.MODULE$.struct(Predef$.MODULE$.wrapRefArray((Object[]) Predef$.MODULE$.refArrayOps(dataset.columns()).map(new TableWriterKafka$$anonfun$3(this, dataset), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Column.class)))));
        return dataset.select(Predef$.MODULE$.wrapRefArray(new Column[]{za.co.absa.abris.avro.functions$.MODULE$.to_avro(struct, getAvroConfig(struct, (NamingStrategy) this.kafkaConfig.keyNamingStrategy().get(), true, this.kafkaConfig.keySchemaId())).as(symbol$2), za.co.absa.abris.avro.functions$.MODULE$.to_avro(struct2, getAvroConfig(struct2, this.kafkaConfig.valueNamingStrategy(), false, this.kafkaConfig.valueSchemaId())).as(symbol$1)}));
    }

    public int registerSchema(Column column, Map<String, String> map, NamingStrategy namingStrategy, boolean z) {
        Schema fixNullableFields = AvroUtils$.MODULE$.fixNullableFields(AvroUtils$.MODULE$.convertSparkToAvroSchema(column.expr().dataType()));
        SchemaManager create = SchemaManagerFactory$.MODULE$.create(map);
        SchemaSubject subject = namingStrategy.getSubject(this.topicName, z);
        int register = create.register(subject, fixNullableFields);
        log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Schema for subject '", "' is registered with id = ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{subject, BoxesRunTime.boxToInteger(register)})));
        return register;
    }

    public ToAvroConfig getAvroConfig(Column column, NamingStrategy namingStrategy, boolean z, Option<Object> option) {
        int registerSchema;
        Map<String, String> $plus$plus = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(AbrisConfig$.MODULE$.SCHEMA_REGISTRY_URL()), this.kafkaConfig.schemaRegistryUrl())})).$plus$plus(this.kafkaConfig.schemaRegistryExtraOptions());
        ConfigUtils$.MODULE$.logExtraOptions("Schema registry options", $plus$plus, Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"basic.auth.user.info"})));
        if (option instanceof Some) {
            registerSchema = BoxesRunTime.unboxToInt(((Some) option).x());
        } else {
            if (!None$.MODULE$.equals(option)) {
                throw new MatchError(option);
            }
            registerSchema = registerSchema(column, $plus$plus, namingStrategy, z);
        }
        return AbrisConfig$.MODULE$.toConfluentAvro().downloadSchemaById(registerSchema).usingSchemaRegistry($plus$plus);
    }

    public TableWriterKafka(String str, KafkaWriterConfig kafkaWriterConfig, Map<String, String> map, SparkSession sparkSession) {
        this.topicName = str;
        this.kafkaConfig = kafkaWriterConfig;
        this.extraOptions = map;
    }
}
