package it.agilelab.bigdata.wasp.producers;

import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.PoisonPill$;
import akka.actor.Props$;
import akka.actor.SupervisorStrategy;
import akka.actor.package$;
import akka.cluster.Cluster;
import akka.cluster.Cluster$;
import akka.routing.BalancingPool;
import akka.routing.BalancingPool$;
import it.agilelab.bigdata.wasp.core.WaspSystem$;
import it.agilelab.bigdata.wasp.core.kafka.CheckOrCreateTopic;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.utils.ConfigManager$;
import it.agilelab.bigdata.wasp.models.ProducerModel;
import it.agilelab.bigdata.wasp.models.TopicModel;
import it.agilelab.bigdata.wasp.repository.core.bl.ProducerBL;
import it.agilelab.bigdata.wasp.repository.core.bl.TopicBL;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import scala.Option;
import scala.PartialFunction;
import scala.collection.immutable.Iterable;
import scala.collection.immutable.Iterable$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.runtime.StructuralCallSite;
import scala.util.Either;

/* compiled from: ProducerGuardian.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Md!B\u000e\u001d\u0003\u00039\u0003\u0002\u0003 \u0001\u0005\u0003\u0005\u000b\u0011B \t\u0011Q\u0003!\u0011!Q\u0001\nUCQ\u0001\u0019\u0001\u0005\u0002\u0005DqA\u001b\u0001C\u0002\u001b\u00051\u000eC\u0005m\u0001\u0001\u0007\t\u0019!C\u0001[\"IA\u000f\u0001a\u0001\u0002\u0004%\t!\u001e\u0005\nw\u0002\u0001\r\u0011!Q!\n9D\u0011\u0002 \u0001A\u0002\u0003\u0007I\u0011A?\t\u0017\u0005%\u0001\u00011AA\u0002\u0013\u0005\u00111\u0002\u0005\u000b\u0003\u001f\u0001\u0001\u0019!A!B\u0013q\bBCA\t\u0001\u0001\u0007\t\u0019!C\u0001W\"Y\u00111\u0003\u0001A\u0002\u0003\u0007I\u0011AA\u000b\u0011)\tI\u0002\u0001a\u0001\u0002\u0003\u0006K!\u0016\u0005\f\u00037\u0001\u0001\u0019!a\u0001\n\u0003\ti\u0002C\u0006\u0002&\u0001\u0001\r\u00111A\u0005\u0002\u0005\u001d\u0002bCA\u0016\u0001\u0001\u0007\t\u0011)Q\u0005\u0003?A\u0011\"!\f\u0001\u0005\u0004%\t!a\f\t\u0011\u0005m\u0002\u0001)A\u0005\u0003cAq!!\u0010\u0001\t\u0003\ny\u0004C\u0004\u0002B\u0001!\t%a\u0010\t\u000f\u0005\r\u0003\u0001\"\u0011\u0002F!9\u0011Q\u000b\u0001\u0005\u0002\u0005\u0015\u0003bBA,\u0001\u0011\u0005\u0011Q\t\u0005\b\u00033\u0002a\u0011AA \u0011\u001d\tY\u0006\u0001C\u0001\u0003\u007fAq!!\u0018\u0001\t\u0003\tyF\u0001\tQe>$WoY3s\u000fV\f'\u000fZ5b]*\u0011QDH\u0001\naJ|G-^2feNT!a\b\u0011\u0002\t]\f7\u000f\u001d\u0006\u0003C\t\nqAY5hI\u0006$\u0018M\u0003\u0002$I\u0005A\u0011mZ5mK2\f'MC\u0001&\u0003\tIGo\u0001\u0001\u0014\t\u0001AcF\u000e\t\u0003S1j\u0011A\u000b\u0006\u0002W\u0005)1oY1mC&\u0011QF\u000b\u0002\u0007\u0003:L(+\u001a4\u0011\u0005=\"T\"\u0001\u0019\u000b\u0005E\u0012\u0014!B1di>\u0014(\"A\u001a\u0002\t\u0005\\7.Y\u0005\u0003kA\u0012Q!Q2u_J\u0004\"a\u000e\u001f\u000e\u0003aR!!\u000f\u001e\u0002\u000f1|wmZ5oO*\u00111HH\u0001\u0005G>\u0014X-\u0003\u0002>q\t9Aj\\4hS:<\u0017aA3omJ\u0011\u0001\t\u000b\u0004\u0005\u0003\u0002\u0001qH\u0001\u0007=e\u00164\u0017N\\3nK:$h\bC\u0004D\u0001\n\u0007i\u0011\u0001#\u0002\u0015A\u0014x\u000eZ;dKJ\u0014E*F\u0001F!\t1E*D\u0001H\u0015\tA\u0015*\u0001\u0002cY*\u00111H\u0013\u0006\u0003\u0017z\t!B]3q_NLGo\u001c:z\u0013\tiuI\u0001\u0006Qe>$WoY3s\u00052Cqa\u0014!C\u0002\u001b\u0005\u0001+A\u0004u_BL7M\u0011'\u0016\u0003E\u0003\"A\u0012*\n\u0005M;%a\u0002+pa&\u001c'\tT\u0001\raJ|G-^2fe:\u000bW.\u001a\t\u0003-vs!aV.\u0011\u0005aSS\"A-\u000b\u0005i3\u0013A\u0002\u001fs_>$h(\u0003\u0002]U\u00051\u0001K]3eK\u001aL!AX0\u0003\rM#(/\u001b8h\u0015\ta&&\u0001\u0004=S:LGO\u0010\u000b\u0004E\u0012L\u0007CA2\u0001\u001b\u0005a\u0002\"\u0002 \u0004\u0001\u0004)'C\u00014)\r\u0011\t\u0005\u0001A3\t\u000f\r3'\u0019!D\u0001\t\"9qJ\u001ab\u0001\u000e\u0003\u0001\u0006\"\u0002+\u0004\u0001\u0004)\u0016\u0001\u00028b[\u0016,\u0012!V\u0001\taJ|G-^2feV\ta\u000e\u0005\u0002pe6\t\u0001O\u0003\u0002r=\u00051Qn\u001c3fYNL!a\u001d9\u0003\u001bA\u0013x\u000eZ;dKJlu\u000eZ3m\u00031\u0001(o\u001c3vG\u0016\u0014x\fJ3r)\t1\u0018\u0010\u0005\u0002*o&\u0011\u0001P\u000b\u0002\u0005+:LG\u000fC\u0004{\r\u0005\u0005\t\u0019\u00018\u0002\u0007a$\u0013'A\u0005qe>$WoY3sA\u0005y\u0011m]:pG&\fG/\u001a3U_BL7-F\u0001\u007f!\u0011Is0a\u0001\n\u0007\u0005\u0005!F\u0001\u0004PaRLwN\u001c\t\u0004_\u0006\u0015\u0011bAA\u0004a\nQAk\u001c9jG6{G-\u001a7\u0002'\u0005\u001c8o\\2jCR,G\rV8qS\u000e|F%Z9\u0015\u0007Y\fi\u0001C\u0004{\u0013\u0005\u0005\t\u0019\u0001@\u0002!\u0005\u001c8o\\2jCR,G\rV8qS\u000e\u0004\u0013a\u0003:pkR,'o\u00188b[\u0016\fqB]8vi\u0016\u0014xL\\1nK~#S-\u001d\u000b\u0004m\u0006]\u0001b\u0002>\r\u0003\u0003\u0005\r!V\u0001\re>,H/\u001a:`]\u0006lW\rI\u0001\rW\u000647.Y0s_V$XM]\u000b\u0003\u0003?\u00012aLA\u0011\u0013\r\t\u0019\u0003\r\u0002\t\u0003\u000e$xN\u001d*fM\u0006\u00012.\u00194lC~\u0013x.\u001e;fe~#S-\u001d\u000b\u0004m\u0006%\u0002\u0002\u0003>\u0010\u0003\u0003\u0005\r!a\b\u0002\u001b-\fgm[1`e>,H/\u001a:!\u0003\u001d\u0019G.^:uKJ,\"!!\r\u0011\t\u0005M\u0012qG\u0007\u0003\u0003kQ1!!\f3\u0013\u0011\tI$!\u000e\u0003\u000f\rcWo\u001d;fe\u0006A1\r\\;ti\u0016\u0014\b%\u0001\u0005qe\u0016\u001cF/\u0019:u)\u00051\u0018\u0001\u00039pgR\u001cFo\u001c9\u0002\u000fI,7-Z5wKV\u0011\u0011q\t\t\u0005\u0003\u0013\nyED\u00020\u0003\u0017J1!!\u00141\u0003\u0015\t5\r^8s\u0013\u0011\t\t&a\u0015\u0003\u000fI+7-Z5wK*\u0019\u0011Q\n\u0019\u0002\u001bUt\u0017N\\5uS\u0006d\u0017N_3e\u0003-Ig.\u001b;jC2L'0\u001a3\u0002!M$\u0018M\u001d;DQ&dG-Q2u_J\u001c\u0018aD:u_B\u001c\u0005.\u001b7e\u0003\u000e$xN]:\u0002\u0015%t\u0017\u000e^5bY&TX\r\u0006\u0002\u0002bA1\u00111MA7+ZtA!!\u001a\u0002j9\u0019\u0001,a\u001a\n\u0003-J1!a\u001b+\u0003\u001d\u0001\u0018mY6bO\u0016LA!a\u001c\u0002r\t1Q)\u001b;iKJT1!a\u001b+\u0001")
/* loaded from: input_file:it/agilelab/bigdata/wasp/producers/ProducerGuardian.class */
public abstract class ProducerGuardian implements Actor, Logging {
    private final Object env;
    public final String it$agilelab$bigdata$wasp$producers$ProducerGuardian$$producerName;
    private ProducerModel producer;
    private Option<TopicModel> associatedTopic;
    private String router_name;
    private ActorRef kafka_router;
    private final Cluster cluster;
    private final WaspLogger logger;
    private final ActorContext context;
    private final ActorRef self;

