package it.agilelab.bigdata.wasp.consumers.spark.plugins.elastic;

import akka.actor.ActorRef;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkStructuredStreamingWriter;
import it.agilelab.bigdata.wasp.core.WaspSystem$;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.utils.ElasticConfiguration;
import it.agilelab.bigdata.wasp.models.IndexModel;
import it.agilelab.bigdata.wasp.models.configuration.ElasticConfigModel;
import it.agilelab.bigdata.wasp.repository.core.bl.IndexBL;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: ElasticWriters.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u001da\u0001B\u0004\t\u0001eA\u0001\u0002\u000e\u0001\u0003\u0002\u0003\u0006I!\u000e\u0005\t}\u0001\u0011\t\u0011)A\u0005\u007f!A!\n\u0001B\u0001B\u0003%1\n\u0003\u0005W\u0001\t\u0005\t\u0015!\u0003X\u0011\u0015y\u0006\u0001\"\u0001a\u0011\u00159\u0007\u0001\"\u0011i\u0005-*E.Y:uS\u000e\u001cX-\u0019:dQN\u0003\u0018M]6TiJ,8\r^;sK\u0012\u001cFO]3b[&twm\u0016:ji\u0016\u0014(BA\u0005\u000b\u0003\u001d)G.Y:uS\u000eT!a\u0003\u0007\u0002\u000fAdWoZ5og*\u0011QBD\u0001\u0006gB\f'o\u001b\u0006\u0003\u001fA\t\u0011bY8ogVlWM]:\u000b\u0005E\u0011\u0012\u0001B<bgBT!a\u0005\u000b\u0002\u000f\tLw\rZ1uC*\u0011QCF\u0001\tC\u001eLG.\u001a7bE*\tq#\u0001\u0002ji\u000e\u00011#\u0002\u0001\u001bA\u0019r\u0003CA\u000e\u001f\u001b\u0005a\"\"A\u000f\u0002\u000bM\u001c\u0017\r\\1\n\u0005}a\"AB!osJ+g\r\u0005\u0002\"I5\t!E\u0003\u0002$\u0019\u00059qO]5uKJ\u001c\u0018BA\u0013#\u0005y\u0019\u0006/\u0019:l'R\u0014Xo\u0019;ve\u0016$7\u000b\u001e:fC6LgnZ,sSR,'\u000f\u0005\u0002(Y5\t\u0001F\u0003\u0002*U\u0005)Q\u000f^5mg*\u00111\u0006E\u0001\u0005G>\u0014X-\u0003\u0002.Q\t!R\t\\1ti&\u001c7i\u001c8gS\u001e,(/\u0019;j_:\u0004\"a\f\u001a\u000e\u0003AR!!\r\u0016\u0002\u000f1|wmZ5oO&\u00111\u0007\r\u0002\b\u0019><w-\u001b8h\u0003\u001dIg\u000eZ3y\u00052\u0003\"A\u000e\u001f\u000e\u0003]R!\u0001O\u001d\u0002\u0005\td'BA\u0016;\u0015\tY\u0004#\u0001\u0006sKB|7/\u001b;pefL!!P\u001c\u0003\u000f%sG-\u001a=C\u0019\u0006\u00111o\u001d\t\u0003\u0001\"k\u0011!\u0011\u0006\u0003\u0005\u000e\u000b1a]9m\u0015\tiAI\u0003\u0002F\r\u00061\u0011\r]1dQ\u0016T\u0011aR\u0001\u0004_J<\u0017BA%B\u00051\u0019\u0006/\u0019:l'\u0016\u001c8/[8o\u0003\u0011q\u0017-\\3\u0011\u00051\u001bfBA'R!\tqE$D\u0001P\u0015\t\u0001\u0006$\u0001\u0004=e>|GOP\u0005\u0003%r\ta\u0001\u0015:fI\u00164\u0017B\u0001+V\u0005\u0019\u0019FO]5oO*\u0011!\u000bH\u0001\u0012K2\f7\u000f^5d\u0003\u0012l\u0017N\\!di>\u0014\bC\u0001-^\u001b\u0005I&B\u0001.\\\u0003\u0015\t7\r^8s\u0015\u0005a\u0016\u0001B1lW\u0006L!AX-\u0003\u0011\u0005\u001bGo\u001c:SK\u001a\fa\u0001P5oSRtD#B1dI\u00164\u0007C\u00012\u0001\u001b\u0005A\u0001\"\u0002\u001b\u0006\u0001\u0004)\u0004\"\u0002 \u0006\u0001\u0004y\u0004\"\u0002&\u0006\u0001\u0004Y\u0005\"\u0002,\u0006\u0001\u00049\u0016!B<sSR,GCA5s!\rQWn\\\u0007\u0002W*\u0011A.Q\u0001\ngR\u0014X-Y7j]\u001eL!A\\6\u0003!\u0011\u000bG/Y*ue\u0016\fWn\u0016:ji\u0016\u0014\bC\u0001!q\u0013\t\t\u0018IA\u0002S_^DQa\u001d\u0004A\u0002Q\faa\u001d;sK\u0006l\u0007cA;\u0002\u00029\u0011aO \b\u0003ovt!\u0001\u001f?\u000f\u0005e\\hB\u0001({\u0013\u00059\u0015BA#G\u0013\tiA)\u0003\u0002C\u0007&\u0011q0Q\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\t\u0019!!\u0002\u0003\u0013\u0011\u000bG/\u0019$sC6,'BA@B\u0001")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/elastic/ElasticsearchSparkStructuredStreamingWriter.class */
public class ElasticsearchSparkStructuredStreamingWriter implements SparkStructuredStreamingWriter, ElasticConfiguration, Logging {
    private final IndexBL indexBL;
    private final String name;
    private final ActorRef elasticAdminActor;
    private final WaspLogger logger;
    private ElasticConfigModel elasticConfig;
    private volatile boolean bitmap$0;

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [it.agilelab.bigdata.wasp.consumers.spark.plugins.elastic.ElasticsearchSparkStructuredStreamingWriter] */
    private ElasticConfigModel elasticConfig$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.elasticConfig = ElasticConfiguration.elasticConfig$(this);
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.elasticConfig;
    }

    public ElasticConfigModel elasticConfig() {
        return !this.bitmap$0 ? elasticConfig$lzycompute() : this.elasticConfig;
    }

    public DataStreamWriter<Row> write(Dataset<Row> dataset) {
        Option byName = this.indexBL.getByName(this.name);
        if (!byName.isDefined()) {
            String sb = new StringBuilder(59).append("The index '").append(this.name).append("' does not exits pay ATTENTION spark won't start").toString();
            logger().error(() -> {
                return sb;
            });
            throw new Exception(sb);
        }
        IndexModel indexModel = (IndexModel) byName.get();
        String eventuallyTimedName = indexModel.eventuallyTimedName();
        String resource = indexModel.resource();
        logger().info(() -> {
            return new StringBuilder(57).append("Check or create the index model: '").append(indexModel.toString()).append(" with this index name: ").append(eventuallyTimedName).toString();
        });
        if (indexModel.schema().isEmpty()) {
            throw new Exception(new StringBuilder(51).append("There no define schema in the index configuration: ").append(indexModel).toString());
        }
        String lowerCase = indexModel.name().toLowerCase();
        String name = indexModel.name();
        if (lowerCase != null ? !lowerCase.equals(name) : name != null) {
            throw new Exception(new StringBuilder(38).append("The index name must be all lowercase: ").append(indexModel).toString());
        }
        Map $plus = Option$.MODULE$.option2Iterable(((IndexModel) byName.get()).idField().map(str -> {
            return new Tuple2("es.mapping.id", str);
        })).toMap(Predef$.MODULE$.$conforms()).$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("path"), resource));
        if (BoxesRunTime.unboxToBoolean(WaspSystem$.MODULE$.$qmark$qmark(this.elasticAdminActor, new CheckOrCreateIndex(eventuallyTimedName, indexModel.name(), indexModel.dataType(), indexModel.getJsonSchema()), WaspSystem$.MODULE$.$qmark$qmark$default$3()))) {
            return dataset.writeStream().options($plus).format("es");
        }
        String sb2 = new StringBuilder(52).append("Error creating elastic index: ").append(indexModel).append(" with this index name ").append(eventuallyTimedName).toString();
        logger().error(() -> {
            return sb2;
        });
        throw new Exception(sb2);
    }

    public ElasticsearchSparkStructuredStreamingWriter(IndexBL indexBL, SparkSession sparkSession, String str, ActorRef actorRef) {
        this.indexBL = indexBL;
        this.name = str;
        this.elasticAdminActor = actorRef;
        ElasticConfiguration.$init$(this);
        Logging.$init$(this);
    }
}
