package com.netflix.atlas.stream;

import akka.NotUsed;
import akka.stream.ThrottleMode$Shaping$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.InHandler;
import akka.stream.stage.OutHandler;
import com.fasterxml.jackson.module.scala.JavaTypeable$;
import com.netflix.atlas.akka.DiagnosticMessage$;
import com.netflix.atlas.akka.StreamOps;
import com.netflix.atlas.eval.stream.Evaluator;
import com.netflix.atlas.json.Json$;
import com.netflix.atlas.stream.DataSourceValidator;
import java.util.UUID;
import org.reactivestreams.Publisher;
import scala.MatchError;
import scala.Tuple2;
import scala.collection.immutable.List;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Left;
import scala.util.Right;

/* compiled from: EvalFlow.scala */
/* loaded from: input_file:com/netflix/atlas/stream/EvalFlow$$anon$1.class */
public final class EvalFlow$$anon$1 extends GraphStageLogic implements InHandler, OutHandler {
    private final String streamId;
    private StreamOps.SourceQueue<Evaluator.MessageEnvelope> queue;
    private Publisher<Evaluator.MessageEnvelope> pub;
    private boolean messageSourcePushed;
    private final /* synthetic */ EvalFlow $outer;

    public void onDownstreamFinish() throws Exception {
        OutHandler.onDownstreamFinish$(this);
    }

    private String streamId() {
        return this.streamId;
    }

    private StreamOps.SourceQueue<Evaluator.MessageEnvelope> queue() {
        return this.queue;
    }

    private void queue_$eq(StreamOps.SourceQueue<Evaluator.MessageEnvelope> sourceQueue) {
        this.queue = sourceQueue;
    }

    private Publisher<Evaluator.MessageEnvelope> pub() {
        return this.pub;
    }

    private void pub_$eq(Publisher<Evaluator.MessageEnvelope> publisher) {
        this.pub = publisher;
    }

    private boolean messageSourcePushed() {
        return this.messageSourcePushed;
    }

    private void messageSourcePushed_$eq(boolean z) {
        this.messageSourcePushed = z;
    }

    public void preStart() {
        this.$outer.com$netflix$atlas$stream$EvalFlow$$evalService.unregister(streamId());
        Tuple2<StreamOps.SourceQueue<Evaluator.MessageEnvelope>, Publisher<Evaluator.MessageEnvelope>> register = this.$outer.com$netflix$atlas$stream$EvalFlow$$evalService.register(streamId());
        if (register == null) {
            throw new MatchError(register);
        }
        Tuple2 tuple2 = new Tuple2((StreamOps.SourceQueue) register._1(), (Publisher) register._2());
        StreamOps.SourceQueue<Evaluator.MessageEnvelope> sourceQueue = (StreamOps.SourceQueue) tuple2._1();
        Publisher<Evaluator.MessageEnvelope> publisher = (Publisher) tuple2._2();
        queue_$eq(sourceQueue);
        pub_$eq(publisher);
        queue().offer(new Evaluator.MessageEnvelope("_", DiagnosticMessage$.MODULE$.info(new StringBuilder(25).append("Connected with streamId: ").append(streamId()).toString())));
        pull(this.$outer.com$netflix$atlas$stream$EvalFlow$$in());
    }

    public void onPull() {
        if (messageSourcePushed()) {
            completeStage();
        } else {
            push(this.$outer.com$netflix$atlas$stream$EvalFlow$$out(), sourceWithHeartbeat());
            messageSourcePushed_$eq(true);
        }
    }

    private Source<Evaluator.MessageEnvelope, NotUsed> sourceWithHeartbeat() {
        return Source$.MODULE$.fromPublisher(pub()).merge(Source$.MODULE$.repeat(new Evaluator.MessageEnvelope("_", DiagnosticMessage$.MODULE$.info("heartbeat"))).throttle(1, new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds(), 1, ThrottleMode$Shaping$.MODULE$), true);
    }

    public void onPush() {
        Boolean bool;
        Left validate = this.$outer.com$netflix$atlas$stream$EvalFlow$$validator.validate((String) grab(this.$outer.com$netflix$atlas$stream$EvalFlow$$in()));
        if (validate instanceof Left) {
            bool = BoxesRunTime.boxToBoolean(queue().offer(new Evaluator.MessageEnvelope("_", DiagnosticMessage$.MODULE$.error(Json$.MODULE$.encode((List) validate.value(), JavaTypeable$.MODULE$.collectionJavaTypeable(JavaTypeable$.MODULE$.gen0JavaTypeable(ClassTag$.MODULE$.apply(DataSourceValidator.IdAndError.class)), ClassTag$.MODULE$.apply(List.class)))))));
        } else {
            if (!(validate instanceof Right)) {
                throw new MatchError(validate);
            }
            Evaluator.DataSources dataSources = (Evaluator.DataSources) ((Right) validate).value();
            queue().offer(new Evaluator.MessageEnvelope("_", DiagnosticMessage$.MODULE$.info("Validation Passed")));
            this.$outer.com$netflix$atlas$stream$EvalFlow$$evalService.updateDataSources(streamId(), dataSources);
            bool = BoxedUnit.UNIT;
        }
        pull(this.$outer.com$netflix$atlas$stream$EvalFlow$$in());
    }

    public void onUpstreamFinish() {
        super.completeStage();
        this.$outer.com$netflix$atlas$stream$EvalFlow$$evalService.unregister(streamId());
    }

    public void onUpstreamFailure(Throwable th) {
        super.failStage(th);
        this.$outer.com$netflix$atlas$stream$EvalFlow$$evalService.unregister(streamId());
    }

    public void onDownstreamFinish(Throwable th) {
        OutHandler.onDownstreamFinish$(this, th);
        this.$outer.com$netflix$atlas$stream$EvalFlow$$evalService.unregister(streamId());
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public EvalFlow$$anon$1(EvalFlow evalFlow) {
        super(evalFlow.m1shape());
        if (evalFlow == null) {
            throw null;
        }
        this.$outer = evalFlow;
        InHandler.$init$(this);
        OutHandler.$init$(this);
        this.streamId = UUID.randomUUID().toString();
        this.messageSourcePushed = false;
        setHandlers(evalFlow.com$netflix$atlas$stream$EvalFlow$$in(), evalFlow.com$netflix$atlas$stream$EvalFlow$$out(), this);
    }
}