    public static Method reflMethod$Method1(Class cls) {
        StructuralCallSite apply = (StructuralCallSite) StructuralCallSite.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(StructuralCallSite.class), MethodType.methodType(Object.class)).dynamicInvoker().invoke() /* invoke-custom */;
        Method find = apply.find(cls);
        if (find != null) {
            return find;
        }
        Method ensureAccessible = ScalaRunTime$.MODULE$.ensureAccessible(cls.getMethod("producerBL", apply.parameterTypes()));
        apply.add(cls, ensureAccessible);
        return ensureAccessible;
    }

    public static Method reflMethod$Method2(Class cls) {
        StructuralCallSite apply = (StructuralCallSite) StructuralCallSite.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(StructuralCallSite.class), MethodType.methodType(Object.class)).dynamicInvoker().invoke() /* invoke-custom */;
        Method find = apply.find(cls);
        if (find != null) {
            return find;
        }
        Method ensureAccessible = ScalaRunTime$.MODULE$.ensureAccessible(cls.getMethod("producerBL", apply.parameterTypes()));
        apply.add(cls, ensureAccessible);
        return ensureAccessible;
    }

    public static Method reflMethod$Method3(Class cls) {
        StructuralCallSite apply = (StructuralCallSite) StructuralCallSite.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(StructuralCallSite.class), MethodType.methodType(Object.class)).dynamicInvoker().invoke() /* invoke-custom */;
        Method find = apply.find(cls);
        if (find != null) {
            return find;
        }
        Method ensureAccessible = ScalaRunTime$.MODULE$.ensureAccessible(cls.getMethod("topicBL", apply.parameterTypes()));
        apply.add(cls, ensureAccessible);
        return ensureAccessible;
    }

    public static Method reflMethod$Method4(Class cls) {
        StructuralCallSite apply = (StructuralCallSite) StructuralCallSite.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(StructuralCallSite.class), MethodType.methodType(Object.class)).dynamicInvoker().invoke() /* invoke-custom */;
        Method find = apply.find(cls);
        if (find != null) {
            return find;
        }
        Method ensureAccessible = ScalaRunTime$.MODULE$.ensureAccessible(cls.getMethod("producerBL", apply.parameterTypes()));
        apply.add(cls, ensureAccessible);
        return ensureAccessible;
    }

    public static Method reflMethod$Method5(Class cls) {
        StructuralCallSite apply = (StructuralCallSite) StructuralCallSite.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(StructuralCallSite.class), MethodType.methodType(Object.class)).dynamicInvoker().invoke() /* invoke-custom */;
        Method find = apply.find(cls);
        if (find != null) {
            return find;
        }
        Method ensureAccessible = ScalaRunTime$.MODULE$.ensureAccessible(cls.getMethod("producerBL", apply.parameterTypes()));
        apply.add(cls, ensureAccessible);
        return ensureAccessible;
    }

    public final ActorRef sender() {
        return Actor.sender$(this);
    }

    public void aroundReceive(PartialFunction<Object, BoxedUnit> partialFunction, Object obj) {
        Actor.aroundReceive$(this, partialFunction, obj);
    }

    public void aroundPreStart() {
        Actor.aroundPreStart$(this);
    }

    public void aroundPostStop() {
        Actor.aroundPostStop$(this);
    }

    public void aroundPreRestart(Throwable th, Option<Object> option) {
        Actor.aroundPreRestart$(this, th, option);
    }

    public void aroundPostRestart(Throwable th) {
        Actor.aroundPostRestart$(this, th);
    }

    public SupervisorStrategy supervisorStrategy() {
        return Actor.supervisorStrategy$(this);
    }

    public void preRestart(Throwable th, Option<Object> option) throws Exception {
        Actor.preRestart$(this, th, option);
    }

    public void postRestart(Throwable th) throws Exception {
        Actor.postRestart$(this, th);
    }

    public void unhandled(Object obj) {
        Actor.unhandled$(this, obj);
    }

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    public ActorContext context() {
        return this.context;
    }

    public final ActorRef self() {
        return this.self;
    }

    public void akka$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        this.context = actorContext;
    }

    public final void akka$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        this.self = actorRef;
    }

    public abstract String name();

    public ProducerModel producer() {
        return this.producer;
    }

    public void producer_$eq(ProducerModel producerModel) {
        this.producer = producerModel;
    }

    public Option<TopicModel> associatedTopic() {
        return this.associatedTopic;
    }

    public void associatedTopic_$eq(Option<TopicModel> option) {
        this.associatedTopic = option;
    }

    public String router_name() {
        return this.router_name;
    }

    public void router_name_$eq(String str) {
        this.router_name = str;
    }

    public ActorRef kafka_router() {
        return this.kafka_router;
    }

    public void kafka_router_$eq(ActorRef actorRef) {
        this.kafka_router = actorRef;
    }

    public Cluster cluster() {
        return this.cluster;
    }

    public void preStart() {
        context().become(uninitialized());
    }

    public void postStop() {
        package$.MODULE$.actorRef2Scala(kafka_router()).$bang(PoisonPill$.MODULE$, self());
    }

    public PartialFunction<Object, BoxedUnit> receive() {
        return uninitialized();
    }

    public PartialFunction<Object, BoxedUnit> uninitialized() {
        return new ProducerGuardian$$anonfun$uninitialized$1(this);
    }

    public PartialFunction<Object, BoxedUnit> initialized() {
        return new ProducerGuardian$$anonfun$initialized$1(this);
    }

    public abstract void startChildActors();

    public void stopChildActors() {
        logger().info(() -> {
            return new StringBuilder(41).append("Producer '").append(this.it$agilelab$bigdata$wasp$producers$ProducerGuardian$$producerName).append("': stopping actors bound to ").append(this.self()).append("...").toString();
        });
        FiniteDuration $minus = WaspSystem$.MODULE$.generalTimeout().duration().$minus(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(15)).seconds());
        Future traverse = Future$.MODULE$.traverse(context().children(), actorRef -> {
            return akka.pattern.package$.MODULE$.gracefulStop(actorRef, $minus, akka.pattern.package$.MODULE$.gracefulStop$default$3());
        }, Iterable$.MODULE$.canBuildFrom(), ExecutionContext$Implicits$.MODULE$.global());
        ActorRef sender = sender();
        traverse.map(iterable -> {
            $anonfun$stopChildActors$3(this, sender, iterable);
            return BoxedUnit.UNIT;
        }, ExecutionContext$Implicits$.MODULE$.global());
    }

    public Either<String, BoxedUnit> initialize() {
        Object obj = this.env;
        try {
            Option byName = ((ProducerBL) reflMethod$Method1(obj.getClass()).invoke(obj, new Object[0])).getByName(this.it$agilelab$bigdata$wasp$producers$ProducerGuardian$$producerName);
            if (!byName.isDefined()) {
                String sb = new StringBuilder(30).append("Producer '").append(this.it$agilelab$bigdata$wasp$producers$ProducerGuardian$$producerName).append("': error not defined").toString();
                logger().error(() -> {
                    return sb;
                });
                return scala.package$.MODULE$.Left().apply(sb);
            }
            producer_$eq((ProducerModel) byName.get());
            if (!producer().hasOutput()) {
                String sb2 = new StringBuilder(34).append("Producer '").append(this.it$agilelab$bigdata$wasp$producers$ProducerGuardian$$producerName).append("': error undefined topic").toString();
                logger().error(() -> {
                    return sb2;
                });
                return scala.package$.MODULE$.Left().apply(sb2);
            }
            Object obj2 = this.env;
            try {
                ProducerBL producerBL = (ProducerBL) reflMethod$Method2(obj2.getClass()).invoke(obj2, new Object[0]);
                Object obj3 = this.env;
                try {
                    Option<TopicModel> topic = producerBL.getTopic((TopicBL) reflMethod$Method3(obj3.getClass()).invoke(obj3, new Object[0]), producer());
                    associatedTopic_$eq(topic);
                    logger().info(() -> {
                        return new StringBuilder(26).append("Producer '").append(this.it$agilelab$bigdata$wasp$producers$ProducerGuardian$$producerName).append("': topic found: ").append(this.associatedTopic()).toString();
                    });
                    if (!BoxesRunTime.unboxToBoolean(WaspSystem$.MODULE$.$qmark$qmark(WaspSystem$.MODULE$.kafkaAdminActor(), new CheckOrCreateTopic(((TopicModel) topic.get()).name(), ((TopicModel) topic.get()).partitions(), ((TopicModel) topic.get()).replicas()), WaspSystem$.MODULE$.$qmark$qmark$default$3()))) {
                        String sb3 = new StringBuilder(34).append("Producer '").append(this.it$agilelab$bigdata$wasp$producers$ProducerGuardian$$producerName).append("': error creating topic ").append(((TopicModel) topic.get()).name()).toString();
                        logger().error(() -> {
                            return sb3;
                        });
                        return scala.package$.MODULE$.Left().apply(sb3);
                    }
                    router_name_$eq(new StringBuilder(24).append("kafka-ingestion-router-").append(name()).append("-").append(System.currentTimeMillis()).toString());
                    kafka_router_$eq(WaspSystem$.MODULE$.actorSystem().actorOf(new BalancingPool(5, BalancingPool$.MODULE$.apply$default$2(), BalancingPool$.MODULE$.apply$default$3()).props(Props$.MODULE$.apply(() -> {
                        return new KafkaPublisherActor(ConfigManager$.MODULE$.getKafkaConfig());
                    }, ClassTag$.MODULE$.apply(KafkaPublisherActor.class))), router_name()));
                    context().become(initialized());
                    startChildActors();
                    Object obj4 = this.env;
                    try {
                        ((ProducerBL) reflMethod$Method4(obj4.getClass()).invoke(obj4, new Object[0])).setIsActive(producer(), true);
                        return scala.package$.MODULE$.Right().apply(BoxedUnit.UNIT);
                    } catch (InvocationTargetException e) {
                        throw e.getCause();
                    }
                } catch (InvocationTargetException e2) {
                    throw e2.getCause();
                }
            } catch (InvocationTargetException e3) {
                throw e3.getCause();
            }
        } catch (InvocationTargetException e4) {
            throw e4.getCause();
        }
    }

    public static final /* synthetic */ boolean $anonfun$stopChildActors$4(boolean z, boolean z2) {
        return z && z2;
    }

    public static final /* synthetic */ void $anonfun$stopChildActors$3(ProducerGuardian producerGuardian, ActorRef actorRef, Iterable iterable) {
        if (!BoxesRunTime.unboxToBoolean(iterable.reduceLeft((obj, obj2) -> {
            return BoxesRunTime.boxToBoolean($anonfun$stopChildActors$4(BoxesRunTime.unboxToBoolean(obj), BoxesRunTime.unboxToBoolean(obj2)));
        }))) {
            String sb = new StringBuilder(63).append("Producer '").append(producerGuardian.it$agilelab$bigdata$wasp$producers$ProducerGuardian$$producerName).append("': something went wrong! Unable to shutdown all nodes").toString();
            producerGuardian.logger().error(() -> {
                return sb;
            });
            package$.MODULE$.actorRef2Scala(actorRef).$bang(scala.package$.MODULE$.Left().apply(sb), producerGuardian.self());
            return;
        }
        producerGuardian.logger().info(() -> {
            return new StringBuilder(41).append("Producer '").append(producerGuardian.it$agilelab$bigdata$wasp$producers$ProducerGuardian$$producerName).append("': graceful shutdown completed.").toString();
        });
        Object obj3 = producerGuardian.env;
        try {
            ((ProducerBL) reflMethod$Method5(obj3.getClass()).invoke(obj3, new Object[0])).setIsActive(producerGuardian.producer(), false);
            producerGuardian.logger().info(() -> {
                return new StringBuilder(64).append("Producer '").append(producerGuardian.it$agilelab$bigdata$wasp$producers$ProducerGuardian$$producerName).append("': transitioning from 'initialized' to 'uninitialized'").toString();
            });
            package$.MODULE$.actorRef2Scala(producerGuardian.kafka_router()).$bang(PoisonPill$.MODULE$, producerGuardian.self());
            producerGuardian.context().become(producerGuardian.uninitialized());
            package$.MODULE$.actorRef2Scala(actorRef).$bang(scala.package$.MODULE$.Right().apply(BoxedUnit.UNIT), producerGuardian.self());
        } catch (InvocationTargetException e) {
            throw e.getCause();
        }
    }

    public ProducerGuardian(Object obj, String str) {
        this.env = obj;
        this.it$agilelab$bigdata$wasp$producers$ProducerGuardian$$producerName = str;
        Actor.$init$(this);
        Logging.$init$(this);
        this.cluster = Cluster$.MODULE$.apply(context().system());
    }
}
